1use reqwest::Client;
2use serde_json::{json, Value};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use crate::{Result, RuntimeError, ToolRegistry};
7use std::sync::Mutex;
8use tokio::sync::{mpsc, RwLock};
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use futures::stream::Stream;
12use std::pin::Pin;
13
14mod types;
15mod auth;
16mod api;
17mod api_sync;
18mod request;
19mod stream;
20mod helpers;
21mod sse;
22mod sse_types;
23pub mod subagent;
24pub mod openai;
25pub mod telemetry;
26pub mod compaction;
27
28pub use types::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
29use types::AuthState;
30use auth::AuthMethods;
31use api::ApiMethods;
32use stream::StreamMethods;
33use helpers::HelperMethods;
34
35pub enum BeforeToolCallDecision {
37 Continue { input: Value },
38 Block { reason: String },
39}
40
41pub async fn emit_before_tool_call(
44 hook_bus: &Arc<crate::extensions::hooks::HookBus>,
45 tool_name: &str,
46 runtime_tool_name: Option<&str>,
47 input: Value,
48) -> crate::extensions::hooks::events::HookResult {
49 let mut event = crate::extensions::hooks::events::HookEvent::before_tool_call(tool_name, input);
50 if let Some(runtime_tool_name) = runtime_tool_name {
51 event.tool_runtime_name = Some(runtime_tool_name.to_string());
52 }
53 hook_bus.emit(&event).await
54}
55
56
57pub async fn resolve_before_tool_call_result(
62 hook_result: crate::extensions::hooks::events::HookResult,
63 secret_prompt: Option<&crate::tools::SecretPromptHandle>,
64 auto_approve_confirms: bool,
65) -> crate::extensions::hooks::events::HookResult {
66 match hook_result {
67 crate::extensions::hooks::events::HookResult::Confirm { message } => {
68 if auto_approve_confirms {
69 tracing::info!(message = %message, "confirm auto-approved (auto_approve_confirms=true)");
70 return crate::extensions::hooks::events::HookResult::Continue;
71 }
72
73 let Some(prompt) = secret_prompt else {
74 return crate::extensions::hooks::events::HookResult::Block {
75 reason: format!(
76 "Tool call requires confirmation but no interactive prompt is available: {}",
77 message
78 ),
79 };
80 };
81
82 let response = prompt
83 .prompt(
84 "Confirm tool call".to_string(),
85 format!("{}\n\nType 'yes' or 'y' to allow.", message),
86 )
87 .await;
88
89 match response.as_deref().map(str::trim) {
90 Some(answer) if answer.eq_ignore_ascii_case("yes") || answer.eq_ignore_ascii_case("y") => {
91 crate::extensions::hooks::events::HookResult::Continue
92 }
93 _ => crate::extensions::hooks::events::HookResult::Block {
94 reason: format!("Tool call confirmation denied: {}", message),
95 },
96 }
97 }
98 other => other,
99 }
100}
101
102pub async fn resolve_before_tool_call_decision(
104 original_input: Value,
105 hook_result: crate::extensions::hooks::events::HookResult,
106 secret_prompt: Option<&crate::tools::SecretPromptHandle>,
107 auto_approve_confirms: bool,
108) -> BeforeToolCallDecision {
109 match resolve_before_tool_call_result(hook_result, secret_prompt, auto_approve_confirms).await {
110 crate::extensions::hooks::events::HookResult::Block { reason } => {
111 BeforeToolCallDecision::Block { reason }
112 }
113 crate::extensions::hooks::events::HookResult::Modify { input } => {
114 BeforeToolCallDecision::Continue { input }
115 }
116 _ => BeforeToolCallDecision::Continue { input: original_input },
117 }
118}
119
120pub async fn emit_after_tool_call(
123 hook_bus: &Arc<crate::extensions::hooks::HookBus>,
124 tool_name: &str,
125 runtime_tool_name: Option<&str>,
126 input: Value,
127 output: String,
128) -> crate::extensions::hooks::events::HookResult {
129 let mut event = crate::extensions::hooks::events::HookEvent::after_tool_call(
130 tool_name,
131 input,
132 output,
133 );
134 if let Some(runtime_tool_name) = runtime_tool_name {
135 event.tool_runtime_name = Some(runtime_tool_name.to_string());
136 }
137 hook_bus.emit(&event).await
138}
139
140pub struct Runtime {
143 client: Client,
144 auth: Arc<RwLock<AuthState>>,
145 model: String,
146 tools: Arc<RwLock<ToolRegistry>>,
147 system_prompt: Option<String>,
148 thinking_budget: u32,
149 context_window_override: Option<u64>,
154 compaction_model: Option<String>,
156 subagent_registry: Arc<Mutex<crate::runtime::subagent::SubagentRegistry>>,
158 event_queue: Arc<crate::events::EventQueue>,
160 pub watcher_exit_path: Option<PathBuf>,
162 max_tool_output: usize,
164 bash_timeout: u64,
165 bash_max_timeout: u64,
166 subagent_timeout: u64,
167 api_retries: u32,
168 telemetry_level: crate::runtime::telemetry::TelemetryLevel,
170 cache_diagnostics: bool,
172 cache_ttl: crate::core::config::CacheTtl,
175 ttl_downgrade_notified: std::sync::Arc<std::sync::atomic::AtomicBool>,
178 saw_1h_honored: std::sync::Arc<std::sync::atomic::AtomicBool>,
182 #[allow(dead_code)]
186 last_msg_id: Arc<Mutex<Option<String>>>,
187 session_manager: std::sync::Arc<crate::tools::shell::SessionManager>,
188 hook_bus: Arc<crate::extensions::hooks::HookBus>,
190 #[allow(dead_code)]
192 reaper_handle: Option<tokio::task::JoinHandle<()>>,
193 #[allow(dead_code)]
194 reaper_cancel: Option<tokio_util::sync::CancellationToken>,
195}
196
197impl Runtime {
198 pub async fn new() -> Result<Self> {
199 let (auth_token, auth_type, refresh_token, token_expires) = AuthMethods::get_auth_token()?;
200
201 let client = Client::builder()
202 .connect_timeout(Duration::from_secs(10))
203 .timeout(Duration::from_secs(300))
204 .build()
205 .map_err(|e| RuntimeError::Config(format!("Failed to build HTTP client: {}", e)))?;
206
207 let session_manager = {
208 let config = crate::tools::shell::ShellConfig::default();
209 crate::tools::shell::SessionManager::new(config)
210 };
211
212 let mgr = session_manager.clone();
214 let cancel = tokio_util::sync::CancellationToken::new();
215 let reaper_handle = crate::tools::shell::session::start_reaper(mgr, cancel.clone());
216
217 Ok(Runtime {
218 client,
219 auth: Arc::new(RwLock::new(AuthState {
220 auth_token,
221 auth_type,
222 refresh_token,
223 token_expires,
224 })),
225 model: crate::models::default_model().to_string(),
226 tools: Arc::new(RwLock::new(ToolRegistry::new())),
227 system_prompt: None,
228 thinking_budget: 4096,
229 context_window_override: None,
230 compaction_model: None,
231 subagent_registry: Arc::new(Mutex::new(crate::runtime::subagent::SubagentRegistry::new())),
232 event_queue: Arc::new(crate::events::EventQueue::new(1000)),
233 watcher_exit_path: None,
234 max_tool_output: 30000,
235 bash_timeout: 30,
236 bash_max_timeout: 300,
237 subagent_timeout: 300,
238 api_retries: 3,
239 telemetry_level: crate::runtime::telemetry::TelemetryLevel::Off,
240 cache_diagnostics: false,
241 cache_ttl: crate::core::config::CacheTtl::default(),
242 ttl_downgrade_notified: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
243 saw_1h_honored: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
244 last_msg_id: Arc::new(Mutex::new(None)),
245 session_manager,
246 hook_bus: Arc::new(crate::extensions::hooks::HookBus::new()),
247 reaper_handle: Some(reaper_handle),
248 reaper_cancel: Some(cancel),
249 })
250 }
251
252 pub fn set_system_prompt(&mut self, prompt: String) {
253 self.system_prompt = Some(prompt);
254 }
255
256 pub fn system_prompt(&self) -> Option<&str> {
257 self.system_prompt.as_deref()
258 }
259
260 pub fn set_model(&mut self, model: String) {
261 let cleaned = if let Some(pos) = model.find("claude-") {
263 model[pos..].to_string()
264 } else if let Some(pos) = model.find('/') {
265 let before = &model[..pos];
266 let key_start = before.rfind(|c: char| !c.is_ascii_alphanumeric() && c != '-' && c != '_')
267 .map(|i| i + before[i..].chars().next().map(|c| c.len_utf8()).unwrap_or(1))
268 .unwrap_or(0);
269 model[key_start..].to_string()
270 } else {
271 model
272 };
273 self.model = cleaned;
274 }
275
276 pub fn set_tools(&mut self, tools: ToolRegistry) {
277 self.tools = Arc::new(RwLock::new(tools));
278 }
279
280 pub fn subagent_registry(&self) -> &Arc<Mutex<crate::runtime::subagent::SubagentRegistry>> {
281 &self.subagent_registry
282 }
283
284 pub fn event_queue(&self) -> &Arc<crate::events::EventQueue> {
285 &self.event_queue
286 }
287
288 pub fn hook_bus(&self) -> &Arc<crate::extensions::hooks::HookBus> {
290 &self.hook_bus
291 }
292
293 pub fn tools_shared(&self) -> Arc<RwLock<ToolRegistry>> {
295 Arc::clone(&self.tools)
296 }
297
298 pub fn model(&self) -> &str {
299 &self.model
300 }
301
302 pub fn http_client(&self) -> &Client {
303 &self.client
304 }
305 pub fn set_thinking_budget(&mut self, budget: u32) {
306 self.thinking_budget = budget;
307 }
308
309 pub fn set_compaction_model(&mut self, model: Option<String>) {
310 self.compaction_model = model;
311 }
312
313 pub fn set_context_window(&mut self, window: Option<u64>) {
314 self.context_window_override = window;
315 }
316
317 pub fn compaction_model(&self) -> &str {
320 self.compaction_model.as_deref().unwrap_or("claude-sonnet-4-6")
321 }
322
323 pub fn context_window(&self) -> u64 {
324 self.context_window_override
325 .unwrap_or_else(|| crate::models::context_window_for_model(&self.model))
326 }
327
328 pub fn apply_config(&mut self, config: &crate::config::SynapsConfig) {
330 if let Some(ref model) = config.model {
331 self.set_model(model.clone());
332 }
333 if let Some(budget) = config.thinking_budget {
334 self.set_thinking_budget(budget);
335 }
336 self.context_window_override = config.context_window;
337 self.compaction_model = config.compaction_model.clone();
338 self.max_tool_output = config.max_tool_output;
339 self.bash_timeout = config.bash_timeout;
340 self.bash_max_timeout = config.bash_max_timeout;
341 self.subagent_timeout = config.subagent_timeout;
342 self.api_retries = config.api_retries;
343 self.telemetry_level = crate::runtime::telemetry::TelemetryLevel::from_str_key(&config.telemetry);
344 self.cache_diagnostics = config.cache_diagnostics;
345 self.cache_ttl = config.cache_ttl;
346 }
347
348 pub fn thinking_budget(&self) -> u32 {
349 self.thinking_budget
350 }
351
352 pub fn max_tool_output(&self) -> usize {
353 self.max_tool_output
354 }
355
356 pub fn bash_timeout(&self) -> u64 {
357 self.bash_timeout
358 }
359
360 pub fn bash_max_timeout(&self) -> u64 {
361 self.bash_max_timeout
362 }
363
364 pub fn subagent_timeout(&self) -> u64 {
365 self.subagent_timeout
366 }
367
368 pub fn api_retries(&self) -> u32 {
369 self.api_retries
370 }
371
372 pub fn set_max_tool_output(&mut self, v: usize) {
373 self.max_tool_output = v;
374 }
375
376 pub fn set_bash_timeout(&mut self, v: u64) {
377 self.bash_timeout = v;
378 }
379
380 pub fn set_bash_max_timeout(&mut self, v: u64) {
381 self.bash_max_timeout = v;
382 }
383
384 pub fn set_subagent_timeout(&mut self, v: u64) {
385 self.subagent_timeout = v;
386 }
387
388 pub fn set_api_retries(&mut self, v: u32) {
389 self.api_retries = v;
390 }
391
392 pub fn telemetry_level(&self) -> crate::runtime::telemetry::TelemetryLevel {
393 self.telemetry_level
394 }
395
396 pub fn set_telemetry_level(&mut self, level: crate::runtime::telemetry::TelemetryLevel) {
397 self.telemetry_level = level;
398 }
399
400 pub fn cache_diagnostics(&self) -> bool {
401 self.cache_diagnostics
402 }
403
404 pub fn set_cache_diagnostics(&mut self, v: bool) {
405 self.cache_diagnostics = v;
406 }
407
408 pub fn cache_ttl(&self) -> crate::core::config::CacheTtl {
409 self.cache_ttl
410 }
411
412 pub fn set_cache_ttl(&mut self, ttl: crate::core::config::CacheTtl) {
416 self.cache_ttl = ttl;
417 }
418
419 pub fn thinking_level(&self) -> &str {
420 crate::core::models::thinking_level_for_budget(self.thinking_budget)
421 }
422
423 pub async fn refresh_if_needed(&self) -> Result<()> {
425 AuthMethods::refresh_if_needed(Arc::clone(&self.auth), &self.client).await
426 }
427
428 pub async fn compact_call(&self, messages: Vec<Value>) -> Result<String> {
434 self.refresh_if_needed().await?;
435
436 use crate::runtime::compaction::COMPACTION_SYSTEM_PROMPT;
437
438 ApiMethods::call_api_simple(
439 &self.auth,
440 &self.client,
441 self.compaction_model(),
442 COMPACTION_SYSTEM_PROMPT,
443 self.thinking_budget,
444 &messages,
445 self.api_retries,
446 ).await
447 }
448
449 pub async fn run_single(&self, prompt: &str) -> Result<String> {
452 self.refresh_if_needed().await?;
454
455 let mut messages = vec![json!({"role": "user", "content": prompt})];
456
457 loop {
458 let response = ApiMethods::call_api(
459 &self.auth,
460 &self.client,
461 &self.model,
462 &*self.tools.read().await,
463 &self.system_prompt,
464 self.thinking_budget,
465 &messages,
466 self.api_retries,
467 &api::ApiOptions {
468 use_1m_context: self.context_window_override == Some(1_000_000),
469 cache_ttl: self.cache_ttl,
470 ttl_downgrade_notified: self.ttl_downgrade_notified.clone(),
471 saw_1h_honored: self.saw_1h_honored.clone(),
472 },
473 ).await?;
474
475 if let Some(content) = response["content"].as_array() {
477 let mut response_text = String::new();
478 let mut tool_uses = Vec::new();
479
480 for item in content {
482 match item["type"].as_str() {
483 Some("text") => {
484 if let Some(text) = item["text"].as_str() {
485 response_text.push_str(text);
486 }
487 }
488 Some("tool_use") => {
489 tool_uses.push(item.clone());
490 }
491 _ => {}
492 }
493 }
494
495 if tool_uses.is_empty() {
497 return Ok(response_text);
498 }
499
500 messages.push(json!({
502 "role": "assistant",
503 "content": content
504 }));
505
506 let mut tool_results = Vec::new();
508
509 if tool_uses.len() == 1 {
510 let tool_use = &tool_uses[0];
512 if let (Some(tool_name), Some(tool_id)) = (
513 tool_use["name"].as_str(),
514 tool_use["id"].as_str()
515 ) {
516 let input = &tool_use["input"];
517 let result = match self.tools.read().await.get(tool_name).cloned() {
518 Some(tool) => {
519 let input = self.tools.read().await.translate_input_for_api_tool(tool_name, input.clone());
520 let runtime_name = self.tools.read().await.runtime_name_for_api(tool_name).to_string();
521 let ctx = crate::ToolContext {
522 channels: crate::tools::ToolChannels {
523 tx_delta: None,
524 tx_events: None,
525 },
526 capabilities: crate::tools::ToolCapabilities {
527 watcher_exit_path: self.watcher_exit_path.clone(),
528 tool_register_tx: None,
529 session_manager: Some(self.session_manager.clone()),
530 subagent_registry: Some(self.subagent_registry.clone()),
531 event_queue: Some(self.event_queue.clone()),
532 secret_prompt: None,
533 },
534 limits: crate::tools::ToolLimits {
535 max_tool_output: self.max_tool_output,
536 bash_timeout: self.bash_timeout,
537 bash_max_timeout: self.bash_max_timeout,
538 subagent_timeout: self.subagent_timeout,
539 },
540 };
541 let decision = resolve_before_tool_call_decision(
542 input.clone(),
543 emit_before_tool_call(
544 &self.hook_bus,
545 tool_name,
546 Some(&runtime_name),
547 input.clone(),
548 ).await,
549 None,
550 false,
551 ).await;
552 if let BeforeToolCallDecision::Block { reason } = decision {
553 format!("Tool call blocked by extension: {}", reason)
554 } else {
555 let BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
556 let input_for_hook = input.clone();
557 let output = match tool.execute(input, ctx).await {
558 Ok(output) => output,
559 Err(e) => format!("Tool execution failed: {}", e),
560 };
561 let _ = emit_after_tool_call(
562 &self.hook_bus,
563 tool_name,
564 Some(&runtime_name),
565 input_for_hook,
566 output.clone(),
567 ).await;
568 output
569 }
570 }
571 None => format!("Unknown tool: {}", tool_name),
572 };
573 tool_results.push(json!({
574 "type": "tool_result",
575 "tool_use_id": tool_id,
576 "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
577 }));
578 }
579 } else {
580 let mut join_set = tokio::task::JoinSet::new();
582
583 let cfg_max_tool_output = self.max_tool_output;
585 let cfg_bash_timeout = self.bash_timeout;
586 let cfg_bash_max_timeout = self.bash_max_timeout;
587 let cfg_subagent_timeout = self.subagent_timeout;
588 let session_mgr = self.session_manager.clone();
589 let cfg_subagent_registry = self.subagent_registry.clone();
590 let cfg_event_queue = self.event_queue.clone();
591 let cfg_hook_bus = self.hook_bus.clone();
592
593 for tool_use in &tool_uses {
594 if let (Some(tool_name), Some(tool_id)) = (
595 tool_use["name"].as_str().map(|s| s.to_string()),
596 tool_use["id"].as_str().map(|s| s.to_string()),
597 ) {
598 let input = tool_use["input"].clone();
599 let tools_snapshot = self.tools.read().await;
600 let input = tools_snapshot.translate_input_for_api_tool(&tool_name, input);
601 let runtime_name = tools_snapshot.runtime_name_for_api(&tool_name).to_string();
602 let tool = tools_snapshot.get(&tool_name).cloned();
603 drop(tools_snapshot);
604 let exit_path = self.watcher_exit_path.clone();
605 let session_mgr_inner = session_mgr.clone();
606 let registry_inner = cfg_subagent_registry.clone();
607 let event_queue_inner = cfg_event_queue.clone();
608 let hook_bus_inner = cfg_hook_bus.clone();
609 let tool_name_for_hook = tool_name.clone();
610 let runtime_name_for_hook = runtime_name.clone();
611
612 join_set.spawn(async move {
613 let result = match tool {
614 Some(t) => {
615 let decision = crate::runtime::resolve_before_tool_call_decision(
616 input.clone(),
617 crate::runtime::emit_before_tool_call(
618 &hook_bus_inner,
619 &tool_name_for_hook,
620 Some(&runtime_name_for_hook),
621 input.clone(),
622 ).await,
623 None,
624 false,
625 ).await;
626 if let crate::runtime::BeforeToolCallDecision::Block { reason } = decision {
627 format!("Tool call blocked by extension: {}", reason)
628 } else {
629 let crate::runtime::BeforeToolCallDecision::Continue { input } = decision else { unreachable!() };
630 let ctx = crate::ToolContext {
631 channels: crate::tools::ToolChannels {
632 tx_delta: None,
633 tx_events: None,
634 },
635 capabilities: crate::tools::ToolCapabilities {
636 watcher_exit_path: exit_path,
637 tool_register_tx: None,
638 session_manager: Some(session_mgr_inner),
639 subagent_registry: Some(registry_inner),
640 event_queue: Some(event_queue_inner),
641 secret_prompt: None,
642 },
643 limits: crate::tools::ToolLimits {
644 max_tool_output: cfg_max_tool_output,
645 bash_timeout: cfg_bash_timeout,
646 bash_max_timeout: cfg_bash_max_timeout,
647 subagent_timeout: cfg_subagent_timeout,
648 },
649 };
650 let input_for_hook = input.clone();
651 let output = match t.execute(input, ctx).await {
652 Ok(output) => output,
653 Err(e) => format!("Tool execution failed: {}", e),
654 };
655 let _ = crate::runtime::emit_after_tool_call(
656 &hook_bus_inner,
657 &tool_name_for_hook,
658 Some(&runtime_name_for_hook),
659 input_for_hook,
660 output.clone(),
661 ).await;
662 output
663 }
664 }
665 None => format!("Unknown tool: {}", tool_name),
666 };
667 (tool_id, result)
668 });
669 }
670 }
671
672 let mut results_map = std::collections::HashMap::new();
674 while let Some(res) = join_set.join_next().await {
675 match res {
676 Ok((tool_id, result)) => {
677 results_map.insert(tool_id, result);
678 }
679 Err(e) => {
680 tracing::error!("Parallel tool task panicked: {}", e);
682 }
683 }
684 }
685
686 for tool_use in &tool_uses {
688 if let Some(tool_id) = tool_use["id"].as_str() {
689 let result = results_map.remove(tool_id)
690 .unwrap_or_else(|| "Tool execution failed: task panicked".to_string());
691 tool_results.push(json!({
692 "type": "tool_result",
693 "tool_use_id": tool_id,
694 "content": HelperMethods::truncate_tool_result(&result, self.max_tool_output)
695 }));
696 }
697 }
698 }
699
700 messages.push(json!({
702 "role": "user",
703 "content": tool_results
704 }));
705
706 } else {
708 return Err(RuntimeError::Tool("Invalid response format".to_string()));
709 }
710 }
711 }
712
713 pub async fn run_stream(&self, prompt: String, cancel: CancellationToken) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
716 self.run_stream_with_messages(vec![json!({"role": "user", "content": prompt})], cancel, None, None, false).await
717 }
718
719 pub async fn run_stream_with_messages(
723 &self,
724 messages: Vec<Value>,
725 cancel: CancellationToken,
726 steering_rx: Option<mpsc::UnboundedReceiver<String>>,
727 secret_prompt: Option<crate::tools::SecretPromptHandle>,
728 auto_approve_confirms: bool,
729 ) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
730 let (tx, rx) = mpsc::unbounded_channel();
731
732 if let Err(e) = self.refresh_if_needed().await {
734 let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
735 let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
736 return Box::pin(UnboundedReceiverStream::new(rx));
737 }
738
739 let auth = Arc::clone(&self.auth);
742 let client = self.client.clone();
743 let model = self.model.clone();
744 let tools = self.tools.clone();
745 let system_prompt = self.system_prompt.clone();
746 let thinking_budget = self.thinking_budget;
747 let watcher_exit_path = self.watcher_exit_path.clone();
748 let max_tool_output = self.max_tool_output;
749 let bash_timeout = self.bash_timeout;
750 let bash_max_timeout = self.bash_max_timeout;
751 let subagent_timeout = self.subagent_timeout;
752 let api_retries = self.api_retries;
753 let session_manager = self.session_manager.clone();
754 let subagent_registry = self.subagent_registry.clone();
758 let event_queue = self.event_queue.clone();
759 let options = api::ApiOptions {
760 use_1m_context: self.context_window_override == Some(1_000_000),
761 cache_ttl: self.cache_ttl,
762 ttl_downgrade_notified: self.ttl_downgrade_notified.clone(),
763 saw_1h_honored: self.saw_1h_honored.clone(),
764 };
765
766 let session = crate::runtime::stream::StreamSession {
767 auth, client, options, api_retries,
768 model, tools, system_prompt, thinking_budget,
769 tx: tx.clone(), cancel, steering_rx,
770 watcher_exit_path, max_tool_output,
771 bash_timeout, bash_max_timeout, subagent_timeout,
772 session_manager, subagent_registry, event_queue, secret_prompt,
773 hook_bus: self.hook_bus.clone(),
774 auto_approve_confirms,
775 telemetry_level: self.telemetry_level,
776 };
777
778 tokio::spawn(async move {
779 if let Err(e) = StreamMethods::run_stream_internal(session, messages).await {
780 let _ = tx.send(StreamEvent::Session(SessionEvent::Error(e.to_string())));
781 }
782 let _ = tx.send(StreamEvent::Session(SessionEvent::Done));
783 });
784
785 Box::pin(UnboundedReceiverStream::new(rx))
786 }
787}
788
789impl Clone for Runtime {
790 fn clone(&self) -> Self {
791 Self {
792 client: self.client.clone(),
793 auth: Arc::clone(&self.auth),
794 model: self.model.clone(),
795 tools: self.tools.clone(),
796 system_prompt: self.system_prompt.clone(),
797 thinking_budget: self.thinking_budget,
798 context_window_override: self.context_window_override,
799 compaction_model: self.compaction_model.clone(),
800 subagent_registry: self.subagent_registry.clone(),
801 event_queue: self.event_queue.clone(),
802 watcher_exit_path: self.watcher_exit_path.clone(),
803 max_tool_output: self.max_tool_output,
804 bash_timeout: self.bash_timeout,
805 bash_max_timeout: self.bash_max_timeout,
806 subagent_timeout: self.subagent_timeout,
807 api_retries: self.api_retries,
808 telemetry_level: self.telemetry_level,
809 cache_diagnostics: self.cache_diagnostics,
810 cache_ttl: self.cache_ttl,
811 ttl_downgrade_notified: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
815 saw_1h_honored: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
816 last_msg_id: Arc::new(Mutex::new(None)),
819 session_manager: self.session_manager.clone(),
820 hook_bus: self.hook_bus.clone(),
821 reaper_handle: None, reaper_cancel: None, }
824 }
825}
826
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extensions::hooks::events::HookResult;

    /// The `Confirm` hook result shared by the confirmation tests.
    fn deploy_confirm() -> HookResult {
        HookResult::Confirm {
            message: "Run deploy?".into(),
        }
    }

    /// A confirmation with no prompt handle available must block the call.
    #[tokio::test]
    async fn confirm_without_prompt_fails_closed() {
        let outcome = resolve_before_tool_call_result(deploy_confirm(), None, false).await;

        assert!(matches!(
            outcome,
            HookResult::Block { reason }
                if reason.contains("requires confirmation") && reason.contains("Run deploy?")
        ));
    }

    /// A `Modify` hook result replaces the input the tool will run with.
    #[tokio::test]
    async fn modify_result_replaces_tool_input() {
        let original = serde_json::json!({"command":"rm -rf /"});
        let replacement = HookResult::Modify {
            input: serde_json::json!({"command":"echo safe"}),
        };

        let decision = resolve_before_tool_call_decision(original, replacement, None, false).await;

        match decision {
            BeforeToolCallDecision::Continue { input } => {
                assert_eq!(input, serde_json::json!({"command":"echo safe"}));
            }
            BeforeToolCallDecision::Block { reason } => panic!("unexpected block: {reason}"),
        }
    }

    /// Answering "yes" to the confirmation prompt lets the call continue.
    #[tokio::test]
    async fn confirm_prompt_yes_continues() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(tx);

        // Plays the user: checks the prompt contents and answers "yes".
        let responder = tokio::spawn(async move {
            let request = rx.recv().await.expect("confirm prompt request");
            assert_eq!(request.title, "Confirm tool call");
            assert!(request.prompt.contains("Run deploy?"));
            let _ = request.response_tx.send(Some("yes".to_string()));
        });

        let outcome =
            resolve_before_tool_call_result(deploy_confirm(), Some(&handle), false).await;

        responder.await.unwrap();
        assert!(matches!(outcome, HookResult::Continue));
    }

    /// Any answer other than yes/y blocks the call.
    #[tokio::test]
    async fn confirm_prompt_non_yes_blocks() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = crate::tools::SecretPromptHandle::new(tx);

        // Plays the user: answers "no".
        let responder = tokio::spawn(async move {
            let request = rx.recv().await.expect("confirm prompt request");
            let _ = request.response_tx.send(Some("no".to_string()));
        });

        let outcome =
            resolve_before_tool_call_result(deploy_confirm(), Some(&handle), false).await;

        responder.await.unwrap();
        assert!(matches!(
            outcome,
            HookResult::Block { reason } if reason.contains("confirmation denied")
        ));
    }

    #[test]
    fn test_max_tokens_for_model() {
        // Names containing lowercase "opus" get the larger limit.
        assert_eq!(HelperMethods::max_tokens_for_model("claude-opus-4-6"), 128000);
        assert_eq!(HelperMethods::max_tokens_for_model("opus-something"), 128000);
        assert_eq!(HelperMethods::max_tokens_for_model("model-opus-end"), 128000);

        // Everything else gets the default limit.
        assert_eq!(HelperMethods::max_tokens_for_model("claude-sonnet-4-20250514"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("haiku"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("claude-3-haiku"), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("some-other-model"), 64000);

        // Edge cases: empty name, and the match is case-sensitive.
        assert_eq!(HelperMethods::max_tokens_for_model(""), 64000);
        assert_eq!(HelperMethods::max_tokens_for_model("OPUS"), 64000);
    }

    #[test]
    fn test_truncate_tool_result() {
        let limit = 30000;

        // Below the limit: returned unchanged.
        let brief = "This is a short string.";
        assert_eq!(HelperMethods::truncate_tool_result(brief, limit), brief);

        // Exactly at the limit: returned unchanged.
        let at_limit = "x".repeat(30000);
        assert_eq!(HelperMethods::truncate_tool_result(&at_limit, limit), at_limit);

        // One past the limit: truncated, with a notice appended.
        let over_by_one = "x".repeat(30001);
        let cut = HelperMethods::truncate_tool_result(&over_by_one, limit);
        assert!(cut.starts_with(&"x".repeat(30000)));
        assert!(cut.contains("[truncated — 30001 total chars, showing first 30000]"));
        assert!(cut.len() > 30000);

        // Far past the limit.
        let huge = "a".repeat(50000);
        let cut_huge = HelperMethods::truncate_tool_result(&huge, limit);
        assert!(cut_huge.contains("[truncated — 50000 total chars, showing first 30000]"));
        assert!(cut_huge.starts_with(&"a".repeat(30000)));

        // A custom limit is honoured.
        let cut_custom = HelperMethods::truncate_tool_result(&huge, 100);
        assert!(cut_custom.starts_with(&"a".repeat(100)));
        assert!(cut_custom.contains("[truncated — 50000 total chars, showing first 100]"));
    }

    #[test]
    fn test_thinking_level_ranges() {
        use crate::core::models::thinking_level_for_budget as level;

        // Zero selects the adaptive level.
        assert_eq!(level(0), "adaptive");

        // 1..=2048
        assert_eq!(level(1), "low");
        assert_eq!(level(1024), "low");
        assert_eq!(level(2048), "low");

        // 2049..=4096
        assert_eq!(level(2049), "medium");
        assert_eq!(level(3000), "medium");
        assert_eq!(level(4096), "medium");

        // 4097..=16384
        assert_eq!(level(4097), "high");
        assert_eq!(level(8192), "high");
        assert_eq!(level(16384), "high");

        // Above 16384
        assert_eq!(level(16385), "xhigh");
        assert_eq!(level(32768), "xhigh");
        assert_eq!(level(100000), "xhigh");
    }
}