Skip to main content

synaps_cli/runtime/
mod.rs

1use reqwest::Client;
2use serde_json::{json, Value};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use crate::{Result, RuntimeError, ToolRegistry};
7use std::sync::Mutex;
8use tokio::sync::{mpsc, RwLock};
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use futures::stream::Stream;
12use std::pin::Pin;
13
14mod types;
15mod auth;
16mod api;
17mod api_sync;
18mod request;
19mod stream;
20mod helpers;
21pub mod subagent;
22pub mod openai;
23
24pub use types::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
25use types::AuthState;
26use auth::AuthMethods;
27use api::ApiMethods;
28use stream::StreamMethods;
29use helpers::HelperMethods;
30
31/// Result of resolving before_tool_call extension policy.
32pub enum BeforeToolCallDecision {
33    Continue { input: Value },
34    Block { reason: String },
35}
36
37/// Emit a `before_tool_call` event and include the runtime tool name when it
38/// differs from the API-safe name.
39pub async fn emit_before_tool_call(
40    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
41    tool_name: &str,
42    runtime_tool_name: Option<&str>,
43    input: Value,
44) -> crate::extensions::hooks::events::HookResult {
45    let mut event = crate::extensions::hooks::events::HookEvent::before_tool_call(tool_name, input);
46    if let Some(runtime_tool_name) = runtime_tool_name {
47        event.tool_runtime_name = Some(runtime_tool_name.to_string());
48    }
49    hook_bus.emit(&event).await
50}
51
52
53/// Resolve a before_tool_call result that may request user confirmation.
54///
55/// When `auto_approve_confirms` is true, `Confirm` is short-circuited to `Continue`.
56/// Headless/non-interactive callers with `auto_approve_confirms = false` fail closed.
57pub async fn resolve_before_tool_call_result(
58    hook_result: crate::extensions::hooks::events::HookResult,
59    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
60    auto_approve_confirms: bool,
61) -> crate::extensions::hooks::events::HookResult {
62    match hook_result {
63        crate::extensions::hooks::events::HookResult::Confirm { message } => {
64            if auto_approve_confirms {
65                tracing::info!(message = %message, "confirm auto-approved (auto_approve_confirms=true)");
66                return crate::extensions::hooks::events::HookResult::Continue;
67            }
68
69            let Some(prompt) = secret_prompt else {
70                return crate::extensions::hooks::events::HookResult::Block {
71                    reason: format!(
72                        "Tool call requires confirmation but no interactive prompt is available: {}",
73                        message
74                    ),
75                };
76            };
77
78            let response = prompt
79                .prompt(
80                    "Confirm tool call".to_string(),
81                    format!("{}\n\nType 'yes' or 'y' to allow.", message),
82                )
83                .await;
84
85            match response.as_deref().map(str::trim) {
86                Some(answer) if answer.eq_ignore_ascii_case("yes") || answer.eq_ignore_ascii_case("y") => {
87                    crate::extensions::hooks::events::HookResult::Continue
88                }
89                _ => crate::extensions::hooks::events::HookResult::Block {
90                    reason: format!("Tool call confirmation denied: {}", message),
91                },
92            }
93        }
94        other => other,
95    }
96}
97
98/// Resolve before_tool_call policy into executable input or a block reason.
99pub async fn resolve_before_tool_call_decision(
100    original_input: Value,
101    hook_result: crate::extensions::hooks::events::HookResult,
102    secret_prompt: Option<&crate::tools::SecretPromptHandle>,
103    auto_approve_confirms: bool,
104) -> BeforeToolCallDecision {
105    match resolve_before_tool_call_result(hook_result, secret_prompt, auto_approve_confirms).await {
106        crate::extensions::hooks::events::HookResult::Block { reason } => {
107            BeforeToolCallDecision::Block { reason }
108        }
109        crate::extensions::hooks::events::HookResult::Modify { input } => {
110            BeforeToolCallDecision::Continue { input }
111        }
112        _ => BeforeToolCallDecision::Continue { input: original_input },
113    }
114}
115
116/// Emit an `after_tool_call` event and include the runtime tool name when it
117/// differs from the API-safe name.
118pub async fn emit_after_tool_call(
119    hook_bus: &Arc<crate::extensions::hooks::HookBus>,
120    tool_name: &str,
121    runtime_tool_name: Option<&str>,
122    input: Value,
123    output: String,
124) -> crate::extensions::hooks::events::HookResult {
125    let mut event = crate::extensions::hooks::events::HookEvent::after_tool_call(
126        tool_name,
127        input,
128        output,
129    );
130    if let Some(runtime_tool_name) = runtime_tool_name {
131        event.tool_runtime_name = Some(runtime_tool_name.to_string());
132    }
133    hook_bus.emit(&event).await
134}
135
136/// The core runtime — manages API communication, tool execution, authentication,
137/// and streaming for all SynapsCLI binaries (chat, chatui, server, agent, watcher).
138pub struct Runtime {
139    client: Client,
140    auth: Arc<RwLock<AuthState>>,
141    model: String,
142    tools: Arc<RwLock<ToolRegistry>>,
143    system_prompt: Option<String>,
144    thinking_budget: u32,
145    /// User override for context window size (tokens). When set, takes
146    /// precedence over the model's auto-detected window from
147    /// `models::context_window_for_model`. Lets users cap context at e.g.
148    /// 200k even on models that natively support 1M.
149    context_window_override: Option<u64>,
150    /// Model used for compaction. Falls back to claude-sonnet-4-6 if not set.
151    compaction_model: Option<String>,
152    /// Shared registry for reactive subagent handles.
153    subagent_registry: Arc<Mutex<crate::runtime::subagent::SubagentRegistry>>,
154    /// Shared event queue — for Event Bus tooling.
155    event_queue: Arc<crate::events::EventQueue>,
156    /// Path for watcher_exit tool to write handoff state (agent mode only)
157    pub watcher_exit_path: Option<PathBuf>,
158    // New configurable fields
159    max_tool_output: usize,
160    bash_timeout: u64,
161    bash_max_timeout: u64,
162    subagent_timeout: u64,
163    api_retries: u32,
164    session_manager: std::sync::Arc<crate::tools::shell::SessionManager>,
165    /// Extension hook bus for dispatching events to extensions.
166    hook_bus: Arc<crate::extensions::hooks::HookBus>,
167    // Held to keep the reaper task alive for the Runtime's lifetime; never read directly.
168    #[allow(dead_code)]
169    reaper_handle: Option<tokio::task::JoinHandle<()>>,
170    #[allow(dead_code)]
171    reaper_cancel: Option<tokio_util::sync::CancellationToken>,
172}
173
174impl Runtime {
175    pub async fn new() -> Result<Self> {
176        let (auth_token, auth_type, refresh_token, token_expires) = AuthMethods::get_auth_token()?;
177
178        let client = Client::builder()
179            .connect_timeout(Duration::from_secs(10))
180            .timeout(Duration::from_secs(300))
181            .build()
182            .map_err(|e| RuntimeError::Config(format!("Failed to build HTTP client: {}", e)))?;
183
184        let session_manager = {
185            let config = crate::tools::shell::ShellConfig::default();
186            crate::tools::shell::SessionManager::new(config)
187        };
188
189        // Start the idle session reaper
190        let mgr = session_manager.clone();
191        let cancel = tokio_util::sync::CancellationToken::new();
192        let reaper_handle = crate::tools::shell::session::start_reaper(mgr, cancel.clone());
193
194        Ok(Runtime {
195            client,
196            auth: Arc::new(RwLock::new(AuthState {
197                auth_token,
198                auth_type,
199                refresh_token,
200                token_expires,
201            })),
202            model: crate::models::default_model().to_string(),
203            tools: Arc::new(RwLock::new(ToolRegistry::new())),
204            system_prompt: None,
205            thinking_budget: 4096,
206            context_window_override: None,
207            compaction_model: None,
208            subagent_registry: Arc::new(Mutex::new(crate::runtime::subagent::SubagentRegistry::new())),
209            event_queue: Arc::new(crate::events::EventQueue::new(1000)),
210            watcher_exit_path: None,
211            max_tool_output: 30000,
212            bash_timeout: 30,
213            bash_max_timeout: 300,
214            subagent_timeout: 300,
215            api_retries: 3,
216            session_manager,
217            hook_bus: Arc::new(crate::extensions::hooks::HookBus::new()),
218            reaper_handle: Some(reaper_handle),
219            reaper_cancel: Some(cancel),
220        })
221    }
222
223    pub fn set_system_prompt(&mut self, prompt: String) {
224        self.system_prompt = Some(prompt);
225    }
226
227    pub fn system_prompt(&self) -> Option<&str> {
228        self.system_prompt.as_deref()
229    }
230
231    pub fn set_model(&mut self, model: String) {
232        // Strip any health/status prefix (e.g. "✅  339ms  groq/..." → "groq/...")
233        let cleaned = if let Some(pos) = model.find("claude-") {
234            model[pos..].to_string()
235        } else if let Some(pos) = model.find('/') {
236            let before = &model[..pos];
237            let key_start = before.rfind(|c: char| !c.is_ascii_alphanumeric() && c != '-' && c != '_')
238                .map(|i| i + before[i..].chars().next().map(|c| c.len_utf8()).unwrap_or(1))
239                .unwrap_or(0);
240            model[key_start..].to_string()
241        } else {
242            model
243        };
244        self.model = cleaned;
245    }
246
247    pub fn set_tools(&mut self, tools: ToolRegistry) {
248        self.tools = Arc::new(RwLock::new(tools));
249    }
250
251    pub fn subagent_registry(&self) -> &Arc<Mutex<crate::runtime::subagent::SubagentRegistry>> {
252        &self.subagent_registry
253    }
254
255    pub fn event_queue(&self) -> &Arc<crate::events::EventQueue> {
256        &self.event_queue
257    }
258
259    /// Get a shared reference to the extension hook bus.
260    pub fn hook_bus(&self) -> &Arc<crate::extensions::hooks::HookBus> {
261        &self.hook_bus
262    }
263
264    /// Get a shared reference to the tool registry (for MCP lazy loading).
265    pub fn tools_shared(&self) -> Arc<RwLock<ToolRegistry>> {
266        Arc::clone(&self.tools)
267    }
268
269    pub fn model(&self) -> &str {
270        &self.model
271    }
272
273    pub fn http_client(&self) -> &Client {
274        &self.client
275    }
276    pub fn set_thinking_budget(&mut self, budget: u32) {
277        self.thinking_budget = budget;
278    }
279
280    pub fn set_compaction_model(&mut self, model: Option<String>) {
281        self.compaction_model = model;
282    }
283
284    pub fn set_context_window(&mut self, window: Option<u64>) {
285        self.context_window_override = window;
286    }
287
288    /// Effective context window for the current model — user override if set,
289    /// otherwise the model's native window from `models::context_window_for_model`.
290    pub fn compaction_model(&self) -> &str {
291        self.compaction_model.as_deref().unwrap_or("claude-sonnet-4-6")
292    }
293
294    pub fn context_window(&self) -> u64 {
295        self.context_window_override
296            .unwrap_or_else(|| crate::models::context_window_for_model(&self.model))
297    }
298
299    /// Apply a parsed config file to this runtime (model, thinking budget, etc.)
300    pub fn apply_config(&mut self, config: &crate::config::SynapsConfig) {
301        if let Some(ref model) = config.model {
302            self.set_model(model.clone());
303        }
304        if let Some(budget) = config.thinking_budget {
305            self.set_thinking_budget(budget);
306        }
307        self.context_window_override = config.context_window;
308        self.compaction_model = config.compaction_model.clone();
309        self.max_tool_output = config.max_tool_output;
310        self.bash_timeout = config.bash_timeout;
311        self.bash_max_timeout = config.bash_max_timeout;
312        self.subagent_timeout = config.subagent_timeout;
313        self.api_retries = config.api_retries;
314    }
315
316    pub fn thinking_budget(&self) -> u32 {
317        self.thinking_budget
318    }
319
320    pub fn max_tool_output(&self) -> usize {
321        self.max_tool_output
322    }
323
324    pub fn bash_timeout(&self) -> u64 {
325        self.bash_timeout
326    }
327
328    pub fn bash_max_timeout(&self) -> u64 {
329        self.bash_max_timeout
330    }
331
332    pub fn subagent_timeout(&self) -> u64 {
333        self.subagent_timeout
334    }
335
336    pub fn api_retries(&self) -> u32 {
337        self.api_retries
338    }
339
340    pub fn set_max_tool_output(&mut self, v: usize) {
341        self.max_tool_output = v;
342    }
343
344    pub fn set_bash_timeout(&mut self, v: u64) {
345        self.bash_timeout = v;
346    }
347
348    pub fn set_bash_max_timeout(&mut self, v: u64) {
349        self.bash_max_timeout = v;
350    }
351
352    pub fn set_subagent_timeout(&mut self, v: u64) {
353        self.subagent_timeout = v;
354    }
355
356    pub fn set_api_retries(&mut self, v: u32) {
357        self.api_retries = v;
358    }
359
360    pub fn thinking_level(&self) -> &str {
361        crate::core::models::thinking_level_for_budget(self.thinking_budget)
362    }
363
364    /// Check if the OAuth token is expired and refresh it if needed.
365    pub async fn refresh_if_needed(&self) -> Result<()> {
366        AuthMethods::refresh_if_needed(Arc::clone(&self.auth), &self.client).await
367    }
368
369    /// Make a simple non-streaming API call for compaction (no tools).
370    ///
371    /// Uses a dedicated summarization system prompt (not the user's), omits
372    /// all tools, and returns the raw text response. Caller supplies the
373    /// full message array including the serialized conversation.
374    pub async fn compact_call(&self, messages: Vec<Value>) -> Result<String> {
375        self.refresh_if_needed().await?;
376
377        use crate::core::compaction::COMPACTION_SYSTEM_PROMPT;
378
379        ApiMethods::call_api_simple(
380            &self.auth,
381            &self.client,
382            self.compaction_model(),
383            COMPACTION_SYSTEM_PROMPT,
384            self.thinking_budget,
385            &messages,
386            self.api_retries,
387        ).await
388    }
389
390    /// Run a single prompt synchronously (non-streaming). Handles tool execution
391    /// internally, looping until the model produces a final text response.
392    pub async fn run_single(&self, prompt: &str) -> Result<String> {
393        // Refresh OAuth token if expired
394        self.refresh_if_needed().await?;
395
396        let mut messages = vec![json!({"role": "user", "content": prompt})];
397        
398        loop {
399            let response = ApiMethods::call_api(
400                &self.auth,
401                &self.client,
402                &self.model,
403                &*self.tools.read().await,
404                &self.system_prompt,
405                self.thinking_budget,
406                &messages,
407                self.api_retries,
408                &api::ApiOptions {
409                    use_1m_context: self.context_window_override == Some(1_000_000),
410                },
411            ).await?;
412            
413            // Check if Claude wants to use tools
414            if let Some(content) = response["content"].as_array() {
415                let mut response_text = String::new();
416                let mut tool_uses = Vec::new();
417                
418                // Process response content
419                for item in content {
420                    match item["type"].as_str() {
421                        Some("text") => {
422                            if let Some(text) = item["text"].as_str() {
423                                response_text.push_str(text);
424                            }
425                        }
426                        Some("tool_use") => {
427                            tool_uses.push(item.clone());
428                        }
429                        _ => {}
430                    }
431                }
432                
433                // If no tool uses, return the text response
434                if tool_uses.is_empty() {
435                    return Ok(response_text);
436                }
437                
438                // Add assistant's response to conversation (only content, role)
439                messages.push(json!({
440                    "role": "assistant",
441                    "content": content
442                }));
443                
444                // Execute tools — parallel when multiple are requested
445                let mut tool_results = Vec::new();
446                
447                if tool_uses.len() == 1 {
448                    // Single tool — run inline, no spawn overhead
449                    let tool_use = &tool_uses[0];
450                    if let (Some(tool_name), Some(tool_id)) = (
451                        tool_use["name"].as_str(),
452                        tool_use["id"].as_str()
453                    ) {
454                        let input = &tool_use["input"];
455                        let result = match self.tools.read().await.get(tool_name).cloned() {
456                            Some(tool) => {
457                                let input = self.tools.read().await.translate_input_for_api_tool(tool_name, input.clone());
458                                let runtime_name = self.tools.read().await.runtime_name_for_api(tool_name).to_string();
459                                let ctx = crate::ToolContext {
460                                    channels: crate::tools::ToolChannels {
461                                        tx_delta: None,
462                                        tx_events: None,
463                                    },
464                                    capabilities: crate::tools::ToolCapabilities {
465                                        watcher_exit_path: self.watcher_exit_path.clone(),
466                                        tool_register_tx: None,
467                                        session_manager: Some(self.session_manager.clone()),
468                                        subagent_registry: Some(self.subagent_registry.clone()),
469                                        event_queue: Some(self.event_queue.clone()),
470                                        secret_prompt: None,
471                                    },
472                                    limits: crate::tools::ToolLimits {
473                                        max_tool_output: self.max_tool_output,
474                                        bash_timeout: self.bash_timeout,
475                                        bash_max_timeout: self.bash_max_timeout,
476                                        subagent_timeout: self.subagent_timeout,
477                                    },
478                                };
479                                let decision = resolve_before_tool_call_decision(
480                                    input.clone(),
481                                    emit_before_tool_call(
482                                        &self.hook_bus,
483                                        tool_name,
484                                        Some(&runtime_name),
485                                        input.clone(),
486                                    ).await,
487                                    None,
488                                    false,
489                                ).await;
490                                if let BeforeToolCallDecision::Block { reason } = decision {
491                                    format!("Tool call blocked by extension: {}", reason)
492                                } else {
493                                    let BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
494                                    let input_for_hook = input.clone();
495                                    let output = match tool.execute(input, ctx).await {
496                                        Ok(output) => output,
497                                        Err(e) => format!("Tool execution failed: {}", e),
498                                    };
499                                    let _ = emit_after_tool_call(
500                                        &self.hook_bus,
501                                        tool_name,
502                                        Some(&runtime_name),
503                                        input_for_hook,
504                                        output.clone(),
505                                    ).await;
506                                    output
507                                }
508                            }
509                            None => format!("Unknown tool: {}", tool_name),
510                        };
511                        tool_results.push(json!({
512                            "type": "tool_result",
513                            "tool_use_id": tool_id,
514                            "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
515                        }));
516                    }
517                } else {
518                    // Multiple tools — run in parallel with JoinSet
519                    let mut join_set = tokio::task::JoinSet::new();
520                    
521                    // Capture config values before spawning (can't borrow &self in 'static spawn)
522                    let cfg_max_tool_output = self.max_tool_output;
523                    let cfg_bash_timeout = self.bash_timeout;
524                    let cfg_bash_max_timeout = self.bash_max_timeout;
525                    let cfg_subagent_timeout = self.subagent_timeout;
526                    let session_mgr = self.session_manager.clone();
527                    let cfg_subagent_registry = self.subagent_registry.clone();
528                    let cfg_event_queue = self.event_queue.clone();
529                    let cfg_hook_bus = self.hook_bus.clone();
530                    
531                    for tool_use in &tool_uses {
532                        if let (Some(tool_name), Some(tool_id)) = (
533                            tool_use["name"].as_str().map(|s| s.to_string()),
534                            tool_use["id"].as_str().map(|s| s.to_string()),
535                        ) {
536                            let input = tool_use["input"].clone();
537                            let tools_snapshot = self.tools.read().await;
538                            let input = tools_snapshot.translate_input_for_api_tool(&tool_name, input);
539                            let runtime_name = tools_snapshot.runtime_name_for_api(&tool_name).to_string();
540                            let tool = tools_snapshot.get(&tool_name).cloned();
541                            drop(tools_snapshot);
542                            let exit_path = self.watcher_exit_path.clone();
543                            let session_mgr_inner = session_mgr.clone();
544                            let registry_inner = cfg_subagent_registry.clone();
545                            let event_queue_inner = cfg_event_queue.clone();
546                            let hook_bus_inner = cfg_hook_bus.clone();
547                            let tool_name_for_hook = tool_name.clone();
548                            let runtime_name_for_hook = runtime_name.clone();
549                            
550                            join_set.spawn(async move {
551                                let result = match tool {
552                                    Some(t) => {
553                                        let decision = crate::runtime::resolve_before_tool_call_decision(
554                                            input.clone(),
555                                            crate::runtime::emit_before_tool_call(
556                                                &hook_bus_inner,
557                                                &tool_name_for_hook,
558                                                Some(&runtime_name_for_hook),
559                                                input.clone(),
560                                            ).await,
561                                            None,
562                                            false,
563                                        ).await;
564                                        if let crate::runtime::BeforeToolCallDecision::Block { reason } = decision {
565                                            format!("Tool call blocked by extension: {}", reason)
566                                        } else {
567                                        let crate::runtime::BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
568                                        let ctx = crate::ToolContext {
569                                            channels: crate::tools::ToolChannels {
570                                                tx_delta: None,
571                                                tx_events: None,
572                                            },
573                                            capabilities: crate::tools::ToolCapabilities {
574                                                watcher_exit_path: exit_path,
575                                                tool_register_tx: None,
576                                                session_manager: Some(session_mgr_inner),
577                                                subagent_registry: Some(registry_inner),
578                                                event_queue: Some(event_queue_inner),
579                                                secret_prompt: None,
580                                            },
581                                            limits: crate::tools::ToolLimits {
582                                                max_tool_output: cfg_max_tool_output,
583                                                bash_timeout: cfg_bash_timeout,
584                                                bash_max_timeout: cfg_bash_max_timeout,
585                                                subagent_timeout: cfg_subagent_timeout,
586                                            },
587                                        };
588                                        let input_for_hook = input.clone();
589                                        let output = match t.execute(input, ctx).await {
590                                            Ok(output) => output,
591                                            Err(e) => format!("Tool execution failed: {}", e),
592                                        };
593                                        let _ = crate::runtime::emit_after_tool_call(
594                                            &hook_bus_inner,
595                                            &tool_name_for_hook,
596                                            Some(&runtime_name_for_hook),
597                                            input_for_hook,
598                                            output.clone(),
599                                        ).await;
600                                        output
601                                        }
602                                    }
603                                    None => format!("Unknown tool: {}", tool_name),
604                                };
605                                (tool_id, result)
606                            });
607                        }
608                    }
609                    
610                    // Collect results, preserving order by tool_id
611                    let mut results_map = std::collections::HashMap::new();
612                    while let Some(res) = join_set.join_next().await {
613                        match res {
614                            Ok((tool_id, result)) => {
615                                results_map.insert(tool_id, result);
616                            }
617                            Err(e) => {
618                                // Task panicked — log it but don't crash
619                                tracing::error!("Parallel tool task panicked: {}", e);
620                            }
621                        }
622                    }
623                    
624                    // Build tool_results in original order — every tool_use MUST have a result
625                    for tool_use in &tool_uses {
626                        if let Some(tool_id) = tool_use["id"].as_str() {
627                            let result = results_map.remove(tool_id)
628                                .unwrap_or_else(|| "Tool execution failed: task panicked".to_string());
629                            tool_results.push(json!({
630                                "type": "tool_result",
631                                "tool_use_id": tool_id,
632                                "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
633                            }));
634                        }
635                    }
636                }
637                
638                // Add tool results to conversation
639                messages.push(json!({
640                    "role": "user",
641                    "content": tool_results
642                }));
643                
644                // Continue the loop to get Claude's response with tool results
645            } else {
646                return Err(RuntimeError::Tool("Invalid response format".to_string()));
647            }
648        }
649    }
650
651    /// Run a prompt as a cancellable stream of [`StreamEvent`]s. Convenience wrapper
652    /// around [`run_stream_with_messages`] for single-turn usage.
653    pub async fn run_stream(&self, prompt: String, cancel: CancellationToken) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
654        self.run_stream_with_messages(vec![json!({"role": "user", "content": prompt})], cancel, None, None, false).await
655    }
656
657    /// Run a multi-turn conversation as a cancellable stream of [`StreamEvent`]s.
658    /// This is the main entry point for chat UIs and agents. Handles tool execution,
659    /// API retries, and dynamic tool registration (MCP) internally.
660    pub async fn run_stream_with_messages(
661        &self,
662        messages: Vec<Value>,
663        cancel: CancellationToken,
664        steering_rx: Option<mpsc::UnboundedReceiver<String>>,
665        secret_prompt: Option<crate::tools::SecretPromptHandle>,
666        auto_approve_confirms: bool,
667    ) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
668        let (tx, rx) = mpsc::unbounded_channel();
669
670        // Refresh OAuth token if expired before starting the stream.
671        if let Err(e) = self.refresh_if_needed().await {
672            let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
673            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
674            return Box::pin(UnboundedReceiverStream::new(rx));
675        }
676
677        // Clone the Arc, not the whole Runtime — the spawned task shares the
678        // same AuthState so mid-loop token refreshes are visible immediately.
679        let auth = Arc::clone(&self.auth);
680        let client = self.client.clone();
681        let model = self.model.clone();
682        let tools = self.tools.clone();
683        let system_prompt = self.system_prompt.clone();
684        let thinking_budget = self.thinking_budget;
685        let watcher_exit_path = self.watcher_exit_path.clone();
686        let max_tool_output = self.max_tool_output;
687        let bash_timeout = self.bash_timeout;
688        let bash_max_timeout = self.bash_max_timeout;
689        let subagent_timeout = self.subagent_timeout;
690        let api_retries = self.api_retries;
691        let session_manager = self.session_manager.clone();
692        // Opt into the 1M-context beta header only when the user explicitly
693        // requested 1M (via context_window setting). Default 200k matches
694        // Anthropic's claude-code default and gives smarter inference.
695        let subagent_registry = self.subagent_registry.clone();
696        let event_queue = self.event_queue.clone();
697        let options = api::ApiOptions {
698            use_1m_context: self.context_window_override == Some(1_000_000),
699        };
700
701        let session = crate::runtime::stream::StreamSession {
702            auth, client, options, api_retries,
703            model, tools, system_prompt, thinking_budget,
704            tx: tx.clone(), cancel, steering_rx,
705            watcher_exit_path, max_tool_output,
706            bash_timeout, bash_max_timeout, subagent_timeout,
707            session_manager, subagent_registry, event_queue, secret_prompt,
708            hook_bus: self.hook_bus.clone(),
709            auto_approve_confirms,
710        };
711
712        tokio::spawn(async move {
713            if let Err(e) = StreamMethods::run_stream_internal(session, messages).await {
714                let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
715            }
716            let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
717        });
718
719        Box::pin(UnboundedReceiverStream::new(rx))
720    }
721}
722
723impl Clone for Runtime {
724    fn clone(&self) -> Self {
725        Self {
726            client: self.client.clone(),
727            auth: Arc::clone(&self.auth),
728            model: self.model.clone(),
729            tools: self.tools.clone(),
730            system_prompt: self.system_prompt.clone(),
731            thinking_budget: self.thinking_budget,
732            context_window_override: self.context_window_override,
733            compaction_model: self.compaction_model.clone(),
734            subagent_registry: self.subagent_registry.clone(),
735            event_queue: self.event_queue.clone(),
736            watcher_exit_path: self.watcher_exit_path.clone(),
737            max_tool_output: self.max_tool_output,
738            bash_timeout: self.bash_timeout,
739            bash_max_timeout: self.bash_max_timeout,
740            subagent_timeout: self.subagent_timeout,
741            api_retries: self.api_retries,
742            session_manager: self.session_manager.clone(),
743            hook_bus: self.hook_bus.clone(),
744            reaper_handle: None,  // Cloned runtimes don't own the reaper
745            reaper_cancel: None,  // Cloned runtimes don't own the reaper
746        }
747    }
748}
749
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extensions::hooks::events::HookResult;

    /// Without a prompt handle and without auto-approval, `Confirm` must fail
    /// closed and surface the hook's message in the block reason.
    #[tokio::test]
    async fn confirm_without_prompt_fails_closed() {
        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            None,
            false,
        )
        .await;

        assert!(matches!(
            result,
            HookResult::Block { reason }
                if reason.contains("requires confirmation") && reason.contains("Run deploy?")
        ));
    }

    /// With `auto_approve_confirms = true`, `Confirm` is short-circuited to
    /// `Continue` even when no prompt handle is available.
    #[tokio::test]
    async fn confirm_auto_approve_continues_without_prompt() {
        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            None,
            true,
        )
        .await;

        assert!(matches!(result, HookResult::Continue));
    }

    /// A `Modify` hook result replaces the tool input that will be executed.
    #[tokio::test]
    async fn modify_result_replaces_tool_input() {
        let result = resolve_before_tool_call_decision(
            serde_json::json!({"command":"rm -rf /"}),
            HookResult::Modify {
                input: serde_json::json!({"command":"echo safe"}),
            },
            None,
            false,
        ).await;

        match result {
            BeforeToolCallDecision::Continue { input } => {
                assert_eq!(input, serde_json::json!({"command":"echo safe"}));
            }
            BeforeToolCallDecision::Block { reason } => panic!("unexpected block: {reason}"),
        }
    }

    /// Answering "yes" to the confirmation prompt lets the tool call continue.
    #[tokio::test]
    async fn confirm_prompt_yes_continues() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(tx);

        let task = tokio::spawn(async move {
            let request = rx.recv().await.expect("confirm prompt request");
            assert_eq!(request.title, "Confirm tool call");
            assert!(request.prompt.contains("Run deploy?"));
            let _ = request.response_tx.send(Some("yes".to_string()));
        });

        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            Some(&handle),
            false,
        )
        .await;

        task.await.unwrap();
        assert!(matches!(result, HookResult::Continue));
    }

    /// Any answer other than "yes" blocks the tool call.
    #[tokio::test]
    async fn confirm_prompt_non_yes_blocks() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(tx);

        let task = tokio::spawn(async move {
            let request = rx.recv().await.expect("confirm prompt request");
            let _ = request.response_tx.send(Some("no".to_string()));
        });

        let result = resolve_before_tool_call_result(
            HookResult::Confirm {
                message: "Run deploy?".into(),
            },
            Some(&handle),
            false,
        )
        .await;

        task.await.unwrap();
        assert!(matches!(
            result,
            HookResult::Block { reason }
                if reason.contains("confirmation denied")
        ));
    }

    #[test]
    fn test_max_tokens_for_model() {
        // Opus models should return 128000
        assert_eq!(HelperMethods::max_tokens_for_model("claude-opus-4-6"), 128000);
        assert_eq!(HelperMethods::max_tokens_for_model("opus-something"), 128000);

        // Non-opus models should return 64000
        assert_eq!(HelperMethods::max_tokens_for_model("claude-sonnet-4-20250514"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("haiku"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("claude-3-haiku"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("some-other-model"), 64000);

        // Edge cases
        assert_eq!(HelperMethods::max_tokens_for_model(""), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("OPUS"), 64000); // Case sensitive - uppercase doesn't match
        assert_eq!(HelperMethods::max_tokens_for_model("model-opus-end"), 128000); // Contains "opus" anywhere
    }

    #[test]
    fn test_truncate_tool_result() {
        let default_max = 30000;

        // Short string should remain unchanged
        let short = "This is a short string.";
        assert_eq!(HelperMethods::truncate_tool_result(short, default_max), short);

        // Exactly max should remain unchanged
        let exact = "x".repeat(30000);
        assert_eq!(HelperMethods::truncate_tool_result(&exact, default_max), exact);

        // String longer than max should be truncated with notice
        let too_long = "x".repeat(30001);
        let truncated = HelperMethods::truncate_tool_result(&too_long, default_max);

        // Should start with the truncated content
        assert!(truncated.starts_with(&"x".repeat(30000)));

        // Should contain truncation notice with total char count
        assert!(truncated.contains("[truncated — 30001 total chars, showing first 30000]"));

        // Should be longer than max (due to notice)
        assert!(truncated.len() > 30000);

        // Test with a much longer string
        let very_long = "a".repeat(50000);
        let truncated_very_long = HelperMethods::truncate_tool_result(&very_long, default_max);
        assert!(truncated_very_long.contains("[truncated — 50000 total chars, showing first 30000]"));
        assert!(truncated_very_long.starts_with(&"a".repeat(30000)));

        // Test with custom limit
        let custom_truncated = HelperMethods::truncate_tool_result(&very_long, 100);
        assert!(custom_truncated.starts_with(&"a".repeat(100)));
        assert!(custom_truncated.contains("[truncated — 50000 total chars, showing first 100]"));
    }

    #[test]
    fn test_thinking_level_ranges() {
        use crate::core::models::thinking_level_for_budget;

        // Sentinel 0 = "adaptive" (S172 — model decides)
        assert_eq!(thinking_level_for_budget(0), "adaptive");

        // Low range: 1..=2048
        assert_eq!(thinking_level_for_budget(1), "low");
        assert_eq!(thinking_level_for_budget(1024), "low");
        assert_eq!(thinking_level_for_budget(2048), "low");

        // Medium range: 2049..=4096
        assert_eq!(thinking_level_for_budget(2049), "medium");
        assert_eq!(thinking_level_for_budget(3000), "medium");
        assert_eq!(thinking_level_for_budget(4096), "medium");

        // High range: 4097..=16384
        assert_eq!(thinking_level_for_budget(4097), "high");
        assert_eq!(thinking_level_for_budget(8192), "high");
        assert_eq!(thinking_level_for_budget(16384), "high");

        // XHigh range: _ (everything else)
        assert_eq!(thinking_level_for_budget(16385), "xhigh");
        assert_eq!(thinking_level_for_budget(32768), "xhigh");
        assert_eq!(thinking_level_for_budget(100000), "xhigh");
    }
}