Skip to main content

synaps_cli/runtime/
mod.rs

1use reqwest::Client;
2use serde_json::{json, Value};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use crate::{Result, RuntimeError, ToolRegistry};
7use std::sync::Mutex;
8use tokio::sync::{mpsc, RwLock};
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use futures::stream::Stream;
12use std::pin::Pin;
13
14mod types;
15mod auth;
16mod api;
17mod api_sync;
18mod request;
19mod stream;
20mod helpers;
21pub mod subagent;
22pub mod openai;
23
24pub use types::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
25use types::AuthState;
26use auth::AuthMethods;
27use api::ApiMethods;
28use stream::StreamMethods;
29use helpers::HelperMethods;
30
31/// Result of resolving before_tool_call extension policy.
32pub enum BeforeToolCallDecision {
33    Continue { input: Value },
34    Block { reason: String },
35}
36
37/// Emit a `before_tool_call` event and include the runtime tool name when it
38/// differs from the API-safe name.
39pub async fn emit_before_tool_call(
40    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
41    tool_name: &str,
42    runtime_tool_name: Option<&str>,
43    input: Value,
44) -> crate::extensions::hooks::events::HookResult {
45    let mut event = crate::extensions::hooks::events::HookEvent::before_tool_call(tool_name, input);
46    if let Some(runtime_tool_name) = runtime_tool_name {
47        event.tool_runtime_name = Some(runtime_tool_name.to_string());
48    }
49    hook_bus.emit(&event).await
50}
51
52
53/// Resolve a before_tool_call result that may request user confirmation.
54/// Resolve a before_tool_call result that may request user confirmation.
55///
56/// When `auto_approve_confirms` is true, `Confirm` is short-circuited to `Continue`.
57/// Headless/non-interactive callers with `auto_approve_confirms = false` fail closed.
58pub async fn resolve_before_tool_call_result(
59    hook_result: crate::extensions::hooks::events::HookResult,
60    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
61    auto_approve_confirms: bool,
62) -> crate::extensions::hooks::events::HookResult {
63    match hook_result {
64        crate::extensions::hooks::events::HookResult::Confirm { message } => {
65            if auto_approve_confirms {
66                tracing::info!(message = %message, "confirm auto-approved (auto_approve_confirms=true)");
67                return crate::extensions::hooks::events::HookResult::Continue;
68            }
69
70            let Some(prompt) = secret_prompt else {
71                return crate::extensions::hooks::events::HookResult::Block {
72                    reason: format!(
73                        "Tool call requires confirmation but no interactive prompt is available: {}",
74                        message
75                    ),
76                };
77            };
78
79            let response = prompt
80                .prompt(
81                    "Confirm tool call".to_string(),
82                    format!("{}\n\nType 'yes' or 'y' to allow.", message),
83                )
84                .await;
85
86            match response.as_deref().map(str::trim) {
87                Some(answer) if answer.eq_ignore_ascii_case("yes") || answer.eq_ignore_ascii_case("y") => {
88                    crate::extensions::hooks::events::HookResult::Continue
89                }
90                _ => crate::extensions::hooks::events::HookResult::Block {
91                    reason: format!("Tool call confirmation denied: {}", message),
92                },
93            }
94        }
95        other => other,
96    }
97}
98
99/// Resolve before_tool_call policy into executable input or a block reason.
100pub async fn resolve_before_tool_call_decision(
101    original_input: Value,
102    hook_result: crate::extensions::hooks::events::HookResult,
103    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
104    auto_approve_confirms: bool,
105) -> BeforeToolCallDecision {
106    match resolve_before_tool_call_result(hook_result, secret_prompt, auto_approve_confirms).await {
107        crate::extensions::hooks::events::HookResult::Block { reason } => {
108            BeforeToolCallDecision::Block { reason }
109        }
110        crate::extensions::hooks::events::HookResult::Modify { input } => {
111            BeforeToolCallDecision::Continue { input }
112        }
113        _ => BeforeToolCallDecision::Continue { input: original_input },
114    }
115}
116
117/// Emit an `after_tool_call` event and include the runtime tool name when it
118/// differs from the API-safe name.
119pub async fn emit_after_tool_call(
120    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
121    tool_name: &str,
122    runtime_tool_name: Option<&str>,
123    input: Value,
124    output: String,
125) -> crate::extensions::hooks::events::HookResult {
126    let mut event = crate::extensions::hooks::events::HookEvent::after_tool_call(
127        tool_name,
128        input,
129        output,
130    );
131    if let Some(runtime_tool_name) = runtime_tool_name {
132        event.tool_runtime_name = Some(runtime_tool_name.to_string());
133    }
134    hook_bus.emit(&event).await
135}
136
137/// The core runtime — manages API communication, tool execution, authentication,
138/// and streaming for all SynapsCLI binaries (chat, chatui, server, agent, watcher).
139pub struct Runtime {
140    client: Client,
141    auth: Arc<RwLock<AuthState>>,
142    model: String,
143    tools: Arc<RwLock<ToolRegistry>>,
144    system_prompt: Option<String>,
145    thinking_budget: u32,
146    /// User override for context window size (tokens). When set, takes
147    /// precedence over the model's auto-detected window from
148    /// `models::context_window_for_model`. Lets users cap context at e.g.
149    /// 200k even on models that natively support 1M.
150    context_window_override: Option<u64>,
151    /// Model used for compaction. Falls back to claude-sonnet-4-6 if not set.
152    compaction_model: Option<String>,
153    /// Shared registry for reactive subagent handles.
154    subagent_registry: Arc<Mutex<crate::runtime::subagent::SubagentRegistry>>,
155    /// Shared event queue — for Event Bus tooling.
156    event_queue: Arc<crate::events::EventQueue>,
157    /// Path for watcher_exit tool to write handoff state (agent mode only)
158    pub watcher_exit_path: Option<PathBuf>,
159    // New configurable fields
160    max_tool_output: usize,
161    bash_timeout: u64,
162    bash_max_timeout: u64,
163    subagent_timeout: u64,
164    api_retries: u32,
165    session_manager: std::sync::Arc<crate::tools::shell::SessionManager>,
166    /// Extension hook bus for dispatching events to extensions.
167    hook_bus: Arc<crate::extensions::hooks::HookBus>,
168    // Held to keep the reaper task alive for the Runtime's lifetime; never read directly.
169    #[allow(dead_code)]
170    reaper_handle: Option<tokio::task::JoinHandle<()>>,
171    #[allow(dead_code)]
172    reaper_cancel: Option<tokio_util::sync::CancellationToken>,
173}
174
175impl Runtime {
176    pub async fn new() -> Result<Self> {
177        let (auth_token, auth_type, refresh_token, token_expires) = AuthMethods::get_auth_token()?;
178
179        let client = Client::builder()
180            .connect_timeout(Duration::from_secs(10))
181            .timeout(Duration::from_secs(300))
182            .build()
183            .map_err(|e| RuntimeError::Config(format!("Failed to build HTTP client: {}", e)))?;
184
185        let session_manager = {
186            let config = crate::tools::shell::ShellConfig::default();
187            crate::tools::shell::SessionManager::new(config)
188        };
189
190        // Start the idle session reaper
191        let mgr = session_manager.clone();
192        let cancel = tokio_util::sync::CancellationToken::new();
193        let reaper_handle = crate::tools::shell::session::start_reaper(mgr, cancel.clone());
194
195        Ok(Runtime {
196            client,
197            auth: Arc::new(RwLock::new(AuthState {
198                auth_token,
199                auth_type,
200                refresh_token,
201                token_expires,
202            })),
203            model: crate::models::default_model().to_string(),
204            tools: Arc::new(RwLock::new(ToolRegistry::new())),
205            system_prompt: None,
206            thinking_budget: 4096,
207            context_window_override: None,
208            compaction_model: None,
209            subagent_registry: Arc::new(Mutex::new(crate::runtime::subagent::SubagentRegistry::new())),
210            event_queue: Arc::new(crate::events::EventQueue::new(1000)),
211            watcher_exit_path: None,
212            max_tool_output: 30000,
213            bash_timeout: 30,
214            bash_max_timeout: 300,
215            subagent_timeout: 300,
216            api_retries: 3,
217            session_manager,
218            hook_bus: Arc::new(crate::extensions::hooks::HookBus::new()),
219            reaper_handle: Some(reaper_handle),
220            reaper_cancel: Some(cancel),
221        })
222    }
223
    /// Set the system prompt, replacing any previously stored one.
    pub fn set_system_prompt(&mut self, prompt: String) {
        self.system_prompt = Some(prompt);
    }

    /// Borrow the current system prompt, or `None` if none has been set.
    pub fn system_prompt(&self) -> Option<&str> {
        self.system_prompt.as_deref()
    }
231
232    pub fn set_model(&mut self, model: String) {
233        // Strip any health/status prefix (e.g. "✅  339ms  groq/..." → "groq/...")
234        let cleaned = if let Some(pos) = model.find("claude-") {
235            model[pos..].to_string()
236        } else if let Some(pos) = model.find('/') {
237            let before = &model[..pos];
238            let key_start = before.rfind(|c: char| !c.is_ascii_alphanumeric() && c != '-' && c != '_')
239                .map(|i| i + before[i..].chars().next().map(|c| c.len_utf8()).unwrap_or(1))
240                .unwrap_or(0);
241            model[key_start..].to_string()
242        } else {
243            model
244        };
245        self.model = cleaned;
246    }
247
    /// Replace the tool registry with `tools`, wrapped in a fresh
    /// `Arc<RwLock<_>>`. Handles previously obtained from `tools_shared` keep
    /// pointing at the old registry.
    pub fn set_tools(&mut self, tools: ToolRegistry) {
        self.tools = Arc::new(RwLock::new(tools));
    }

    /// Get a shared reference to the subagent registry.
    pub fn subagent_registry(&self) -> &Arc<Mutex<crate::runtime::subagent::SubagentRegistry>> {
        &self.subagent_registry
    }

    /// Get a shared reference to the event queue.
    pub fn event_queue(&self) -> &Arc<crate::events::EventQueue> {
        &self.event_queue
    }

    /// Get a shared reference to the extension hook bus.
    pub fn hook_bus(&self) -> &Arc<crate::extensions::hooks::HookBus> {
        &self.hook_bus
    }

    /// Get a shared reference to the tool registry (for MCP lazy loading).
    /// Returns a new `Arc` handle to the same registry.
    pub fn tools_shared(&self) -> Arc<RwLock<ToolRegistry>> {
        Arc::clone(&self.tools)
    }
269
    /// The currently selected model identifier.
    pub fn model(&self) -> &str {
        &self.model
    }

    /// Borrow the HTTP client this runtime uses for API requests.
    pub fn http_client(&self) -> &Client {
        &self.client
    }
    /// Set the thinking budget passed to the API layer.
    pub fn set_thinking_budget(&mut self, budget: u32) {
        self.thinking_budget = budget;
    }

    /// Set (or, with `None`, clear) the model used for compaction.
    pub fn set_compaction_model(&mut self, model: Option<String>) {
        self.compaction_model = model;
    }

    /// Set (or, with `None`, clear) the user override for the context window
    /// size in tokens.
    pub fn set_context_window(&mut self, window: Option<u64>) {
        self.context_window_override = window;
    }
288
    /// Model used for compaction — the configured compaction model if set,
    /// otherwise `claude-sonnet-4-6`.
    pub fn compaction_model(&self) -> &str {
        self.compaction_model.as_deref().unwrap_or("claude-sonnet-4-6")
    }

    /// Effective context window for the current model — user override if set,
    /// otherwise the model's native window from `models::context_window_for_model`.
    pub fn context_window(&self) -> u64 {
        self.context_window_override
            .unwrap_or_else(|| crate::models::context_window_for_model(&self.model))
    }
299
300    /// Apply a parsed config file to this runtime (model, thinking budget, etc.)
301    pub fn apply_config(&mut self, config: &crate::config::SynapsConfig) {
302        if let Some(ref model) = config.model {
303            self.set_model(model.clone());
304        }
305        if let Some(budget) = config.thinking_budget {
306            self.set_thinking_budget(budget);
307        }
308        self.context_window_override = config.context_window;
309        self.compaction_model = config.compaction_model.clone();
310        self.max_tool_output = config.max_tool_output;
311        self.bash_timeout = config.bash_timeout;
312        self.bash_max_timeout = config.bash_max_timeout;
313        self.subagent_timeout = config.subagent_timeout;
314        self.api_retries = config.api_retries;
315    }
316
    /// Current thinking budget passed to the API layer.
    pub fn thinking_budget(&self) -> u32 {
        self.thinking_budget
    }

    /// Size limit handed to tools and used when truncating tool results.
    pub fn max_tool_output(&self) -> usize {
        self.max_tool_output
    }

    /// Default bash timeout handed to tools.
    // NOTE(review): unit is not visible here — presumably seconds; confirm against the shell tool.
    pub fn bash_timeout(&self) -> u64 {
        self.bash_timeout
    }

    /// Upper bound for bash timeouts handed to tools.
    pub fn bash_max_timeout(&self) -> u64 {
        self.bash_max_timeout
    }

    /// Subagent timeout handed to tools.
    pub fn subagent_timeout(&self) -> u64 {
        self.subagent_timeout
    }

    /// Retry count passed to the API layer.
    pub fn api_retries(&self) -> u32 {
        self.api_retries
    }

    /// Set the tool-output size limit.
    pub fn set_max_tool_output(&mut self, v: usize) {
        self.max_tool_output = v;
    }

    /// Set the default bash timeout.
    pub fn set_bash_timeout(&mut self, v: u64) {
        self.bash_timeout = v;
    }

    /// Set the upper bound for bash timeouts.
    pub fn set_bash_max_timeout(&mut self, v: u64) {
        self.bash_max_timeout = v;
    }

    /// Set the subagent timeout.
    pub fn set_subagent_timeout(&mut self, v: u64) {
        self.subagent_timeout = v;
    }

    /// Set the retry count passed to the API layer.
    pub fn set_api_retries(&mut self, v: u32) {
        self.api_retries = v;
    }

    /// Thinking level corresponding to the current thinking budget, as mapped
    /// by `core::models::thinking_level_for_budget`.
    pub fn thinking_level(&self) -> &str {
        crate::core::models::thinking_level_for_budget(self.thinking_budget)
    }

    /// Check if the OAuth token is expired and refresh it if needed.
    /// Delegates to `AuthMethods::refresh_if_needed` with this runtime's shared
    /// auth state and HTTP client.
    pub async fn refresh_if_needed(&self) -> Result<()> {
        AuthMethods::refresh_if_needed(Arc::clone(&self.auth), &self.client).await
    }
369
370    /// Make a simple non-streaming API call for compaction (no tools).
371    ///
372    /// Uses a dedicated summarization system prompt (not the user's), omits
373    /// all tools, and returns the raw text response. Caller supplies the
374    /// full message array including the serialized conversation.
375    pub async fn compact_call(&self, messages: Vec<Value>) -> Result<String> {
376        self.refresh_if_needed().await?;
377
378        use crate::core::compaction::COMPACTION_SYSTEM_PROMPT;
379
380        ApiMethods::call_api_simple(
381            &self.auth,
382            &self.client,
383            self.compaction_model(),
384            COMPACTION_SYSTEM_PROMPT,
385            self.thinking_budget,
386            &messages,
387            self.api_retries,
388        ).await
389    }
390
391    /// Run a single prompt synchronously (non-streaming). Handles tool execution
392    /// internally, looping until the model produces a final text response.
393    pub async fn run_single(&self, prompt: &str) -> Result<String> {
394        // Refresh OAuth token if expired
395        self.refresh_if_needed().await?;
396
397        let mut messages = vec![json!({"role": "user", "content": prompt})];
398        
399        loop {
400            let response = ApiMethods::call_api(
401                &self.auth,
402                &self.client,
403                &self.model,
404                &*self.tools.read().await,
405                &self.system_prompt,
406                self.thinking_budget,
407                &messages,
408                self.api_retries,
409                &api::ApiOptions {
410                    use_1m_context: self.context_window_override == Some(1_000_000),
411                },
412            ).await?;
413            
414            // Check if Claude wants to use tools
415            if let Some(content) = response["content"].as_array() {
416                let mut response_text = String::new();
417                let mut tool_uses = Vec::new();
418                
419                // Process response content
420                for item in content {
421                    match item["type"].as_str() {
422                        Some("text") => {
423                            if let Some(text) = item["text"].as_str() {
424                                response_text.push_str(text);
425                            }
426                        }
427                        Some("tool_use") => {
428                            tool_uses.push(item.clone());
429                        }
430                        _ => {}
431                    }
432                }
433                
434                // If no tool uses, return the text response
435                if tool_uses.is_empty() {
436                    return Ok(response_text);
437                }
438                
439                // Add assistant's response to conversation (only content, role)
440                messages.push(json!({
441                    "role": "assistant",
442                    "content": content
443                }));
444                
445                // Execute tools — parallel when multiple are requested
446                let mut tool_results = Vec::new();
447                
448                if tool_uses.len() == 1 {
449                    // Single tool — run inline, no spawn overhead
450                    let tool_use = &tool_uses[0];
451                    if let (Some(tool_name), Some(tool_id)) = (
452                        tool_use["name"].as_str(),
453                        tool_use["id"].as_str()
454                    ) {
455                        let input = &tool_use["input"];
456                        let result = match self.tools.read().await.get(tool_name).cloned() {
457                            Some(tool) => {
458                                let input = self.tools.read().await.translate_input_for_api_tool(tool_name, input.clone());
459                                let runtime_name = self.tools.read().await.runtime_name_for_api(tool_name).to_string();
460                                let ctx = crate::ToolContext {
461                                    channels: crate::tools::ToolChannels {
462                                        tx_delta: None,
463                                        tx_events: None,
464                                    },
465                                    capabilities: crate::tools::ToolCapabilities {
466                                        watcher_exit_path: self.watcher_exit_path.clone(),
467                                        tool_register_tx: None,
468                                        session_manager: Some(self.session_manager.clone()),
469                                        subagent_registry: Some(self.subagent_registry.clone()),
470                                        event_queue: Some(self.event_queue.clone()),
471                                        secret_prompt: None,
472                                    },
473                                    limits: crate::tools::ToolLimits {
474                                        max_tool_output: self.max_tool_output,
475                                        bash_timeout: self.bash_timeout,
476                                        bash_max_timeout: self.bash_max_timeout,
477                                        subagent_timeout: self.subagent_timeout,
478                                    },
479                                };
480                                let decision = resolve_before_tool_call_decision(
481                                    input.clone(),
482                                    emit_before_tool_call(
483                                        &self.hook_bus,
484                                        &tool_name,
485                                        Some(&runtime_name),
486                                        input.clone(),
487                                    ).await,
488                                    None,
489                                    false,
490                                ).await;
491                                if let BeforeToolCallDecision::Block { reason } = decision {
492                                    format!("Tool call blocked by extension: {}", reason)
493                                } else {
494                                    let BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
495                                    let input_for_hook = input.clone();
496                                    let output = match tool.execute(input, ctx).await {
497                                        Ok(output) => output,
498                                        Err(e) => format!("Tool execution failed: {}", e),
499                                    };
500                                    let _ = emit_after_tool_call(
501                                        &self.hook_bus,
502                                        &tool_name,
503                                        Some(&runtime_name),
504                                        input_for_hook,
505                                        output.clone(),
506                                    ).await;
507                                    output
508                                }
509                            }
510                            None => format!("Unknown tool: {}", tool_name),
511                        };
512                        tool_results.push(json!({
513                            "type": "tool_result",
514                            "tool_use_id": tool_id,
515                            "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
516                        }));
517                    }
518                } else {
519                    // Multiple tools — run in parallel with JoinSet
520                    let mut join_set = tokio::task::JoinSet::new();
521                    
522                    // Capture config values before spawning (can't borrow &self in 'static spawn)
523                    let cfg_max_tool_output = self.max_tool_output;
524                    let cfg_bash_timeout = self.bash_timeout;
525                    let cfg_bash_max_timeout = self.bash_max_timeout;
526                    let cfg_subagent_timeout = self.subagent_timeout;
527                    let session_mgr = self.session_manager.clone();
528                    let cfg_subagent_registry = self.subagent_registry.clone();
529                    let cfg_event_queue = self.event_queue.clone();
530                    let cfg_hook_bus = self.hook_bus.clone();
531                    
532                    for tool_use in &tool_uses {
533                        if let (Some(tool_name), Some(tool_id)) = (
534                            tool_use["name"].as_str().map(|s| s.to_string()),
535                            tool_use["id"].as_str().map(|s| s.to_string()),
536                        ) {
537                            let input = tool_use["input"].clone();
538                            let tools_snapshot = self.tools.read().await;
539                            let input = tools_snapshot.translate_input_for_api_tool(&tool_name, input);
540                            let runtime_name = tools_snapshot.runtime_name_for_api(&tool_name).to_string();
541                            let tool = tools_snapshot.get(&tool_name).cloned();
542                            drop(tools_snapshot);
543                            let exit_path = self.watcher_exit_path.clone();
544                            let session_mgr_inner = session_mgr.clone();
545                            let registry_inner = cfg_subagent_registry.clone();
546                            let event_queue_inner = cfg_event_queue.clone();
547                            let hook_bus_inner = cfg_hook_bus.clone();
548                            let tool_name_for_hook = tool_name.clone();
549                            let runtime_name_for_hook = runtime_name.clone();
550                            
551                            join_set.spawn(async move {
552                                let result = match tool {
553                                    Some(t) => {
554                                        let decision = crate::runtime::resolve_before_tool_call_decision(
555                                            input.clone(),
556                                            crate::runtime::emit_before_tool_call(
557                                                &hook_bus_inner,
558                                                &tool_name_for_hook,
559                                                Some(&runtime_name_for_hook),
560                                                input.clone(),
561                                            ).await,
562                                            None,
563                                            false,
564                                        ).await;
565                                        if let crate::runtime::BeforeToolCallDecision::Block { reason } = decision {
566                                            format!("Tool call blocked by extension: {}", reason)
567                                        } else {
568                                        let crate::runtime::BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
569                                        let ctx = crate::ToolContext {
570                                            channels: crate::tools::ToolChannels {
571                                                tx_delta: None,
572                                                tx_events: None,
573                                            },
574                                            capabilities: crate::tools::ToolCapabilities {
575                                                watcher_exit_path: exit_path,
576                                                tool_register_tx: None,
577                                                session_manager: Some(session_mgr_inner),
578                                                subagent_registry: Some(registry_inner),
579                                                event_queue: Some(event_queue_inner),
580                                                secret_prompt: None,
581                                            },
582                                            limits: crate::tools::ToolLimits {
583                                                max_tool_output: cfg_max_tool_output,
584                                                bash_timeout: cfg_bash_timeout,
585                                                bash_max_timeout: cfg_bash_max_timeout,
586                                                subagent_timeout: cfg_subagent_timeout,
587                                            },
588                                        };
589                                        let input_for_hook = input.clone();
590                                        let output = match t.execute(input, ctx).await {
591                                            Ok(output) => output,
592                                            Err(e) => format!("Tool execution failed: {}", e),
593                                        };
594                                        let _ = crate::runtime::emit_after_tool_call(
595                                            &hook_bus_inner,
596                                            &tool_name_for_hook,
597                                            Some(&runtime_name_for_hook),
598                                            input_for_hook,
599                                            output.clone(),
600                                        ).await;
601                                        output
602                                        }
603                                    }
604                                    None => format!("Unknown tool: {}", tool_name),
605                                };
606                                (tool_id, result)
607                            });
608                        }
609                    }
610                    
611                    // Collect results, preserving order by tool_id
612                    let mut results_map = std::collections::HashMap::new();
613                    while let Some(res) = join_set.join_next().await {
614                        match res {
615                            Ok((tool_id, result)) => {
616                                results_map.insert(tool_id, result);
617                            }
618                            Err(e) => {
619                                // Task panicked — log it but don't crash
620                                tracing::error!("Parallel tool task panicked: {}", e);
621                            }
622                        }
623                    }
624                    
625                    // Build tool_results in original order — every tool_use MUST have a result
626                    for tool_use in &tool_uses {
627                        if let Some(tool_id) = tool_use["id"].as_str() {
628                            let result = results_map.remove(tool_id)
629                                .unwrap_or_else(|| "Tool execution failed: task panicked".to_string());
630                            tool_results.push(json!({
631                                "type": "tool_result",
632                                "tool_use_id": tool_id,
633                                "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
634                            }));
635                        }
636                    }
637                }
638                
639                // Add tool results to conversation
640                messages.push(json!({
641                    "role": "user",
642                    "content": tool_results
643                }));
644                
645                // Continue the loop to get Claude's response with tool results
646            } else {
647                return Err(RuntimeError::Tool("Invalid response format".to_string()));
648            }
649        }
650    }
651
652    /// Run a prompt as a cancellable stream of [`StreamEvent`]s. Convenience wrapper
653    /// around [`run_stream_with_messages`] for single-turn usage.
654    pub async fn run_stream(&self, prompt: String, cancel: CancellationToken) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
655        self.run_stream_with_messages(vec![json!({"role": "user", "content": prompt})], cancel, None, None, false).await
656    }
657
658    /// Run a multi-turn conversation as a cancellable stream of [`StreamEvent`]s.
659    /// This is the main entry point for chat UIs and agents. Handles tool execution,
660    /// API retries, and dynamic tool registration (MCP) internally.
661    pub async fn run_stream_with_messages(
662        &self,
663        messages: Vec<Value>,
664        cancel: CancellationToken,
665        steering_rx: Option<mpsc::UnboundedReceiver<String>>,
666        secret_prompt: Option<crate::tools::SecretPromptHandle>,
667        auto_approve_confirms: bool,
668    ) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
669        let (tx, rx) = mpsc::unbounded_channel();
670
671        // Refresh OAuth token if expired before starting the stream.
672        if let Err(e) = self.refresh_if_needed().await {
673            let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
674            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
675            return Box::pin(UnboundedReceiverStream::new(rx));
676        }
677
678        // Clone the Arc, not the whole Runtime — the spawned task shares the
679        // same AuthState so mid-loop token refreshes are visible immediately.
680        let auth = Arc::clone(&self.auth);
681        let client = self.client.clone();
682        let model = self.model.clone();
683        let tools = self.tools.clone();
684        let system_prompt = self.system_prompt.clone();
685        let thinking_budget = self.thinking_budget;
686        let watcher_exit_path = self.watcher_exit_path.clone();
687        let max_tool_output = self.max_tool_output;
688        let bash_timeout = self.bash_timeout;
689        let bash_max_timeout = self.bash_max_timeout;
690        let subagent_timeout = self.subagent_timeout;
691        let api_retries = self.api_retries;
692        let session_manager = self.session_manager.clone();
693        // Opt into the 1M-context beta header only when the user explicitly
694        // requested 1M (via context_window setting). Default 200k matches
695        // Anthropic's claude-code default and gives smarter inference.
696        let subagent_registry = self.subagent_registry.clone();
697        let event_queue = self.event_queue.clone();
698        let options = api::ApiOptions {
699            use_1m_context: self.context_window_override == Some(1_000_000),
700        };
701
702        let session = crate::runtime::stream::StreamSession {
703            auth, client, options, api_retries,
704            model, tools, system_prompt, thinking_budget,
705            tx: tx.clone(), cancel, steering_rx,
706            watcher_exit_path, max_tool_output,
707            bash_timeout, bash_max_timeout, subagent_timeout,
708            session_manager, subagent_registry, event_queue, secret_prompt,
709            hook_bus: self.hook_bus.clone(),
710            auto_approve_confirms,
711        };
712
713        tokio::spawn(async move {
714            if let Err(e) = StreamMethods::run_stream_internal(session, messages).await {
715                let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
716            }
717            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
718        });
719
720        Box::pin(UnboundedReceiverStream::new(rx))
721    }
722}
723
724impl Clone for Runtime {
725    fn clone(&self) -> Self {
726        Self {
727            client: self.client.clone(),
728            auth: Arc::clone(&self.auth),
729            model: self.model.clone(),
730            tools: self.tools.clone(),
731            system_prompt: self.system_prompt.clone(),
732            thinking_budget: self.thinking_budget,
733            context_window_override: self.context_window_override,
734            compaction_model: self.compaction_model.clone(),
735            subagent_registry: self.subagent_registry.clone(),
736            event_queue: self.event_queue.clone(),
737            watcher_exit_path: self.watcher_exit_path.clone(),
738            max_tool_output: self.max_tool_output,
739            bash_timeout: self.bash_timeout,
740            bash_max_timeout: self.bash_max_timeout,
741            subagent_timeout: self.subagent_timeout,
742            api_retries: self.api_retries,
743            session_manager: self.session_manager.clone(),
744            hook_bus: self.hook_bus.clone(),
745            reaper_handle: None,  // Cloned runtimes don't own the reaper
746            reaper_cancel: None,  // Cloned runtimes don't own the reaper
747        }
748    }
749}
750
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extensions::hooks::events::HookResult;

    /// The `Confirm` hook result shared by the confirmation tests.
    fn deploy_confirmation() -> HookResult {
        HookResult::Confirm {
            message: "Run deploy?".into(),
        }
    }

    /// With no prompt handle and auto-approval off, `Confirm` must resolve to a
    /// `Block` whose reason names both the cause and the original message.
    #[tokio::test]
    async fn confirm_without_prompt_fails_closed() {
        let outcome = resolve_before_tool_call_result(deploy_confirmation(), None, false).await;

        match outcome {
            HookResult::Block { reason } => {
                assert!(reason.contains("requires confirmation"));
                assert!(reason.contains("Run deploy?"));
            }
            _ => panic!("expected the confirmation to be blocked"),
        }
    }

    /// A `Modify` hook result replaces the tool input that is passed on.
    #[tokio::test]
    async fn modify_result_replaces_tool_input() {
        let original_input = json!({"command":"rm -rf /"});
        let replacement = json!({"command":"echo safe"});
        let hook_result = HookResult::Modify {
            input: replacement.clone(),
        };

        let decision =
            resolve_before_tool_call_decision(original_input, hook_result, None, false).await;

        match decision {
            BeforeToolCallDecision::Block { reason } => panic!("unexpected block: {reason}"),
            BeforeToolCallDecision::Continue { input } => assert_eq!(input, replacement),
        }
    }

    /// Answering "yes" to the confirmation prompt lets the tool call continue.
    #[tokio::test]
    async fn confirm_prompt_yes_continues() {
        let (request_tx, mut request_rx) = mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(request_tx);

        // Plays the user: checks what the prompt shows, then approves it.
        let responder = tokio::spawn(async move {
            let request = request_rx.recv().await.expect("confirm prompt request");
            assert_eq!(request.title, "Confirm tool call");
            assert!(request.prompt.contains("Run deploy?"));
            let _ = request.response_tx.send(Some("yes".to_string()));
        });

        let outcome =
            resolve_before_tool_call_result(deploy_confirmation(), Some(&handle), false).await;

        // Joining the responder surfaces any assertion failure raised inside it.
        responder.await.unwrap();
        assert!(matches!(outcome, HookResult::Continue));
    }

    /// Any answer other than "yes" turns the confirmation into a `Block`.
    #[tokio::test]
    async fn confirm_prompt_non_yes_blocks() {
        let (request_tx, mut request_rx) = mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(request_tx);

        // Plays the user: declines the prompt.
        let responder = tokio::spawn(async move {
            let request = request_rx.recv().await.expect("confirm prompt request");
            let _ = request.response_tx.send(Some("no".to_string()));
        });

        let outcome =
            resolve_before_tool_call_result(deploy_confirmation(), Some(&handle), false).await;

        responder.await.unwrap();
        match outcome {
            HookResult::Block { reason } => assert!(reason.contains("confirmation denied")),
            _ => panic!("expected the confirmation to be blocked"),
        }
    }

    /// Models whose name contains lowercase "opus" get 128000 tokens; all
    /// others get 64000.
    #[test]
    fn test_max_tokens_for_model() {
        let cases = [
            // Opus models
            ("claude-opus-4-6", 128000),
            ("opus-something", 128000),
            // Non-opus models
            ("claude-sonnet-4-20250514", 64000),
            ("haiku", 64000),
            ("claude-3-haiku", 64000),
            ("some-other-model", 64000),
            // Edge cases
            ("", 64000),
            // Case sensitive - uppercase doesn't match
            ("OPUS", 64000),
            // Contains "opus" anywhere
            ("model-opus-end", 128000),
        ];

        for (model, expected) in cases {
            assert_eq!(
                HelperMethods::max_tokens_for_model(model),
                expected,
                "model: {model:?}"
            );
        }
    }

    /// Output at or below the limit is returned unchanged; longer output is cut
    /// to the limit and followed by a truncation notice.
    #[test]
    fn test_truncate_tool_result() {
        let limit = 30000;

        // Well under the limit: unchanged.
        let brief = "This is a short string.";
        assert_eq!(HelperMethods::truncate_tool_result(brief, limit), brief);

        // Exactly at the limit: unchanged.
        let at_limit = "x".repeat(30000);
        assert_eq!(HelperMethods::truncate_tool_result(&at_limit, limit), at_limit);

        // One character over the limit: kept prefix plus a notice carrying the
        // total char count, so the result is longer than the limit itself.
        let one_over = "x".repeat(30001);
        let clipped = HelperMethods::truncate_tool_result(&one_over, limit);
        assert!(clipped.starts_with(&"x".repeat(30000)));
        assert!(clipped.contains("[truncated — 30001 total chars, showing first 30000]"));
        assert!(clipped.len() > 30000);

        // Far over the limit.
        let huge = "a".repeat(50000);
        let clipped_huge = HelperMethods::truncate_tool_result(&huge, limit);
        assert!(clipped_huge.contains("[truncated — 50000 total chars, showing first 30000]"));
        assert!(clipped_huge.starts_with(&"a".repeat(30000)));

        // Custom limit.
        let clipped_small = HelperMethods::truncate_tool_result(&huge, 100);
        assert!(clipped_small.starts_with(&"a".repeat(100)));
        assert!(clipped_small.contains("[truncated — 50000 total chars, showing first 100]"));
    }

    /// Thinking budgets map onto named levels at the documented boundaries.
    #[test]
    fn test_thinking_level_ranges() {
        use crate::core::models::thinking_level_for_budget;

        let cases = [
            // Sentinel 0 = "adaptive" (S172 — model decides)
            (0, "adaptive"),
            // Low range: 1..=2048
            (1, "low"),
            (1024, "low"),
            (2048, "low"),
            // Medium range: 2049..=4096
            (2049, "medium"),
            (3000, "medium"),
            (4096, "medium"),
            // High range: 4097..=16384
            (4097, "high"),
            (8192, "high"),
            (16384, "high"),
            // XHigh range: everything else
            (16385, "xhigh"),
            (32768, "xhigh"),
            (100000, "xhigh"),
        ];

        for (budget, level) in cases {
            assert_eq!(thinking_level_for_budget(budget), level, "budget: {budget}");
        }
    }
}