Skip to main content

agent_engine/runtime/
mod.rs

1use reqwest::Client;
2use serde_json::{json, Value};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use crate::{Result, RuntimeError, ToolRegistry};
7use std::sync::Mutex;
8use tokio::sync::{mpsc, RwLock};
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use futures::stream::Stream;
12use std::pin::Pin;
13
14mod types;
15mod auth;
16mod api;
17mod api_sync;
18mod request;
19mod stream;
20mod helpers;
21mod sse;
22mod sse_types;
23pub mod subagent;
24pub mod openai;
25pub mod telemetry;
26pub mod compaction;
27
28pub use types::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
29use types::AuthState;
30use auth::AuthMethods;
31use api::ApiMethods;
32use stream::StreamMethods;
33use helpers::HelperMethods;
34
35/// Result of resolving before_tool_call extension policy.
36pub enum BeforeToolCallDecision {
37    Continue { input: Value },
38    Block { reason: String },
39}
40
41/// Emit a `before_tool_call` event and include the runtime tool name when it
42/// differs from the API-safe name.
43pub async fn emit_before_tool_call(
44    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
45    tool_name: &str,
46    runtime_tool_name: Option<&str>,
47    input: Value,
48) -> crate::extensions::hooks::events::HookResult {
49    let mut event = crate::extensions::hooks::events::HookEvent::before_tool_call(tool_name, input);
50    if let Some(runtime_tool_name) = runtime_tool_name {
51        event.tool_runtime_name = Some(runtime_tool_name.to_string());
52    }
53    hook_bus.emit(&event).await
54}
55
56
57/// Resolve a before_tool_call result that may request user confirmation.
58///
59/// When `auto_approve_confirms` is true, `Confirm` is short-circuited to `Continue`.
60/// Headless/non-interactive callers with `auto_approve_confirms = false` fail closed.
61pub async fn resolve_before_tool_call_result(
62    hook_result: crate::extensions::hooks::events::HookResult,
63    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
64    auto_approve_confirms: bool,
65) -> crate::extensions::hooks::events::HookResult {
66    match hook_result {
67        crate::extensions::hooks::events::HookResult::Confirm { message } => {
68            if auto_approve_confirms {
69                tracing::info!(message = %message, "confirm auto-approved (auto_approve_confirms=true)");
70                return crate::extensions::hooks::events::HookResult::Continue;
71            }
72
73            let Some(prompt) = secret_prompt else {
74                return crate::extensions::hooks::events::HookResult::Block {
75                    reason: format!(
76                        "Tool call requires confirmation but no interactive prompt is available: {}",
77                        message
78                    ),
79                };
80            };
81
82            let response = prompt
83                .prompt(
84                    "Confirm tool call".to_string(),
85                    format!("{}\n\nType 'yes' or 'y' to allow.", message),
86                )
87                .await;
88
89            match response.as_deref().map(str::trim) {
90                Some(answer) if answer.eq_ignore_ascii_case("yes") || answer.eq_ignore_ascii_case("y") => {
91                    crate::extensions::hooks::events::HookResult::Continue
92                }
93                _ => crate::extensions::hooks::events::HookResult::Block {
94                    reason: format!("Tool call confirmation denied: {}", message),
95                },
96            }
97        }
98        other => other,
99    }
100}
101
102/// Resolve before_tool_call policy into executable input or a block reason.
103pub async fn resolve_before_tool_call_decision(
104    original_input: Value,
105    hook_result: crate::extensions::hooks::events::HookResult,
106    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
107    auto_approve_confirms: bool,
108) -> BeforeToolCallDecision {
109    match resolve_before_tool_call_result(hook_result, secret_prompt, auto_approve_confirms).await {
110        crate::extensions::hooks::events::HookResult::Block { reason } => {
111            BeforeToolCallDecision::Block { reason }
112        }
113        crate::extensions::hooks::events::HookResult::Modify { input } => {
114            BeforeToolCallDecision::Continue { input }
115        }
116        _ => BeforeToolCallDecision::Continue { input: original_input },
117    }
118}
119
120/// Emit an `after_tool_call` event and include the runtime tool name when it
121/// differs from the API-safe name.
122pub async fn emit_after_tool_call(
123    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
124    tool_name: &str,
125    runtime_tool_name: Option<&str>,
126    input: Value,
127    output: String,
128) -> crate::extensions::hooks::events::HookResult {
129    let mut event = crate::extensions::hooks::events::HookEvent::after_tool_call(
130        tool_name,
131        input,
132        output,
133    );
134    if let Some(runtime_tool_name) = runtime_tool_name {
135        event.tool_runtime_name = Some(runtime_tool_name.to_string());
136    }
137    hook_bus.emit(&event).await
138}
139
140/// The core runtime — manages API communication, tool execution, authentication,
141/// and streaming for all SynapsCLI binaries (chat, chatui, server, agent, watcher).
142pub struct Runtime {
143    client: Client,
144    auth: Arc<RwLock<AuthState>>,
145    model: String,
146    tools: Arc<RwLock<ToolRegistry>>,
147    system_prompt: Option<String>,
148    thinking_budget: u32,
149    /// User override for context window size (tokens). When set, takes
150    /// precedence over the model's auto-detected window from
151    /// `models::context_window_for_model`. Lets users cap context at e.g.
152    /// 200k even on models that natively support 1M.
153    context_window_override: Option<u64>,
154    /// Model used for compaction. Falls back to claude-sonnet-4-6 if not set.
155    compaction_model: Option<String>,
156    /// Shared registry for reactive subagent handles.
157    subagent_registry: Arc<Mutex<crate::runtime::subagent::SubagentRegistry>>,
158    /// Shared event queue — for Event Bus tooling.
159    event_queue: Arc<crate::events::EventQueue>,
160    /// Path for watcher_exit tool to write handoff state (agent mode only)
161    pub watcher_exit_path: Option<PathBuf>,
162    // New configurable fields
163    max_tool_output: usize,
164    bash_timeout: u64,
165    bash_max_timeout: u64,
166    subagent_timeout: u64,
167    api_retries: u32,
168    /// Telemetry level for structured per-request API logging (opt-in).
169    telemetry_level: crate::runtime::telemetry::TelemetryLevel,
170    /// Opt into the cache-diagnosis beta (`cache-diagnosis-2026-04-07`).
171    cache_diagnostics: bool,
172    /// Prompt-cache TTL strategy (5m default | 1h | hybrid). Threaded into
173    /// every request via `ApiOptions`.
174    cache_ttl: crate::core::config::CacheTtl,
175    /// One-time-per-session latch for the silent 1h-downgrade notice
176    /// (spec §3.4.1). Shared into `ApiOptions` for every request.
177    ttl_downgrade_notified: std::sync::Arc<std::sync::atomic::AtomicBool>,
178    /// Session-scoped "1h honored at least once" latch (spec §3.4.1) —
179    /// suppresses the downgrade notice on healthy Hybrid turns where the 1h
180    /// prefix is already cached. Shared into `ApiOptions` for every request.
181    saw_1h_honored: std::sync::Arc<std::sync::atomic::AtomicBool>,
182    /// Last Anthropic message id (`msg_...`) — threaded into the next
183    /// request's `diagnostics.previous_message_id` when diagnostics is on.
184    /// Reserved for the cache-diagnosis beta wiring (handoff item).
185    #[allow(dead_code)]
186    last_msg_id: Arc<Mutex<Option<String>>>,
187    session_manager: std::sync::Arc<crate::tools::shell::SessionManager>,
188    /// Extension hook bus for dispatching events to extensions.
189    hook_bus: Arc<crate::extensions::hooks::HookBus>,
190    // Held to keep the reaper task alive for the Runtime's lifetime; never read directly.
191    #[allow(dead_code)]
192    reaper_handle: Option<tokio::task::JoinHandle<()>>,
193    #[allow(dead_code)]
194    reaper_cancel: Option<tokio_util::sync::CancellationToken>,
195}
196
197impl Runtime {
198    pub async fn new() -> Result<Self> {
199        let (auth_token, auth_type, refresh_token, token_expires) = AuthMethods::get_auth_token()?;
200
201        let client = Client::builder()
202            .connect_timeout(Duration::from_secs(10))
203            .timeout(Duration::from_secs(300))
204            .build()
205            .map_err(|e| RuntimeError::Config(format!("Failed to build HTTP client: {}", e)))?;
206
207        let session_manager = {
208            let config = crate::tools::shell::ShellConfig::default();
209            crate::tools::shell::SessionManager::new(config)
210        };
211
212        // Start the idle session reaper
213        let mgr = session_manager.clone();
214        let cancel = tokio_util::sync::CancellationToken::new();
215        let reaper_handle = crate::tools::shell::session::start_reaper(mgr, cancel.clone());
216
217        Ok(Runtime {
218            client,
219            auth: Arc::new(RwLock::new(AuthState {
220                auth_token,
221                auth_type,
222                refresh_token,
223                token_expires,
224            })),
225            model: crate::models::default_model().to_string(),
226            tools: Arc::new(RwLock::new(ToolRegistry::new())),
227            system_prompt: None,
228            thinking_budget: 4096,
229            context_window_override: None,
230            compaction_model: None,
231            subagent_registry: Arc::new(Mutex::new(crate::runtime::subagent::SubagentRegistry::new())),
232            event_queue: Arc::new(crate::events::EventQueue::new(1000)),
233            watcher_exit_path: None,
234            max_tool_output: 30000,
235            bash_timeout: 30,
236            bash_max_timeout: 300,
237            subagent_timeout: 300,
238            api_retries: 3,
239            telemetry_level: crate::runtime::telemetry::TelemetryLevel::Off,
240            cache_diagnostics: false,
241            cache_ttl: crate::core::config::CacheTtl::default(),
242            ttl_downgrade_notified: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
243            saw_1h_honored: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
244            last_msg_id: Arc::new(Mutex::new(None)),
245            session_manager,
246            hook_bus: Arc::new(crate::extensions::hooks::HookBus::new()),
247            reaper_handle: Some(reaper_handle),
248            reaper_cancel: Some(cancel),
249        })
250    }
251
252    pub fn set_system_prompt(&mut self, prompt: String) {
253        self.system_prompt = Some(prompt);
254    }
255
256    pub fn system_prompt(&self) -> Option<&str> {
257        self.system_prompt.as_deref()
258    }
259
260    pub fn set_model(&mut self, model: String) {
261        // Strip any health/status prefix (e.g. "✅  339ms  groq/..." → "groq/...")
262        let cleaned = if let Some(pos) = model.find("claude-") {
263            model[pos..].to_string()
264        } else if let Some(pos) = model.find('/') {
265            let before = &model[..pos];
266            let key_start = before.rfind(|c: char| !c.is_ascii_alphanumeric() && c != '-' && c != '_')
267                .map(|i| i + before[i..].chars().next().map(|c| c.len_utf8()).unwrap_or(1))
268                .unwrap_or(0);
269            model[key_start..].to_string()
270        } else {
271            model
272        };
273        self.model = cleaned;
274    }
275
276    pub fn set_tools(&mut self, tools: ToolRegistry) {
277        self.tools = Arc::new(RwLock::new(tools));
278    }
279
280    pub fn subagent_registry(&self) -> &Arc<Mutex<crate::runtime::subagent::SubagentRegistry>> {
281        &self.subagent_registry
282    }
283
284    pub fn event_queue(&self) -> &Arc<crate::events::EventQueue> {
285        &self.event_queue
286    }
287
288    /// Get a shared reference to the extension hook bus.
289    pub fn hook_bus(&self) -> &Arc<crate::extensions::hooks::HookBus> {
290        &self.hook_bus
291    }
292
293    /// Get a shared reference to the tool registry (for MCP lazy loading).
294    pub fn tools_shared(&self) -> Arc<RwLock<ToolRegistry>> {
295        Arc::clone(&self.tools)
296    }
297
298    pub fn model(&self) -> &str {
299        &self.model
300    }
301
302    pub fn http_client(&self) -> &Client {
303        &self.client
304    }
305    pub fn set_thinking_budget(&mut self, budget: u32) {
306        self.thinking_budget = budget;
307    }
308
309    pub fn set_compaction_model(&mut self, model: Option<String>) {
310        self.compaction_model = model;
311    }
312
313    pub fn set_context_window(&mut self, window: Option<u64>) {
314        self.context_window_override = window;
315    }
316
317    /// Effective context window for the current model — user override if set,
318    /// otherwise the model's native window from `models::context_window_for_model`.
319    pub fn compaction_model(&self) -> &str {
320        self.compaction_model.as_deref().unwrap_or("claude-sonnet-4-6")
321    }
322
323    pub fn context_window(&self) -> u64 {
324        self.context_window_override
325            .unwrap_or_else(|| crate::models::context_window_for_model(&self.model))
326    }
327
328    /// Apply a parsed config file to this runtime (model, thinking budget, etc.)
329    pub fn apply_config(&mut self, config: &crate::config::SynapsConfig) {
330        if let Some(ref model) = config.model {
331            self.set_model(model.clone());
332        }
333        if let Some(budget) = config.thinking_budget {
334            self.set_thinking_budget(budget);
335        }
336        self.context_window_override = config.context_window;
337        self.compaction_model = config.compaction_model.clone();
338        self.max_tool_output = config.max_tool_output;
339        self.bash_timeout = config.bash_timeout;
340        self.bash_max_timeout = config.bash_max_timeout;
341        self.subagent_timeout = config.subagent_timeout;
342        self.api_retries = config.api_retries;
343        self.telemetry_level = crate::runtime::telemetry::TelemetryLevel::from_str_key(&config.telemetry);
344        self.cache_diagnostics = config.cache_diagnostics;
345        self.cache_ttl = config.cache_ttl;
346    }
347
348    pub fn thinking_budget(&self) -> u32 {
349        self.thinking_budget
350    }
351
352    pub fn max_tool_output(&self) -> usize {
353        self.max_tool_output
354    }
355
356    pub fn bash_timeout(&self) -> u64 {
357        self.bash_timeout
358    }
359
360    pub fn bash_max_timeout(&self) -> u64 {
361        self.bash_max_timeout
362    }
363
364    pub fn subagent_timeout(&self) -> u64 {
365        self.subagent_timeout
366    }
367
368    pub fn api_retries(&self) -> u32 {
369        self.api_retries
370    }
371
372    pub fn set_max_tool_output(&mut self, v: usize) {
373        self.max_tool_output = v;
374    }
375
376    pub fn set_bash_timeout(&mut self, v: u64) {
377        self.bash_timeout = v;
378    }
379
380    pub fn set_bash_max_timeout(&mut self, v: u64) {
381        self.bash_max_timeout = v;
382    }
383
384    pub fn set_subagent_timeout(&mut self, v: u64) {
385        self.subagent_timeout = v;
386    }
387
388    pub fn set_api_retries(&mut self, v: u32) {
389        self.api_retries = v;
390    }
391
392    pub fn telemetry_level(&self) -> crate::runtime::telemetry::TelemetryLevel {
393        self.telemetry_level
394    }
395
396    pub fn set_telemetry_level(&mut self, level: crate::runtime::telemetry::TelemetryLevel) {
397        self.telemetry_level = level;
398    }
399
400    pub fn cache_diagnostics(&self) -> bool {
401        self.cache_diagnostics
402    }
403
404    pub fn set_cache_diagnostics(&mut self, v: bool) {
405        self.cache_diagnostics = v;
406    }
407
408    pub fn cache_ttl(&self) -> crate::core::config::CacheTtl {
409        self.cache_ttl
410    }
411
412    /// Change the cache TTL strategy mid-session. The next request re-marks
413    /// with the new TTL; the old prefix expires naturally (single-last
414    /// strategy never prunes old markers, so no invalidation logic needed).
415    pub fn set_cache_ttl(&mut self, ttl: crate::core::config::CacheTtl) {
416        self.cache_ttl = ttl;
417    }
418
419    pub fn thinking_level(&self) -> &str {
420        crate::core::models::thinking_level_for_budget(self.thinking_budget)
421    }
422
423    /// Check if the OAuth token is expired and refresh it if needed.
424    pub async fn refresh_if_needed(&self) -> Result<()> {
425        AuthMethods::refresh_if_needed(Arc::clone(&self.auth), &self.client).await
426    }
427
428    /// Make a simple non-streaming API call for compaction (no tools).
429    ///
430    /// Uses a dedicated summarization system prompt (not the user's), omits
431    /// all tools, and returns the raw text response. Caller supplies the
432    /// full message array including the serialized conversation.
433    pub async fn compact_call(&self, messages: Vec<Value>) -> Result<String> {
434        self.refresh_if_needed().await?;
435
436        use crate::runtime::compaction::COMPACTION_SYSTEM_PROMPT;
437
438        ApiMethods::call_api_simple(
439            &self.auth,
440            &self.client,
441            self.compaction_model(),
442            COMPACTION_SYSTEM_PROMPT,
443            self.thinking_budget,
444            &messages,
445            self.api_retries,
446        ).await
447    }
448
449    /// Run a single prompt synchronously (non-streaming). Handles tool execution
450    /// internally, looping until the model produces a final text response.
451    pub async fn run_single(&self, prompt: &str) -> Result<String> {
452        // Refresh OAuth token if expired
453        self.refresh_if_needed().await?;
454
455        let mut messages = vec![json!({"role": "user", "content": prompt})];
456        
457        loop {
458            let response = ApiMethods::call_api(
459                &self.auth,
460                &self.client,
461                &self.model,
462                &*self.tools.read().await,
463                &self.system_prompt,
464                self.thinking_budget,
465                &messages,
466                self.api_retries,
467                &api::ApiOptions {
468                    use_1m_context: self.context_window_override == Some(1_000_000),
469                    cache_ttl: self.cache_ttl,
470                    ttl_downgrade_notified: self.ttl_downgrade_notified.clone(),
471                    saw_1h_honored: self.saw_1h_honored.clone(),
472                },
473            ).await?;
474            
475            // Check if Claude wants to use tools
476            if let Some(content) = response["content"].as_array() {
477                let mut response_text = String::new();
478                let mut tool_uses = Vec::new();
479                
480                // Process response content
481                for item in content {
482                    match item["type"].as_str() {
483                        Some("text") => {
484                            if let Some(text) = item["text"].as_str() {
485                                response_text.push_str(text);
486                            }
487                        }
488                        Some("tool_use") => {
489                            tool_uses.push(item.clone());
490                        }
491                        _ => {}
492                    }
493                }
494                
495                // If no tool uses, return the text response
496                if tool_uses.is_empty() {
497                    return Ok(response_text);
498                }
499                
500                // Add assistant's response to conversation (only content, role)
501                messages.push(json!({
502                    "role": "assistant",
503                    "content": content
504                }));
505                
506                // Execute tools — parallel when multiple are requested
507                let mut tool_results = Vec::new();
508                
509                if tool_uses.len() == 1 {
510                    // Single tool — run inline, no spawn overhead
511                    let tool_use = &tool_uses[0];
512                    if let (Some(tool_name), Some(tool_id)) = (
513                        tool_use["name"].as_str(),
514                        tool_use["id"].as_str()
515                    ) {
516                        let input = &tool_use["input"];
517                        let result = match self.tools.read().await.get(tool_name).cloned() {
518                            Some(tool) => {
519                                let input = self.tools.read().await.translate_input_for_api_tool(tool_name, input.clone());
520                                let runtime_name = self.tools.read().await.runtime_name_for_api(tool_name).to_string();
521                                let ctx = crate::ToolContext {
522                                    channels: crate::tools::ToolChannels {
523                                        tx_delta: None,
524                                        tx_events: None,
525                                    },
526                                    capabilities: crate::tools::ToolCapabilities {
527                                        watcher_exit_path: self.watcher_exit_path.clone(),
528                                        tool_register_tx: None,
529                                        session_manager: Some(self.session_manager.clone()),
530                                        subagent_registry: Some(self.subagent_registry.clone()),
531                                        event_queue: Some(self.event_queue.clone()),
532                                        secret_prompt: None,
533                                    },
534                                    limits: crate::tools::ToolLimits {
535                                        max_tool_output: self.max_tool_output,
536                                        bash_timeout: self.bash_timeout,
537                                        bash_max_timeout: self.bash_max_timeout,
538                                        subagent_timeout: self.subagent_timeout,
539                                    },
540                                };
541                                let decision = resolve_before_tool_call_decision(
542                                    input.clone(),
543                                    emit_before_tool_call(
544                                        &self.hook_bus,
545                                        tool_name,
546                                        Some(&runtime_name),
547                                        input.clone(),
548                                    ).await,
549                                    None,
550                                    false,
551                                ).await;
552                                if let BeforeToolCallDecision::Block { reason } = decision {
553                                    format!("Tool call blocked by extension: {}", reason)
554                                } else {
555                                    let BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
556                                    let input_for_hook = input.clone();
557                                    let output = match tool.execute(input, ctx).await {
558                                        Ok(output) => output,
559                                        Err(e) => format!("Tool execution failed: {}", e),
560                                    };
561                                    let _ = emit_after_tool_call(
562                                        &self.hook_bus,
563                                        tool_name,
564                                        Some(&runtime_name),
565                                        input_for_hook,
566                                        output.clone(),
567                                    ).await;
568                                    output
569                                }
570                            }
571                            None => format!("Unknown tool: {}", tool_name),
572                        };
573                        tool_results.push(json!({
574                            "type": "tool_result",
575                            "tool_use_id": tool_id,
576                            "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
577                        }));
578                    }
579                } else {
580                    // Multiple tools — run in parallel with JoinSet
581                    let mut join_set = tokio::task::JoinSet::new();
582                    
583                    // Capture config values before spawning (can't borrow &self in 'static spawn)
584                    let cfg_max_tool_output = self.max_tool_output;
585                    let cfg_bash_timeout = self.bash_timeout;
586                    let cfg_bash_max_timeout = self.bash_max_timeout;
587                    let cfg_subagent_timeout = self.subagent_timeout;
588                    let session_mgr = self.session_manager.clone();
589                    let cfg_subagent_registry = self.subagent_registry.clone();
590                    let cfg_event_queue = self.event_queue.clone();
591                    let cfg_hook_bus = self.hook_bus.clone();
592                    
593                    for tool_use in &tool_uses {
594                        if let (Some(tool_name), Some(tool_id)) = (
595                            tool_use["name"].as_str().map(|s| s.to_string()),
596                            tool_use["id"].as_str().map(|s| s.to_string()),
597                        ) {
598                            let input = tool_use["input"].clone();
599                            let tools_snapshot = self.tools.read().await;
600                            let input = tools_snapshot.translate_input_for_api_tool(&tool_name, input);
601                            let runtime_name = tools_snapshot.runtime_name_for_api(&tool_name).to_string();
602                            let tool = tools_snapshot.get(&tool_name).cloned();
603                            drop(tools_snapshot);
604                            let exit_path = self.watcher_exit_path.clone();
605                            let session_mgr_inner = session_mgr.clone();
606                            let registry_inner = cfg_subagent_registry.clone();
607                            let event_queue_inner = cfg_event_queue.clone();
608                            let hook_bus_inner = cfg_hook_bus.clone();
609                            let tool_name_for_hook = tool_name.clone();
610                            let runtime_name_for_hook = runtime_name.clone();
611                            
612                            join_set.spawn(async move {
613                                let result = match tool {
614                                    Some(t) => {
615                                        let decision = crate::runtime::resolve_before_tool_call_decision(
616                                            input.clone(),
617                                            crate::runtime::emit_before_tool_call(
618                                                &hook_bus_inner,
619                                                &tool_name_for_hook,
620                                                Some(&runtime_name_for_hook),
621                                                input.clone(),
622                                            ).await,
623                                            None,
624                                            false,
625                                        ).await;
626                                        if let crate::runtime::BeforeToolCallDecision::Block { reason } = decision {
627                                            format!("Tool call blocked by extension: {}", reason)
628                                        } else {
629                                        let crate::runtime::BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
630                                        let ctx = crate::ToolContext {
631                                            channels: crate::tools::ToolChannels {
632                                                tx_delta: None,
633                                                tx_events: None,
634                                            },
635                                            capabilities: crate::tools::ToolCapabilities {
636                                                watcher_exit_path: exit_path,
637                                                tool_register_tx: None,
638                                                session_manager: Some(session_mgr_inner),
639                                                subagent_registry: Some(registry_inner),
640                                                event_queue: Some(event_queue_inner),
641                                                secret_prompt: None,
642                                            },
643                                            limits: crate::tools::ToolLimits {
644                                                max_tool_output: cfg_max_tool_output,
645                                                bash_timeout: cfg_bash_timeout,
646                                                bash_max_timeout: cfg_bash_max_timeout,
647                                                subagent_timeout: cfg_subagent_timeout,
648                                            },
649                                        };
650                                        let input_for_hook = input.clone();
651                                        let output = match t.execute(input, ctx).await {
652                                            Ok(output) => output,
653                                            Err(e) => format!("Tool execution failed: {}", e),
654                                        };
655                                        let _ = crate::runtime::emit_after_tool_call(
656                                            &hook_bus_inner,
657                                            &tool_name_for_hook,
658                                            Some(&runtime_name_for_hook),
659                                            input_for_hook,
660                                            output.clone(),
661                                        ).await;
662                                        output
663                                        }
664                                    }
665                                    None => format!("Unknown tool: {}", tool_name),
666                                };
667                                (tool_id, result)
668                            });
669                        }
670                    }
671                    
672                    // Collect results, preserving order by tool_id
673                    let mut results_map = std::collections::HashMap::new();
674                    while let Some(res) = join_set.join_next().await {
675                        match res {
676                            Ok((tool_id, result)) => {
677                                results_map.insert(tool_id, result);
678                            }
679                            Err(e) => {
680                                // Task panicked — log it but don't crash
681                                tracing::error!("Parallel tool task panicked: {}", e);
682                            }
683                        }
684                    }
685                    
686                    // Build tool_results in original order — every tool_use MUST have a result
687                    for tool_use in &tool_uses {
688                        if let Some(tool_id) = tool_use["id"].as_str() {
689                            let result = results_map.remove(tool_id)
690                                .unwrap_or_else(|| "Tool execution failed: task panicked".to_string());
691                            tool_results.push(json!({
692                                "type": "tool_result",
693                                "tool_use_id": tool_id,
694                                "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
695                            }));
696                        }
697                    }
698                }
699                
700                // Add tool results to conversation
701                messages.push(json!({
702                    "role": "user",
703                    "content": tool_results
704                }));
705                
706                // Continue the loop to get Claude's response with tool results
707            } else {
708                return Err(RuntimeError::Tool("Invalid response format".to_string()));
709            }
710        }
711    }
712
713    /// Run a prompt as a cancellable stream of [`StreamEvent`]s. Convenience wrapper
714    /// around [`run_stream_with_messages`] for single-turn usage.
715    pub async fn run_stream(&self, prompt: String, cancel: CancellationToken) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
716        self.run_stream_with_messages(vec![json!({"role": "user", "content": prompt})], cancel, None, None, false).await
717    }
718
719    /// Run a multi-turn conversation as a cancellable stream of [`StreamEvent`]s.
720    /// This is the main entry point for chat UIs and agents. Handles tool execution,
721    /// API retries, and dynamic tool registration (MCP) internally.
722    pub async fn run_stream_with_messages(
723        &self,
724        messages: Vec<Value>,
725        cancel: CancellationToken,
726        steering_rx: Option<mpsc::UnboundedReceiver<String>>,
727        secret_prompt: Option<crate::tools::SecretPromptHandle>,
728        auto_approve_confirms: bool,
729    ) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
730        let (tx, rx) = mpsc::unbounded_channel();
731
732        // Refresh OAuth token if expired before starting the stream.
733        if let Err(e) = self.refresh_if_needed().await {
734            let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
735            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
736            return Box::pin(UnboundedReceiverStream::new(rx));
737        }
738
739        // Clone the Arc, not the whole Runtime — the spawned task shares the
740        // same AuthState so mid-loop token refreshes are visible immediately.
741        let auth = Arc::clone(&self.auth);
742        let client = self.client.clone();
743        let model = self.model.clone();
744        let tools = self.tools.clone();
745        let system_prompt = self.system_prompt.clone();
746        let thinking_budget = self.thinking_budget;
747        let watcher_exit_path = self.watcher_exit_path.clone();
748        let max_tool_output = self.max_tool_output;
749        let bash_timeout = self.bash_timeout;
750        let bash_max_timeout = self.bash_max_timeout;
751        let subagent_timeout = self.subagent_timeout;
752        let api_retries = self.api_retries;
753        let session_manager = self.session_manager.clone();
754        // Opt into the 1M-context beta header only when the user explicitly
755        // requested 1M (via context_window setting). Default 200k matches
756        // Anthropic's claude-code default and gives smarter inference.
757        let subagent_registry = self.subagent_registry.clone();
758        let event_queue = self.event_queue.clone();
759        let options = api::ApiOptions {
760            use_1m_context: self.context_window_override == Some(1_000_000),
761            cache_ttl: self.cache_ttl,
762            ttl_downgrade_notified: self.ttl_downgrade_notified.clone(),
763            saw_1h_honored: self.saw_1h_honored.clone(),
764        };
765
766        let session = crate::runtime::stream::StreamSession {
767            auth, client, options, api_retries,
768            model, tools, system_prompt, thinking_budget,
769            tx: tx.clone(), cancel, steering_rx,
770            watcher_exit_path, max_tool_output,
771            bash_timeout, bash_max_timeout, subagent_timeout,
772            session_manager, subagent_registry, event_queue, secret_prompt,
773            hook_bus: self.hook_bus.clone(),
774            auto_approve_confirms,
775            telemetry_level: self.telemetry_level,
776        };
777
778        tokio::spawn(async move {
779            if let Err(e) = StreamMethods::run_stream_internal(session, messages).await {
780                let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
781            }
782            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
783        });
784
785        Box::pin(UnboundedReceiverStream::new(rx))
786    }
787}
788
789impl Clone for Runtime {
790    fn clone(&self) -> Self {
791        Self {
792            client: self.client.clone(),
793            auth: Arc::clone(&self.auth),
794            model: self.model.clone(),
795            tools: self.tools.clone(),
796            system_prompt: self.system_prompt.clone(),
797            thinking_budget: self.thinking_budget,
798            context_window_override: self.context_window_override,
799            compaction_model: self.compaction_model.clone(),
800            subagent_registry: self.subagent_registry.clone(),
801            event_queue: self.event_queue.clone(),
802            watcher_exit_path: self.watcher_exit_path.clone(),
803            max_tool_output: self.max_tool_output,
804            bash_timeout: self.bash_timeout,
805            bash_max_timeout: self.bash_max_timeout,
806            subagent_timeout: self.subagent_timeout,
807            api_retries: self.api_retries,
808            telemetry_level: self.telemetry_level,
809            cache_diagnostics: self.cache_diagnostics,
810            cache_ttl: self.cache_ttl,
811            // Subagents are their own session — fresh latches so a downgrade
812            // in the subagent's chain surfaces its own (single) notice and
813            // honored-state isn't inherited from the parent.
814            ttl_downgrade_notified: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
815            saw_1h_honored: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
816            // Subagents start their own request chain — inheriting the parent's
817            // last msg_id would produce bogus `messages_changed` diagnostics.
818            last_msg_id: Arc::new(Mutex::new(None)),
819            session_manager: self.session_manager.clone(),
820            hook_bus: self.hook_bus.clone(),
821            reaper_handle: None,  // Cloned runtimes don't own the reaper
822            reaper_cancel: None,  // Cloned runtimes don't own the reaper
823        }
824    }
825}
826
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extensions::hooks::events::HookResult;

    #[tokio::test]
    async fn confirm_without_prompt_fails_closed() {
        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            None,
            false,
        )
        .await;

        assert!(matches!(
            result,
            HookResult::Block { reason }
                if reason.contains("requires confirmation") && reason.contains("Run deploy?")
        ));
    }

    #[tokio::test]
    async fn confirm_auto_approved_continues_without_prompt() {
        // With `auto_approve_confirms = true`, `Confirm` must short-circuit to
        // `Continue` even when no prompt handle is available (headless mode).
        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            None,
            true,
        )
        .await;

        assert!(matches!(result, HookResult::Continue));
    }

    #[tokio::test]
    async fn modify_result_replaces_tool_input() {
        let result = resolve_before_tool_call_decision(
            serde_json::json!({"command":"rm -rf /"}),
            HookResult::Modify {
                input: serde_json::json!({"command":"echo safe"}),
            },
            None,
            false,
        ).await;

        match result {
            BeforeToolCallDecision::Continue { input } => {
                assert_eq!(input, serde_json::json!({"command":"echo safe"}));
            }
            BeforeToolCallDecision::Block { reason } => panic!("unexpected block: {reason}"),
        }
    }

    #[tokio::test]
    async fn confirm_prompt_yes_continues() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(tx);

        let task = tokio::spawn(async move {
            let request = rx.recv().await.expect("confirm prompt request");
            assert_eq!(request.title, "Confirm tool call");
            assert!(request.prompt.contains("Run deploy?"));
            let _ = request.response_tx.send(Some("yes".to_string()));
        });

        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            Some(&handle),
            false,
        )
        .await;

        // Join the responder first so its assertion failures surface directly.
        task.await.unwrap();
        assert!(matches!(result, HookResult::Continue));
    }

    #[tokio::test]
    async fn confirm_prompt_non_yes_blocks() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(tx);

        let task = tokio::spawn(async move {
            let request = rx.recv().await.expect("confirm prompt request");
            let _ = request.response_tx.send(Some("no".to_string()));
        });

        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            Some(&handle),
            false,
        )
        .await;

        task.await.unwrap();
        assert!(matches!(
            result,
            HookResult::Block { reason }
                if reason.contains("confirmation denied")
        ));
    }

    #[test]
    fn test_max_tokens_for_model() {
        // Opus models should return 128000
        assert_eq!(HelperMethods::max_tokens_for_model("claude-opus-4-6"), 128000);
        assert_eq!(HelperMethods::max_tokens_for_model("opus-something"), 128000);

        // Non-opus models should return 64000
        assert_eq!(HelperMethods::max_tokens_for_model("claude-sonnet-4-20250514"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("haiku"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("claude-3-haiku"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("some-other-model"), 64000);

        // Edge cases
        assert_eq!(HelperMethods::max_tokens_for_model(""), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("OPUS"), 64000); // Case sensitive - uppercase doesn't match
        assert_eq!(HelperMethods::max_tokens_for_model("model-opus-end"), 128000); // Contains "opus" anywhere
    }

    #[test]
    fn test_truncate_tool_result() {
        let default_max = 30000;

        // Short string should remain unchanged
        let short = "This is a short string.";
        assert_eq!(HelperMethods::truncate_tool_result(short, default_max), short);

        // Exactly max should remain unchanged
        let exact = "x".repeat(30000);
        assert_eq!(HelperMethods::truncate_tool_result(&exact, default_max), exact);

        // String longer than max should be truncated with notice
        let too_long = "x".repeat(30001);
        let truncated = HelperMethods::truncate_tool_result(&too_long, default_max);

        // Should start with the truncated content
        assert!(truncated.starts_with(&"x".repeat(30000)));

        // Should contain truncation notice with total char count
        assert!(truncated.contains("[truncated — 30001 total chars, showing first 30000]"));

        // Should be longer than max (due to notice)
        assert!(truncated.len() > 30000);

        // Test with a much longer string
        let very_long = "a".repeat(50000);
        let truncated_very_long = HelperMethods::truncate_tool_result(&very_long, default_max);
        assert!(truncated_very_long.contains("[truncated — 50000 total chars, showing first 30000]"));
        assert!(truncated_very_long.starts_with(&"a".repeat(30000)));

        // Test with custom limit
        let custom_truncated = HelperMethods::truncate_tool_result(&very_long, 100);
        assert!(custom_truncated.starts_with(&"a".repeat(100)));
        assert!(custom_truncated.contains("[truncated — 50000 total chars, showing first 100]"));
    }

    #[test]
    fn test_thinking_level_ranges() {
        use crate::core::models::thinking_level_for_budget;

        // Sentinel 0 = "adaptive" (S172 — model decides)
        assert_eq!(thinking_level_for_budget(0), "adaptive");

        // Low range: 1..=2048
        assert_eq!(thinking_level_for_budget(1), "low");
        assert_eq!(thinking_level_for_budget(1024), "low");
        assert_eq!(thinking_level_for_budget(2048), "low");

        // Medium range: 2049..=4096
        assert_eq!(thinking_level_for_budget(2049), "medium");
        assert_eq!(thinking_level_for_budget(3000), "medium");
        assert_eq!(thinking_level_for_budget(4096), "medium");

        // High range: 4097..=16384
        assert_eq!(thinking_level_for_budget(4097), "high");
        assert_eq!(thinking_level_for_budget(8192), "high");
        assert_eq!(thinking_level_for_budget(16384), "high");

        // XHigh range: 16385 and above
        assert_eq!(thinking_level_for_budget(16385), "xhigh");
        assert_eq!(thinking_level_for_budget(32768), "xhigh");
        assert_eq!(thinking_level_for_budget(100000), "xhigh");
    }
}