Skip to main content

synaps_cli/runtime/
mod.rs

1use reqwest::Client;
2use serde_json::{json, Value};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use crate::{Result, RuntimeError, ToolRegistry};
7use std::sync::Mutex;
8use tokio::sync::{mpsc, RwLock};
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use futures::stream::Stream;
12use std::pin::Pin;
13
14mod types;
15mod auth;
16mod api;
17mod api_sync;
18mod request;
19mod stream;
20mod helpers;
21mod sse;
22mod sse_types;
23pub mod subagent;
24pub mod openai;
25pub mod telemetry;
26
27pub use types::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
28use types::AuthState;
29use auth::AuthMethods;
30use api::ApiMethods;
31use stream::StreamMethods;
32use helpers::HelperMethods;
33
/// Result of resolving before_tool_call extension policy.
///
/// Produced by `resolve_before_tool_call_decision`: the caller either runs the
/// tool with the carried input or reports the block reason instead.
pub enum BeforeToolCallDecision {
    /// Run the tool with `input` — either the original input or the
    /// replacement carried by a hook's `Modify` result.
    Continue { input: Value },
    /// Do not run the tool; `reason` says why the call was refused.
    Block { reason: String },
}
39
40/// Emit a `before_tool_call` event and include the runtime tool name when it
41/// differs from the API-safe name.
42pub async fn emit_before_tool_call(
43    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
44    tool_name: &str,
45    runtime_tool_name: Option<&str>,
46    input: Value,
47) -> crate::extensions::hooks::events::HookResult {
48    let mut event = crate::extensions::hooks::events::HookEvent::before_tool_call(tool_name, input);
49    if let Some(runtime_tool_name) = runtime_tool_name {
50        event.tool_runtime_name = Some(runtime_tool_name.to_string());
51    }
52    hook_bus.emit(&event).await
53}
54
55
56/// Resolve a before_tool_call result that may request user confirmation.
57///
58/// When `auto_approve_confirms` is true, `Confirm` is short-circuited to `Continue`.
59/// Headless/non-interactive callers with `auto_approve_confirms = false` fail closed.
60pub async fn resolve_before_tool_call_result(
61    hook_result: crate::extensions::hooks::events::HookResult,
62    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
63    auto_approve_confirms: bool,
64) -> crate::extensions::hooks::events::HookResult {
65    match hook_result {
66        crate::extensions::hooks::events::HookResult::Confirm { message } => {
67            if auto_approve_confirms {
68                tracing::info!(message = %message, "confirm auto-approved (auto_approve_confirms=true)");
69                return crate::extensions::hooks::events::HookResult::Continue;
70            }
71
72            let Some(prompt) = secret_prompt else {
73                return crate::extensions::hooks::events::HookResult::Block {
74                    reason: format!(
75                        "Tool call requires confirmation but no interactive prompt is available: {}",
76                        message
77                    ),
78                };
79            };
80
81            let response = prompt
82                .prompt(
83                    "Confirm tool call".to_string(),
84                    format!("{}\n\nType 'yes' or 'y' to allow.", message),
85                )
86                .await;
87
88            match response.as_deref().map(str::trim) {
89                Some(answer) if answer.eq_ignore_ascii_case("yes") || answer.eq_ignore_ascii_case("y") => {
90                    crate::extensions::hooks::events::HookResult::Continue
91                }
92                _ => crate::extensions::hooks::events::HookResult::Block {
93                    reason: format!("Tool call confirmation denied: {}", message),
94                },
95            }
96        }
97        other => other,
98    }
99}
100
101/// Resolve before_tool_call policy into executable input or a block reason.
102pub async fn resolve_before_tool_call_decision(
103    original_input: Value,
104    hook_result: crate::extensions::hooks::events::HookResult,
105    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
106    auto_approve_confirms: bool,
107) -> BeforeToolCallDecision {
108    match resolve_before_tool_call_result(hook_result, secret_prompt, auto_approve_confirms).await {
109        crate::extensions::hooks::events::HookResult::Block { reason } => {
110            BeforeToolCallDecision::Block { reason }
111        }
112        crate::extensions::hooks::events::HookResult::Modify { input } => {
113            BeforeToolCallDecision::Continue { input }
114        }
115        _ => BeforeToolCallDecision::Continue { input: original_input },
116    }
117}
118
119/// Emit an `after_tool_call` event and include the runtime tool name when it
120/// differs from the API-safe name.
121pub async fn emit_after_tool_call(
122    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
123    tool_name: &str,
124    runtime_tool_name: Option<&str>,
125    input: Value,
126    output: String,
127) -> crate::extensions::hooks::events::HookResult {
128    let mut event = crate::extensions::hooks::events::HookEvent::after_tool_call(
129        tool_name,
130        input,
131        output,
132    );
133    if let Some(runtime_tool_name) = runtime_tool_name {
134        event.tool_runtime_name = Some(runtime_tool_name.to_string());
135    }
136    hook_bus.emit(&event).await
137}
138
/// The core runtime — manages API communication, tool execution, authentication,
/// and streaming for all SynapsCLI binaries (chat, chatui, server, agent, watcher).
pub struct Runtime {
    /// HTTP client used for API calls and token refresh.
    client: Client,
    /// Credential state (token, type, refresh token, expiry) behind an async
    /// lock so it can be shared with spawned tasks.
    auth: Arc<RwLock<AuthState>>,
    /// Identifier of the model used for requests.
    model: String,
    /// Registry of tools offered to the model; shared via `tools_shared`.
    tools: Arc<RwLock<ToolRegistry>>,
    /// Optional system prompt passed along with API calls.
    system_prompt: Option<String>,
    /// Thinking budget passed to the API (presumably tokens — TODO confirm).
    thinking_budget: u32,
    /// User override for context window size (tokens). When set, takes
    /// precedence over the model's auto-detected window from
    /// `models::context_window_for_model`. Lets users cap context at e.g.
    /// 200k even on models that natively support 1M.
    context_window_override: Option<u64>,
    /// Model used for compaction. Falls back to claude-sonnet-4-6 if not set.
    compaction_model: Option<String>,
    /// Shared registry for reactive subagent handles.
    subagent_registry: Arc<Mutex<crate::runtime::subagent::SubagentRegistry>>,
    /// Shared event queue — for Event Bus tooling.
    event_queue: Arc<crate::events::EventQueue>,
    /// Path for watcher_exit tool to write handoff state (agent mode only)
    pub watcher_exit_path: Option<PathBuf>,
    // Configurable limits, handed to tools through `ToolLimits` (except
    // `api_retries`, which is passed to the API call helpers).
    /// Cap applied when truncating tool output (unit not visible here —
    /// presumably bytes or characters; TODO confirm).
    max_tool_output: usize,
    /// Default bash tool timeout (presumably seconds — TODO confirm).
    bash_timeout: u64,
    /// Upper bound for the bash tool timeout (presumably seconds — TODO confirm).
    bash_max_timeout: u64,
    /// Timeout for subagent runs (presumably seconds — TODO confirm).
    subagent_timeout: u64,
    /// Retry count passed to the API call helpers.
    api_retries: u32,
    /// Telemetry level for structured per-request API logging (opt-in).
    telemetry_level: crate::runtime::telemetry::TelemetryLevel,
    /// Opt into the cache-diagnosis beta (`cache-diagnosis-2026-04-07`).
    cache_diagnostics: bool,
    /// Last Anthropic message id (`msg_...`) — threaded into the next
    /// request's `diagnostics.previous_message_id` when diagnostics is on.
    /// Reserved for the cache-diagnosis beta wiring (handoff item).
    #[allow(dead_code)]
    last_msg_id: Arc<Mutex<Option<String>>>,
    /// Shell session manager handed to tools through `ToolCapabilities`.
    session_manager: std::sync::Arc<crate::tools::shell::SessionManager>,
    /// Extension hook bus for dispatching events to extensions.
    hook_bus: Arc<crate::extensions::hooks::HookBus>,
    // Held to keep the reaper task alive for the Runtime's lifetime; never read directly.
    #[allow(dead_code)]
    reaper_handle: Option<tokio::task::JoinHandle<()>>,
    // Cancellation token given to the reaper when it was started; kept
    // alongside the handle and never read directly.
    #[allow(dead_code)]
    reaper_cancel: Option<tokio_util::sync::CancellationToken>,
}
185
186impl Runtime {
187    pub async fn new() -> Result<Self> {
188        let (auth_token, auth_type, refresh_token, token_expires) = AuthMethods::get_auth_token()?;
189
190        let client = Client::builder()
191            .connect_timeout(Duration::from_secs(10))
192            .timeout(Duration::from_secs(300))
193            .build()
194            .map_err(|e| RuntimeError::Config(format!("Failed to build HTTP client: {}", e)))?;
195
196        let session_manager = {
197            let config = crate::tools::shell::ShellConfig::default();
198            crate::tools::shell::SessionManager::new(config)
199        };
200
201        // Start the idle session reaper
202        let mgr = session_manager.clone();
203        let cancel = tokio_util::sync::CancellationToken::new();
204        let reaper_handle = crate::tools::shell::session::start_reaper(mgr, cancel.clone());
205
206        Ok(Runtime {
207            client,
208            auth: Arc::new(RwLock::new(AuthState {
209                auth_token,
210                auth_type,
211                refresh_token,
212                token_expires,
213            })),
214            model: crate::models::default_model().to_string(),
215            tools: Arc::new(RwLock::new(ToolRegistry::new())),
216            system_prompt: None,
217            thinking_budget: 4096,
218            context_window_override: None,
219            compaction_model: None,
220            subagent_registry: Arc::new(Mutex::new(crate::runtime::subagent::SubagentRegistry::new())),
221            event_queue: Arc::new(crate::events::EventQueue::new(1000)),
222            watcher_exit_path: None,
223            max_tool_output: 30000,
224            bash_timeout: 30,
225            bash_max_timeout: 300,
226            subagent_timeout: 300,
227            api_retries: 3,
228            telemetry_level: crate::runtime::telemetry::TelemetryLevel::Off,
229            cache_diagnostics: false,
230            last_msg_id: Arc::new(Mutex::new(None)),
231            session_manager,
232            hook_bus: Arc::new(crate::extensions::hooks::HookBus::new()),
233            reaper_handle: Some(reaper_handle),
234            reaper_cancel: Some(cancel),
235        })
236    }
237
    /// Set the system prompt, replacing any previous one.
    pub fn set_system_prompt(&mut self, prompt: String) {
        self.system_prompt = Some(prompt);
    }
241
    /// The current system prompt, or `None` if none has been set.
    pub fn system_prompt(&self) -> Option<&str> {
        self.system_prompt.as_deref()
    }
245
246    pub fn set_model(&mut self, model: String) {
247        // Strip any health/status prefix (e.g. "✅  339ms  groq/..." → "groq/...")
248        let cleaned = if let Some(pos) = model.find("claude-") {
249            model[pos..].to_string()
250        } else if let Some(pos) = model.find('/') {
251            let before = &model[..pos];
252            let key_start = before.rfind(|c: char| !c.is_ascii_alphanumeric() && c != '-' && c != '_')
253                .map(|i| i + before[i..].chars().next().map(|c| c.len_utf8()).unwrap_or(1))
254                .unwrap_or(0);
255            model[key_start..].to_string()
256        } else {
257            model
258        };
259        self.model = cleaned;
260    }
261
    /// Replace the tool registry with `tools`.
    ///
    /// A fresh `Arc<RwLock<_>>` is installed, so handles obtained earlier from
    /// `tools_shared` keep pointing at the previous registry.
    pub fn set_tools(&mut self, tools: ToolRegistry) {
        self.tools = Arc::new(RwLock::new(tools));
    }
265
    /// Borrow the shared subagent registry handle.
    pub fn subagent_registry(&self) -> &Arc<Mutex<crate::runtime::subagent::SubagentRegistry>> {
        &self.subagent_registry
    }
269
    /// Borrow the shared event queue handle.
    pub fn event_queue(&self) -> &Arc<crate::events::EventQueue> {
        &self.event_queue
    }
273
    /// Get a shared reference to the extension hook bus.
    ///
    /// Returns a borrow of the `Arc`; clone it to keep the bus beyond the
    /// runtime borrow.
    pub fn hook_bus(&self) -> &Arc<crate::extensions::hooks::HookBus> {
        &self.hook_bus
    }
278
279    /// Get a shared reference to the tool registry (for MCP lazy loading).
280    pub fn tools_shared(&self) -> Arc<RwLock<ToolRegistry>> {
281        Arc::clone(&self.tools)
282    }
283
    /// Identifier of the currently selected model.
    pub fn model(&self) -> &str {
        &self.model
    }
287
    /// Borrow the HTTP client this runtime uses for API calls.
    pub fn http_client(&self) -> &Client {
        &self.client
    }
    /// Set the thinking budget passed to the API.
    pub fn set_thinking_budget(&mut self, budget: u32) {
        self.thinking_budget = budget;
    }
294
    /// Set the model used for compaction; `None` restores the built-in
    /// fallback returned by `compaction_model`.
    pub fn set_compaction_model(&mut self, model: Option<String>) {
        self.compaction_model = model;
    }
298
    /// Set or clear the user override for the context window size. With
    /// `None`, `context_window` falls back to the model's own window.
    pub fn set_context_window(&mut self, window: Option<u64>) {
        self.context_window_override = window;
    }
302
    /// Model used for compaction calls — the configured compaction model if
    /// set, otherwise the `claude-sonnet-4-6` fallback.
    pub fn compaction_model(&self) -> &str {
        self.compaction_model.as_deref().unwrap_or("claude-sonnet-4-6")
    }

    /// Effective context window for the current model — user override if set,
    /// otherwise the model's native window from `models::context_window_for_model`.
    pub fn context_window(&self) -> u64 {
        self.context_window_override
            .unwrap_or_else(|| crate::models::context_window_for_model(&self.model))
    }
313
314    /// Apply a parsed config file to this runtime (model, thinking budget, etc.)
315    pub fn apply_config(&mut self, config: &crate::config::SynapsConfig) {
316        if let Some(ref model) = config.model {
317            self.set_model(model.clone());
318        }
319        if let Some(budget) = config.thinking_budget {
320            self.set_thinking_budget(budget);
321        }
322        self.context_window_override = config.context_window;
323        self.compaction_model = config.compaction_model.clone();
324        self.max_tool_output = config.max_tool_output;
325        self.bash_timeout = config.bash_timeout;
326        self.bash_max_timeout = config.bash_max_timeout;
327        self.subagent_timeout = config.subagent_timeout;
328        self.api_retries = config.api_retries;
329        self.telemetry_level = crate::runtime::telemetry::TelemetryLevel::from_str_key(&config.telemetry);
330        self.cache_diagnostics = config.cache_diagnostics;
331    }
332
    /// Current thinking budget passed to the API.
    pub fn thinking_budget(&self) -> u32 {
        self.thinking_budget
    }
336
    /// Current cap used when truncating tool output.
    pub fn max_tool_output(&self) -> usize {
        self.max_tool_output
    }
340
    /// Default bash tool timeout (presumably seconds — TODO confirm).
    pub fn bash_timeout(&self) -> u64 {
        self.bash_timeout
    }
344
    /// Upper bound for the bash tool timeout (presumably seconds — TODO confirm).
    pub fn bash_max_timeout(&self) -> u64 {
        self.bash_max_timeout
    }
348
    /// Timeout applied to subagent runs (presumably seconds — TODO confirm).
    pub fn subagent_timeout(&self) -> u64 {
        self.subagent_timeout
    }
352
    /// Retry count passed to the API call helpers.
    pub fn api_retries(&self) -> u32 {
        self.api_retries
    }
356
    /// Set the cap used when truncating tool output.
    pub fn set_max_tool_output(&mut self, v: usize) {
        self.max_tool_output = v;
    }
360
    /// Set the default bash tool timeout. No check against
    /// `bash_max_timeout` is performed here.
    pub fn set_bash_timeout(&mut self, v: u64) {
        self.bash_timeout = v;
    }
364
    /// Set the upper bound for the bash tool timeout.
    pub fn set_bash_max_timeout(&mut self, v: u64) {
        self.bash_max_timeout = v;
    }
368
    /// Set the timeout applied to subagent runs.
    pub fn set_subagent_timeout(&mut self, v: u64) {
        self.subagent_timeout = v;
    }
372
    /// Set the retry count passed to the API call helpers.
    pub fn set_api_retries(&mut self, v: u32) {
        self.api_retries = v;
    }
376
    /// Current telemetry level (returned by value).
    pub fn telemetry_level(&self) -> crate::runtime::telemetry::TelemetryLevel {
        self.telemetry_level
    }
380
    /// Set the telemetry level.
    pub fn set_telemetry_level(&mut self, level: crate::runtime::telemetry::TelemetryLevel) {
        self.telemetry_level = level;
    }
384
    /// Whether the cache-diagnostics option is enabled.
    pub fn cache_diagnostics(&self) -> bool {
        self.cache_diagnostics
    }
388
    /// Enable or disable the cache-diagnostics option.
    pub fn set_cache_diagnostics(&mut self, v: bool) {
        self.cache_diagnostics = v;
    }
392
    /// Name of the thinking level for the current budget, as mapped by
    /// `core::models::thinking_level_for_budget`.
    pub fn thinking_level(&self) -> &str {
        crate::core::models::thinking_level_for_budget(self.thinking_budget)
    }
396
397    /// Check if the OAuth token is expired and refresh it if needed.
398    pub async fn refresh_if_needed(&self) -> Result<()> {
399        AuthMethods::refresh_if_needed(Arc::clone(&self.auth), &self.client).await
400    }
401
402    /// Make a simple non-streaming API call for compaction (no tools).
403    ///
404    /// Uses a dedicated summarization system prompt (not the user's), omits
405    /// all tools, and returns the raw text response. Caller supplies the
406    /// full message array including the serialized conversation.
407    pub async fn compact_call(&self, messages: Vec<Value>) -> Result<String> {
408        self.refresh_if_needed().await?;
409
410        use crate::core::compaction::COMPACTION_SYSTEM_PROMPT;
411
412        ApiMethods::call_api_simple(
413            &self.auth,
414            &self.client,
415            self.compaction_model(),
416            COMPACTION_SYSTEM_PROMPT,
417            self.thinking_budget,
418            &messages,
419            self.api_retries,
420        ).await
421    }
422
423    /// Run a single prompt synchronously (non-streaming). Handles tool execution
424    /// internally, looping until the model produces a final text response.
425    pub async fn run_single(&self, prompt: &str) -> Result<String> {
426        // Refresh OAuth token if expired
427        self.refresh_if_needed().await?;
428
429        let mut messages = vec![json!({"role": "user", "content": prompt})];
430        
431        loop {
432            let response = ApiMethods::call_api(
433                &self.auth,
434                &self.client,
435                &self.model,
436                &*self.tools.read().await,
437                &self.system_prompt,
438                self.thinking_budget,
439                &messages,
440                self.api_retries,
441                &api::ApiOptions {
442                    use_1m_context: self.context_window_override == Some(1_000_000),
443                },
444            ).await?;
445            
446            // Check if Claude wants to use tools
447            if let Some(content) = response["content"].as_array() {
448                let mut response_text = String::new();
449                let mut tool_uses = Vec::new();
450                
451                // Process response content
452                for item in content {
453                    match item["type"].as_str() {
454                        Some("text") => {
455                            if let Some(text) = item["text"].as_str() {
456                                response_text.push_str(text);
457                            }
458                        }
459                        Some("tool_use") => {
460                            tool_uses.push(item.clone());
461                        }
462                        _ => {}
463                    }
464                }
465                
466                // If no tool uses, return the text response
467                if tool_uses.is_empty() {
468                    return Ok(response_text);
469                }
470                
471                // Add assistant's response to conversation (only content, role)
472                messages.push(json!({
473                    "role": "assistant",
474                    "content": content
475                }));
476                
477                // Execute tools — parallel when multiple are requested
478                let mut tool_results = Vec::new();
479                
480                if tool_uses.len() == 1 {
481                    // Single tool — run inline, no spawn overhead
482                    let tool_use = &tool_uses[0];
483                    if let (Some(tool_name), Some(tool_id)) = (
484                        tool_use["name"].as_str(),
485                        tool_use["id"].as_str()
486                    ) {
487                        let input = &tool_use["input"];
488                        let result = match self.tools.read().await.get(tool_name).cloned() {
489                            Some(tool) => {
490                                let input = self.tools.read().await.translate_input_for_api_tool(tool_name, input.clone());
491                                let runtime_name = self.tools.read().await.runtime_name_for_api(tool_name).to_string();
492                                let ctx = crate::ToolContext {
493                                    channels: crate::tools::ToolChannels {
494                                        tx_delta: None,
495                                        tx_events: None,
496                                    },
497                                    capabilities: crate::tools::ToolCapabilities {
498                                        watcher_exit_path: self.watcher_exit_path.clone(),
499                                        tool_register_tx: None,
500                                        session_manager: Some(self.session_manager.clone()),
501                                        subagent_registry: Some(self.subagent_registry.clone()),
502                                        event_queue: Some(self.event_queue.clone()),
503                                        secret_prompt: None,
504                                    },
505                                    limits: crate::tools::ToolLimits {
506                                        max_tool_output: self.max_tool_output,
507                                        bash_timeout: self.bash_timeout,
508                                        bash_max_timeout: self.bash_max_timeout,
509                                        subagent_timeout: self.subagent_timeout,
510                                    },
511                                };
512                                let decision = resolve_before_tool_call_decision(
513                                    input.clone(),
514                                    emit_before_tool_call(
515                                        &self.hook_bus,
516                                        tool_name,
517                                        Some(&runtime_name),
518                                        input.clone(),
519                                    ).await,
520                                    None,
521                                    false,
522                                ).await;
523                                if let BeforeToolCallDecision::Block { reason } = decision {
524                                    format!("Tool call blocked by extension: {}", reason)
525                                } else {
526                                    let BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
527                                    let input_for_hook = input.clone();
528                                    let output = match tool.execute(input, ctx).await {
529                                        Ok(output) => output,
530                                        Err(e) => format!("Tool execution failed: {}", e),
531                                    };
532                                    let _ = emit_after_tool_call(
533                                        &self.hook_bus,
534                                        tool_name,
535                                        Some(&runtime_name),
536                                        input_for_hook,
537                                        output.clone(),
538                                    ).await;
539                                    output
540                                }
541                            }
542                            None => format!("Unknown tool: {}", tool_name),
543                        };
544                        tool_results.push(json!({
545                            "type": "tool_result",
546                            "tool_use_id": tool_id,
547                            "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
548                        }));
549                    }
550                } else {
551                    // Multiple tools — run in parallel with JoinSet
552                    let mut join_set = tokio::task::JoinSet::new();
553                    
554                    // Capture config values before spawning (can't borrow &self in 'static spawn)
555                    let cfg_max_tool_output = self.max_tool_output;
556                    let cfg_bash_timeout = self.bash_timeout;
557                    let cfg_bash_max_timeout = self.bash_max_timeout;
558                    let cfg_subagent_timeout = self.subagent_timeout;
559                    let session_mgr = self.session_manager.clone();
560                    let cfg_subagent_registry = self.subagent_registry.clone();
561                    let cfg_event_queue = self.event_queue.clone();
562                    let cfg_hook_bus = self.hook_bus.clone();
563                    
564                    for tool_use in &tool_uses {
565                        if let (Some(tool_name), Some(tool_id)) = (
566                            tool_use["name"].as_str().map(|s| s.to_string()),
567                            tool_use["id"].as_str().map(|s| s.to_string()),
568                        ) {
569                            let input = tool_use["input"].clone();
570                            let tools_snapshot = self.tools.read().await;
571                            let input = tools_snapshot.translate_input_for_api_tool(&tool_name, input);
572                            let runtime_name = tools_snapshot.runtime_name_for_api(&tool_name).to_string();
573                            let tool = tools_snapshot.get(&tool_name).cloned();
574                            drop(tools_snapshot);
575                            let exit_path = self.watcher_exit_path.clone();
576                            let session_mgr_inner = session_mgr.clone();
577                            let registry_inner = cfg_subagent_registry.clone();
578                            let event_queue_inner = cfg_event_queue.clone();
579                            let hook_bus_inner = cfg_hook_bus.clone();
580                            let tool_name_for_hook = tool_name.clone();
581                            let runtime_name_for_hook = runtime_name.clone();
582                            
583                            join_set.spawn(async move {
584                                let result = match tool {
585                                    Some(t) => {
586                                        let decision = crate::runtime::resolve_before_tool_call_decision(
587                                            input.clone(),
588                                            crate::runtime::emit_before_tool_call(
589                                                &hook_bus_inner,
590                                                &tool_name_for_hook,
591                                                Some(&runtime_name_for_hook),
592                                                input.clone(),
593                                            ).await,
594                                            None,
595                                            false,
596                                        ).await;
597                                        if let crate::runtime::BeforeToolCallDecision::Block { reason } = decision {
598                                            format!("Tool call blocked by extension: {}", reason)
599                                        } else {
600                                        let crate::runtime::BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
601                                        let ctx = crate::ToolContext {
602                                            channels: crate::tools::ToolChannels {
603                                                tx_delta: None,
604                                                tx_events: None,
605                                            },
606                                            capabilities: crate::tools::ToolCapabilities {
607                                                watcher_exit_path: exit_path,
608                                                tool_register_tx: None,
609                                                session_manager: Some(session_mgr_inner),
610                                                subagent_registry: Some(registry_inner),
611                                                event_queue: Some(event_queue_inner),
612                                                secret_prompt: None,
613                                            },
614                                            limits: crate::tools::ToolLimits {
615                                                max_tool_output: cfg_max_tool_output,
616                                                bash_timeout: cfg_bash_timeout,
617                                                bash_max_timeout: cfg_bash_max_timeout,
618                                                subagent_timeout: cfg_subagent_timeout,
619                                            },
620                                        };
621                                        let input_for_hook = input.clone();
622                                        let output = match t.execute(input, ctx).await {
623                                            Ok(output) => output,
624                                            Err(e) => format!("Tool execution failed: {}", e),
625                                        };
626                                        let _ = crate::runtime::emit_after_tool_call(
627                                            &hook_bus_inner,
628                                            &tool_name_for_hook,
629                                            Some(&runtime_name_for_hook),
630                                            input_for_hook,
631                                            output.clone(),
632                                        ).await;
633                                        output
634                                        }
635                                    }
636                                    None => format!("Unknown tool: {}", tool_name),
637                                };
638                                (tool_id, result)
639                            });
640                        }
641                    }
642                    
643                    // Collect results, preserving order by tool_id
644                    let mut results_map = std::collections::HashMap::new();
645                    while let Some(res) = join_set.join_next().await {
646                        match res {
647                            Ok((tool_id, result)) => {
648                                results_map.insert(tool_id, result);
649                            }
650                            Err(e) => {
651                                // Task panicked — log it but don't crash
652                                tracing::error!("Parallel tool task panicked: {}", e);
653                            }
654                        }
655                    }
656                    
657                    // Build tool_results in original order — every tool_use MUST have a result
658                    for tool_use in &tool_uses {
659                        if let Some(tool_id) = tool_use["id"].as_str() {
660                            let result = results_map.remove(tool_id)
661                                .unwrap_or_else(|| "Tool execution failed: task panicked".to_string());
662                            tool_results.push(json!({
663                                "type": "tool_result",
664                                "tool_use_id": tool_id,
665                                "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
666                            }));
667                        }
668                    }
669                }
670                
671                // Add tool results to conversation
672                messages.push(json!({
673                    "role": "user",
674                    "content": tool_results
675                }));
676                
677                // Continue the loop to get Claude's response with tool results
678            } else {
679                return Err(RuntimeError::Tool("Invalid response format".to_string()));
680            }
681        }
682    }
683
684    /// Run a prompt as a cancellable stream of [`StreamEvent`]s. Convenience wrapper
685    /// around [`run_stream_with_messages`] for single-turn usage.
686    pub async fn run_stream(&self, prompt: String, cancel: CancellationToken) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
687        self.run_stream_with_messages(vec![json!({"role": "user", "content": prompt})], cancel, None, None, false).await
688    }
689
690    /// Run a multi-turn conversation as a cancellable stream of [`StreamEvent`]s.
691    /// This is the main entry point for chat UIs and agents. Handles tool execution,
692    /// API retries, and dynamic tool registration (MCP) internally.
693    pub async fn run_stream_with_messages(
694        &self,
695        messages: Vec<Value>,
696        cancel: CancellationToken,
697        steering_rx: Option<mpsc::UnboundedReceiver<String>>,
698        secret_prompt: Option<crate::tools::SecretPromptHandle>,
699        auto_approve_confirms: bool,
700    ) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
701        let (tx, rx) = mpsc::unbounded_channel();
702
703        // Refresh OAuth token if expired before starting the stream.
704        if let Err(e) = self.refresh_if_needed().await {
705            let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
706            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
707            return Box::pin(UnboundedReceiverStream::new(rx));
708        }
709
710        // Clone the Arc, not the whole Runtime — the spawned task shares the
711        // same AuthState so mid-loop token refreshes are visible immediately.
712        let auth = Arc::clone(&self.auth);
713        let client = self.client.clone();
714        let model = self.model.clone();
715        let tools = self.tools.clone();
716        let system_prompt = self.system_prompt.clone();
717        let thinking_budget = self.thinking_budget;
718        let watcher_exit_path = self.watcher_exit_path.clone();
719        let max_tool_output = self.max_tool_output;
720        let bash_timeout = self.bash_timeout;
721        let bash_max_timeout = self.bash_max_timeout;
722        let subagent_timeout = self.subagent_timeout;
723        let api_retries = self.api_retries;
724        let session_manager = self.session_manager.clone();
725        // Opt into the 1M-context beta header only when the user explicitly
726        // requested 1M (via context_window setting). Default 200k matches
727        // Anthropic's claude-code default and gives smarter inference.
728        let subagent_registry = self.subagent_registry.clone();
729        let event_queue = self.event_queue.clone();
730        let options = api::ApiOptions {
731            use_1m_context: self.context_window_override == Some(1_000_000),
732        };
733
734        let session = crate::runtime::stream::StreamSession {
735            auth, client, options, api_retries,
736            model, tools, system_prompt, thinking_budget,
737            tx: tx.clone(), cancel, steering_rx,
738            watcher_exit_path, max_tool_output,
739            bash_timeout, bash_max_timeout, subagent_timeout,
740            session_manager, subagent_registry, event_queue, secret_prompt,
741            hook_bus: self.hook_bus.clone(),
742            auto_approve_confirms,
743            telemetry_level: self.telemetry_level,
744        };
745
746        tokio::spawn(async move {
747            if let Err(e) = StreamMethods::run_stream_internal(session, messages).await {
748                let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
749            }
750            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
751        });
752
753        Box::pin(UnboundedReceiverStream::new(rx))
754    }
755}
756
757impl Clone for Runtime {
758    fn clone(&self) -> Self {
759        Self {
760            client: self.client.clone(),
761            auth: Arc::clone(&self.auth),
762            model: self.model.clone(),
763            tools: self.tools.clone(),
764            system_prompt: self.system_prompt.clone(),
765            thinking_budget: self.thinking_budget,
766            context_window_override: self.context_window_override,
767            compaction_model: self.compaction_model.clone(),
768            subagent_registry: self.subagent_registry.clone(),
769            event_queue: self.event_queue.clone(),
770            watcher_exit_path: self.watcher_exit_path.clone(),
771            max_tool_output: self.max_tool_output,
772            bash_timeout: self.bash_timeout,
773            bash_max_timeout: self.bash_max_timeout,
774            subagent_timeout: self.subagent_timeout,
775            api_retries: self.api_retries,
776            telemetry_level: self.telemetry_level,
777            cache_diagnostics: self.cache_diagnostics,
778            // Subagents start their own request chain — inheriting the parent's
779            // last msg_id would produce bogus `messages_changed` diagnostics.
780            last_msg_id: Arc::new(Mutex::new(None)),
781            session_manager: self.session_manager.clone(),
782            hook_bus: self.hook_bus.clone(),
783            reaper_handle: None,  // Cloned runtimes don't own the reaper
784            reaper_cancel: None,  // Cloned runtimes don't own the reaper
785        }
786    }
787}
788
789#[cfg(test)]
790mod tests {
791    use super::*;
792
793    #[tokio::test]
794    async fn confirm_without_prompt_fails_closed() {
795        let result = resolve_before_tool_call_result(
796            crate::extensions::hooks::events::HookResult::Confirm {
797                message: "Run deploy?".into(),
798            },
799            None,
800            false,
801        )
802        .await;
803
804        assert!(matches!(
805            result,
806            crate::extensions::hooks::events::HookResult::Block { reason }
807                if reason.contains("requires confirmation") && reason.contains("Run deploy?")
808        ));
809    }
810
811    #[tokio::test]
812    async fn modify_result_replaces_tool_input() {
813        let result = resolve_before_tool_call_decision(
814            serde_json::json!({"command":"rm -rf /"}),
815            crate::extensions::hooks::events::HookResult::Modify {
816                input: serde_json::json!({"command":"echo safe"}),
817            },
818            None,
819            false,
820        ).await;
821
822        match result {
823            BeforeToolCallDecision::Continue { input } => {
824                assert_eq!(input, serde_json::json!({"command":"echo safe"}));
825            }
826            BeforeToolCallDecision::Block { reason } => panic!("unexpected block: {reason}"),
827        }
828    }
829
830    #[tokio::test]
831    async fn confirm_prompt_yes_continues() {
832        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
833        let handle = crate::tools::SecretPromptHandle::new(tx);
834
835        let task = tokio::spawn(async move {
836            let request = rx.recv().await.expect("confirm prompt request");
837            assert_eq!(request.title, "Confirm tool call");
838            assert!(request.prompt.contains("Run deploy?"));
839            let _ = request.response_tx.send(Some("yes".to_string()));
840        });
841
842        let result = resolve_before_tool_call_result(
843            crate::extensions::hooks::events::HookResult::Confirm {
844                message: "Run deploy?".into(),
845            },
846            Some(&handle),
847            false,
848        )
849        .await;
850
851        task.await.unwrap();
852        assert!(matches!(
853            result,
854            crate::extensions::hooks::events::HookResult::Continue
855        ));
856    }
857
858    #[tokio::test]
859    async fn confirm_prompt_non_yes_blocks() {
860        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
861        let handle = crate::tools::SecretPromptHandle::new(tx);
862
863        let task = tokio::spawn(async move {
864            let request = rx.recv().await.expect("confirm prompt request");
865            let _ = request.response_tx.send(Some("no".to_string()));
866        });
867
868        let result = resolve_before_tool_call_result(
869            crate::extensions::hooks::events::HookResult::Confirm {
870                message: "Run deploy?".into(),
871            },
872            Some(&handle),
873            false,
874        )
875        .await;
876
877        task.await.unwrap();
878        assert!(matches!(
879            result,
880            crate::extensions::hooks::events::HookResult::Block { reason }
881                if reason.contains("confirmation denied")
882        ));
883    }
884
885    #[test]
886    fn test_max_tokens_for_model() {
887        // Opus models should return 128000
888        assert_eq!(HelperMethods::max_tokens_for_model("claude-opus-4-6"), 128000);
889        assert_eq!(HelperMethods::max_tokens_for_model("opus-something"), 128000);
890        
891        // Non-opus models should return 64000
892        assert_eq!(HelperMethods::max_tokens_for_model("claude-sonnet-4-20250514"), 64000);
893        assert_eq!(HelperMethods::max_tokens_for_model("haiku"), 64000);
894        assert_eq!(HelperMethods::max_tokens_for_model("claude-3-haiku"), 64000);
895        assert_eq!(HelperMethods::max_tokens_for_model("some-other-model"), 64000);
896        
897        // Edge cases
898        assert_eq!(HelperMethods::max_tokens_for_model(""), 64000);
899        assert_eq!(HelperMethods::max_tokens_for_model("OPUS"), 64000); // Case sensitive - uppercase doesn't match
900        assert_eq!(HelperMethods::max_tokens_for_model("model-opus-end"), 128000); // Contains "opus" anywhere
901    }
902
903    #[test]
904    fn test_truncate_tool_result() {
905        let default_max = 30000;
906        
907        // Short string should remain unchanged
908        let short = "This is a short string.";
909        assert_eq!(HelperMethods::truncate_tool_result(short, default_max), short);
910        
911        // Exactly max should remain unchanged
912        let exact = "x".repeat(30000);
913        assert_eq!(HelperMethods::truncate_tool_result(&exact, default_max), exact);
914        
915        // String longer than max should be truncated with notice
916        let too_long = "x".repeat(30001);
917        let truncated = HelperMethods::truncate_tool_result(&too_long, default_max);
918        
919        // Should start with the truncated content
920        assert!(truncated.starts_with(&"x".repeat(30000)));
921        
922        // Should contain truncation notice with total char count
923        assert!(truncated.contains("[truncated — 30001 total chars, showing first 30000]"));
924        
925        // Should be longer than max (due to notice)
926        assert!(truncated.len() > 30000);
927        
928        // Test with a much longer string
929        let very_long = "a".repeat(50000);
930        let truncated_very_long = HelperMethods::truncate_tool_result(&very_long, default_max);
931        assert!(truncated_very_long.contains("[truncated — 50000 total chars, showing first 30000]"));
932        assert!(truncated_very_long.starts_with(&"a".repeat(30000)));
933        
934        // Test with custom limit
935        let custom_truncated = HelperMethods::truncate_tool_result(&very_long, 100);
936        assert!(custom_truncated.starts_with(&"a".repeat(100)));
937        assert!(custom_truncated.contains("[truncated — 50000 total chars, showing first 100]"));
938    }
939
940    #[test]
941    fn test_thinking_level_ranges() {
942        use crate::core::models::thinking_level_for_budget;
943
944        // Sentinel 0 = "adaptive" (S172 — model decides)
945        assert_eq!(thinking_level_for_budget(0), "adaptive");
946
947        // Low range: 1..=2048
948        assert_eq!(thinking_level_for_budget(1), "low");
949        assert_eq!(thinking_level_for_budget(1024), "low");
950        assert_eq!(thinking_level_for_budget(2048), "low");
951
952        // Medium range: 2049..=4096
953        assert_eq!(thinking_level_for_budget(2049), "medium");
954        assert_eq!(thinking_level_for_budget(3000), "medium");
955        assert_eq!(thinking_level_for_budget(4096), "medium");
956
957        // High range: 4097..=16384
958        assert_eq!(thinking_level_for_budget(4097), "high");
959        assert_eq!(thinking_level_for_budget(8192), "high");
960        assert_eq!(thinking_level_for_budget(16384), "high");
961
962        // XHigh range: _ (everything else)
963        assert_eq!(thinking_level_for_budget(16385), "xhigh");
964        assert_eq!(thinking_level_for_budget(32768), "xhigh");
965        assert_eq!(thinking_level_for_budget(100000), "xhigh");
966    }
967}