1use chrono::Utc;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
5use std::hash::{Hash, Hasher};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12 ContextMode, EngineEvent, HostRuntimeContext, Message, MessagePart, MessagePartInput,
13 MessageRole, ModelSpec, SendMessageRequest, SharedToolProgressSink, ToolMode, ToolSchema,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19mod loop_guards;
20mod loop_tuning;
21mod prewrite_gate;
22mod prewrite_mode;
23mod prompt_context;
24mod prompt_execution;
25mod prompt_helpers;
26mod prompt_runtime;
27mod tool_execution;
28mod tool_output;
29mod tool_parsing;
30mod types;
31
32use loop_guards::{
33 duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
34};
35use loop_tuning::{
36 max_tool_iterations, permission_wait_timeout_ms, prompt_context_hook_timeout_ms,
37 provider_stream_connect_timeout_ms, provider_stream_idle_timeout_ms,
38 strict_write_retry_max_attempts, tool_exec_timeout_ms,
39};
40use prewrite_gate::{evaluate_prewrite_gate, PrewriteProgress};
41use prewrite_mode::*;
42use prompt_context::{
43 format_context_mode, mcp_catalog_in_system_prompt_enabled, semantic_tool_retrieval_enabled,
44 semantic_tool_retrieval_k, tandem_runtime_system_prompt,
45};
46use prompt_helpers::*;
47use prompt_runtime::*;
48use tool_output::*;
49use tool_parsing::*;
50use types::{EngineToolProgressSink, StreamedToolCall, WritePathRecoveryMode};
51
52pub use prewrite_mode::prewrite_repair_retry_max_attempts;
53pub use types::{
54 PromptContextHook, PromptContextHookContext, SpawnAgentHook, SpawnAgentToolContext,
55 SpawnAgentToolResult, ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
56};
57
58use crate::tool_router::{
59 classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
60 should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
61};
62use crate::{
63 any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
64 tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
65 PermissionAction, PermissionManager, PluginRegistry, Storage,
66};
67use crate::{
68 build_tool_effect_ledger_record, finalize_mutation_checkpoint_record,
69 mutation_checkpoint_event, prepare_mutation_checkpoint, tool_effect_ledger_event,
70 MutationCheckpointOutcome, ToolEffectLedgerPhase, ToolEffectLedgerStatus,
71};
72use tokio::sync::RwLock;
73
74#[derive(Clone)]
75pub struct EngineLoop {
76 storage: std::sync::Arc<Storage>,
77 event_bus: EventBus,
78 providers: ProviderRegistry,
79 plugins: PluginRegistry,
80 agents: AgentRegistry,
81 permissions: PermissionManager,
82 tools: ToolRegistry,
83 cancellations: CancellationRegistry,
84 host_runtime_context: HostRuntimeContext,
85 workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
86 session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
87 session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
88 spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
89 tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
90 prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
91}
92
93impl EngineLoop {
94 #[allow(clippy::too_many_arguments)]
95 pub fn new(
96 storage: std::sync::Arc<Storage>,
97 event_bus: EventBus,
98 providers: ProviderRegistry,
99 plugins: PluginRegistry,
100 agents: AgentRegistry,
101 permissions: PermissionManager,
102 tools: ToolRegistry,
103 cancellations: CancellationRegistry,
104 host_runtime_context: HostRuntimeContext,
105 ) -> Self {
106 Self {
107 storage,
108 event_bus,
109 providers,
110 plugins,
111 agents,
112 permissions,
113 tools,
114 cancellations,
115 host_runtime_context,
116 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
117 session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
118 session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
119 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
120 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
121 prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
122 }
123 }
124
125 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
126 *self.spawn_agent_hook.write().await = Some(hook);
127 }
128
129 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
130 *self.tool_policy_hook.write().await = Some(hook);
131 }
132
133 pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
134 *self.prompt_context_hook.write().await = Some(hook);
135 }
136
137 pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
138 let normalized = allowed_tools
139 .into_iter()
140 .map(|tool| normalize_tool_name(&tool))
141 .filter(|tool| !tool.trim().is_empty())
142 .collect::<Vec<_>>();
143 self.session_allowed_tools
144 .write()
145 .await
146 .insert(session_id.to_string(), normalized);
147 }
148
149 pub async fn clear_session_allowed_tools(&self, session_id: &str) {
150 self.session_allowed_tools.write().await.remove(session_id);
151 }
152
153 pub async fn get_session_allowed_tools(&self, session_id: &str) -> Vec<String> {
154 self.session_allowed_tools
155 .read()
156 .await
157 .get(session_id)
158 .cloned()
159 .unwrap_or_default()
160 }
161
162 pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
163 if enabled {
164 self.session_auto_approve_permissions
165 .write()
166 .await
167 .insert(session_id.to_string(), true);
168 } else {
169 self.session_auto_approve_permissions
170 .write()
171 .await
172 .remove(session_id);
173 }
174 }
175
176 pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
177 self.session_auto_approve_permissions
178 .write()
179 .await
180 .remove(session_id);
181 }
182
183 pub async fn grant_workspace_override_for_session(
184 &self,
185 session_id: &str,
186 ttl_seconds: u64,
187 ) -> u64 {
188 const MAX_WORKSPACE_OVERRIDE_TTL_SECONDS: u64 = 600; let capped_ttl = ttl_seconds.min(MAX_WORKSPACE_OVERRIDE_TTL_SECONDS);
191 if capped_ttl < ttl_seconds {
192 tracing::warn!(
193 session_id = %session_id,
194 requested_ttl_s = %ttl_seconds,
195 capped_ttl_s = %capped_ttl,
196 "workspace override TTL capped to maximum allowed value"
197 );
198 }
199 let expires_at = chrono::Utc::now()
200 .timestamp_millis()
201 .max(0)
202 .saturating_add((capped_ttl as i64).saturating_mul(1000))
203 as u64;
204 self.workspace_overrides
205 .write()
206 .await
207 .insert(session_id.to_string(), expires_at);
208 self.event_bus.publish(EngineEvent::new(
209 "workspace.override.activated",
210 json!({
211 "sessionID": session_id,
212 "requestedTtlSeconds": ttl_seconds,
213 "cappedTtlSeconds": capped_ttl,
214 "expiresAt": expires_at,
215 }),
216 ));
217 expires_at
218 }
219
220 pub async fn run_prompt_async(
221 &self,
222 session_id: String,
223 req: SendMessageRequest,
224 ) -> anyhow::Result<()> {
225 self.run_prompt_async_with_context(session_id, req, None)
226 .await
227 }
228
229 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
230 self.providers.default_complete(&prompt).await
231 }
232
233 pub async fn run_oneshot_for_provider(
234 &self,
235 prompt: String,
236 provider_id: Option<&str>,
237 ) -> anyhow::Result<String> {
238 self.providers
239 .complete_for_provider(provider_id, &prompt, None)
240 .await
241 }
242
243 #[allow(clippy::too_many_arguments)]
244 async fn execute_tool_with_permission(
245 &self,
246 session_id: &str,
247 message_id: &str,
248 tool: String,
249 args: Value,
250 initial_tool_call_id: Option<String>,
251 equipped_skills: Option<&[String]>,
252 latest_user_text: &str,
253 write_required: bool,
254 latest_assistant_context: Option<&str>,
255 cancel: CancellationToken,
256 ) -> anyhow::Result<Option<String>> {
257 let tool = normalize_tool_name(&tool);
258 let raw_args = args.clone();
259 let publish_tool_effect = |tool_call_id: Option<&str>,
260 phase: ToolEffectLedgerPhase,
261 status: ToolEffectLedgerStatus,
262 args: &Value,
263 metadata: Option<&Value>,
264 output: Option<&str>,
265 error: Option<&str>| {
266 self.event_bus
267 .publish(tool_effect_ledger_event(build_tool_effect_ledger_record(
268 session_id,
269 message_id,
270 tool_call_id,
271 &tool,
272 phase,
273 status,
274 args,
275 metadata,
276 output,
277 error,
278 )));
279 };
280 let normalized = normalize_tool_args_with_mode(
281 &tool,
282 args,
283 latest_user_text,
284 latest_assistant_context.unwrap_or_default(),
285 if write_required {
286 WritePathRecoveryMode::OutputTargetOnly
287 } else {
288 WritePathRecoveryMode::Heuristic
289 },
290 );
291 let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
292 let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
293 self.event_bus.publish(EngineEvent::new(
294 "tool.args.normalized",
295 json!({
296 "sessionID": session_id,
297 "messageID": message_id,
298 "tool": tool,
299 "argsSource": normalized.args_source,
300 "argsIntegrity": normalized.args_integrity,
301 "rawArgsState": normalized.raw_args_state.as_str(),
302 "rawArgsPreview": raw_args_preview,
303 "normalizedArgsPreview": normalized_args_preview,
304 "query": normalized.query,
305 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
306 "requestID": Value::Null
307 }),
308 ));
309 if normalized.args_integrity == "recovered" {
310 self.event_bus.publish(EngineEvent::new(
311 "tool.args.recovered",
312 json!({
313 "sessionID": session_id,
314 "messageID": message_id,
315 "tool": tool,
316 "argsSource": normalized.args_source,
317 "rawArgsPreview": raw_args_preview,
318 "normalizedArgsPreview": normalized_args_preview,
319 "query": normalized.query,
320 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
321 "requestID": Value::Null
322 }),
323 ));
324 }
325 if normalized.missing_terminal {
326 let missing_reason = normalized
327 .missing_terminal_reason
328 .clone()
329 .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
330 let latest_user_preview = truncate_text(latest_user_text, 500);
331 let latest_assistant_preview =
332 truncate_text(latest_assistant_context.unwrap_or_default(), 500);
333 self.event_bus.publish(EngineEvent::new(
334 "tool.args.missing_terminal",
335 json!({
336 "sessionID": session_id,
337 "messageID": message_id,
338 "tool": tool,
339 "argsSource": normalized.args_source,
340 "argsIntegrity": normalized.args_integrity,
341 "rawArgsState": normalized.raw_args_state.as_str(),
342 "requestID": Value::Null,
343 "error": missing_reason,
344 "rawArgsPreview": raw_args_preview,
345 "normalizedArgsPreview": normalized_args_preview,
346 "latestUserPreview": latest_user_preview,
347 "latestAssistantPreview": latest_assistant_preview,
348 }),
349 ));
350 if tool == "write" {
351 tracing::warn!(
352 session_id = %session_id,
353 message_id = %message_id,
354 tool = %tool,
355 reason = %missing_reason,
356 args_source = %normalized.args_source,
357 args_integrity = %normalized.args_integrity,
358 raw_args_state = %normalized.raw_args_state.as_str(),
359 raw_args = %raw_args_preview,
360 normalized_args = %normalized_args_preview,
361 latest_user = %latest_user_preview,
362 latest_assistant = %latest_assistant_preview,
363 "write tool arguments missing terminal field"
364 );
365 }
366 let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
367 let mut failed_part = WireMessagePart::tool_result(
368 session_id,
369 message_id,
370 tool.clone(),
371 Some(best_effort_args),
372 json!(null),
373 );
374 failed_part.state = Some("failed".to_string());
375 let surfaced_reason =
376 provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
377 .unwrap_or_else(|| missing_reason.clone());
378 failed_part.error = Some(surfaced_reason.clone());
379 self.event_bus.publish(EngineEvent::new(
380 "message.part.updated",
381 json!({"part": failed_part}),
382 ));
383 publish_tool_effect(
384 None,
385 ToolEffectLedgerPhase::Outcome,
386 ToolEffectLedgerStatus::Blocked,
387 &normalized.args,
388 None,
389 None,
390 Some(&surfaced_reason),
391 );
392 return Ok(Some(surfaced_reason));
393 }
394
395 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
396 Ok(args) => args,
397 Err(message) => {
398 publish_tool_effect(
399 None,
400 ToolEffectLedgerPhase::Outcome,
401 ToolEffectLedgerStatus::Blocked,
402 &raw_args,
403 None,
404 None,
405 Some(&message),
406 );
407 return Ok(Some(message));
408 }
409 };
410 if let Some(allowed_tools) = self
411 .session_allowed_tools
412 .read()
413 .await
414 .get(session_id)
415 .cloned()
416 {
417 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
418 let reason = format!("Tool `{tool}` is not allowed for this run.");
419 publish_tool_effect(
420 None,
421 ToolEffectLedgerPhase::Outcome,
422 ToolEffectLedgerStatus::Blocked,
423 &args,
424 None,
425 None,
426 Some(&reason),
427 );
428 return Ok(Some(reason));
429 }
430 }
431 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
432 let decision = hook
433 .evaluate_tool(ToolPolicyContext {
434 session_id: session_id.to_string(),
435 message_id: message_id.to_string(),
436 tool: tool.clone(),
437 args: args.clone(),
438 })
439 .await?;
440 if !decision.allowed {
441 let reason = decision
442 .reason
443 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
444 let mut blocked_part = WireMessagePart::tool_result(
445 session_id,
446 message_id,
447 tool.clone(),
448 Some(args.clone()),
449 json!(null),
450 );
451 blocked_part.state = Some("failed".to_string());
452 blocked_part.error = Some(reason.clone());
453 self.event_bus.publish(EngineEvent::new(
454 "message.part.updated",
455 json!({"part": blocked_part}),
456 ));
457 publish_tool_effect(
458 None,
459 ToolEffectLedgerPhase::Outcome,
460 ToolEffectLedgerStatus::Blocked,
461 &args,
462 None,
463 None,
464 Some(&reason),
465 );
466 return Ok(Some(reason));
467 }
468 }
469 let mut tool_call_id: Option<String> = initial_tool_call_id;
470 if let Some(violation) = self
471 .workspace_sandbox_violation(session_id, &tool, &args)
472 .await
473 {
474 let mut blocked_part = WireMessagePart::tool_result(
475 session_id,
476 message_id,
477 tool.clone(),
478 Some(args.clone()),
479 json!(null),
480 );
481 blocked_part.state = Some("failed".to_string());
482 blocked_part.error = Some(violation.clone());
483 self.event_bus.publish(EngineEvent::new(
484 "message.part.updated",
485 json!({"part": blocked_part}),
486 ));
487 publish_tool_effect(
488 tool_call_id.as_deref(),
489 ToolEffectLedgerPhase::Outcome,
490 ToolEffectLedgerStatus::Blocked,
491 &args,
492 None,
493 None,
494 Some(&violation),
495 );
496 return Ok(Some(violation));
497 }
498 let rule = self
499 .plugins
500 .permission_override(&tool)
501 .await
502 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
503 if matches!(rule, PermissionAction::Deny) {
504 let reason = format!("Permission denied for tool `{tool}` by policy.");
505 publish_tool_effect(
506 tool_call_id.as_deref(),
507 ToolEffectLedgerPhase::Outcome,
508 ToolEffectLedgerStatus::Blocked,
509 &args,
510 None,
511 None,
512 Some(&reason),
513 );
514 return Ok(Some(reason));
515 }
516
517 let mut effective_args = args.clone();
518 if matches!(rule, PermissionAction::Ask) {
519 let auto_approve_permissions = self
520 .session_auto_approve_permissions
521 .read()
522 .await
523 .get(session_id)
524 .copied()
525 .unwrap_or(false);
526 if auto_approve_permissions {
527 if normalized.args_integrity == "recovered" && is_workspace_write_tool(&tool) {
532 tracing::warn!(
533 session_id = %session_id,
534 message_id = %message_id,
535 tool = %tool,
536 args_source = %normalized.args_source,
537 "auto-approve granted for mutating tool with recovered args; verify intent"
538 );
539 self.event_bus.publish(EngineEvent::new(
540 "tool.args.recovered_write_auto_approved",
541 json!({
542 "sessionID": session_id,
543 "messageID": message_id,
544 "tool": tool,
545 "argsSource": normalized.args_source,
546 "argsIntegrity": normalized.args_integrity,
547 }),
548 ));
549 }
550 self.event_bus.publish(EngineEvent::new(
551 "permission.auto_approved",
552 json!({
553 "sessionID": session_id,
554 "messageID": message_id,
555 "tool": tool,
556 }),
557 ));
558 effective_args = args;
559 } else {
560 let pending = self
561 .permissions
562 .ask_for_session_with_context(
563 Some(session_id),
564 &tool,
565 args.clone(),
566 Some(crate::PermissionArgsContext {
567 args_source: normalized.args_source.clone(),
568 args_integrity: normalized.args_integrity.clone(),
569 query: normalized.query.clone(),
570 }),
571 )
572 .await;
573 let mut pending_part = WireMessagePart::tool_invocation(
574 session_id,
575 message_id,
576 tool.clone(),
577 args.clone(),
578 );
579 pending_part.id = Some(pending.id.clone());
580 tool_call_id = Some(pending.id.clone());
581 pending_part.state = Some("pending".to_string());
582 self.event_bus.publish(EngineEvent::new(
583 "message.part.updated",
584 json!({"part": pending_part}),
585 ));
586 let reply = self
587 .permissions
588 .wait_for_reply_with_timeout(
589 &pending.id,
590 cancel.clone(),
591 Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
592 )
593 .await;
594 let (reply, timed_out) = reply;
595 if cancel.is_cancelled() {
596 return Ok(None);
597 }
598 if timed_out {
599 let timeout_ms = permission_wait_timeout_ms();
600 self.event_bus.publish(EngineEvent::new(
601 "permission.wait.timeout",
602 json!({
603 "sessionID": session_id,
604 "messageID": message_id,
605 "tool": tool,
606 "requestID": pending.id,
607 "timeoutMs": timeout_ms,
608 }),
609 ));
610 let mut timeout_part = WireMessagePart::tool_result(
611 session_id,
612 message_id,
613 tool.clone(),
614 Some(args.clone()),
615 json!(null),
616 );
617 timeout_part.id = Some(pending.id);
618 timeout_part.state = Some("failed".to_string());
619 timeout_part.error = Some(format!(
620 "Permission request timed out after {} ms",
621 timeout_ms
622 ));
623 self.event_bus.publish(EngineEvent::new(
624 "message.part.updated",
625 json!({"part": timeout_part}),
626 ));
627 let timeout_reason = format!(
628 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
629 );
630 publish_tool_effect(
631 tool_call_id.as_deref(),
632 ToolEffectLedgerPhase::Outcome,
633 ToolEffectLedgerStatus::Blocked,
634 &args,
635 None,
636 None,
637 Some(&timeout_reason),
638 );
639 return Ok(Some(format!(
640 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
641 )));
642 }
643 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
644 if !approved {
645 let mut denied_part = WireMessagePart::tool_result(
646 session_id,
647 message_id,
648 tool.clone(),
649 Some(args.clone()),
650 json!(null),
651 );
652 denied_part.id = Some(pending.id);
653 denied_part.state = Some("denied".to_string());
654 denied_part.error = Some("Permission denied by user".to_string());
655 self.event_bus.publish(EngineEvent::new(
656 "message.part.updated",
657 json!({"part": denied_part}),
658 ));
659 let denied_reason = format!("Permission denied for tool `{tool}` by user.");
660 publish_tool_effect(
661 tool_call_id.as_deref(),
662 ToolEffectLedgerPhase::Outcome,
663 ToolEffectLedgerStatus::Blocked,
664 &args,
665 None,
666 None,
667 Some(&denied_reason),
668 );
669 return Ok(Some(format!(
670 "Permission denied for tool `{tool}` by user."
671 )));
672 }
673 effective_args = args;
674 }
675 }
676
677 let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
678 let session = self.storage.get_session(session_id).await;
679 if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
680 obj.insert(
681 "__session_id".to_string(),
682 Value::String(session_id.to_string()),
683 );
684 if let Some(project_id) = session.project_id.clone() {
685 obj.insert(
686 "__project_id".to_string(),
687 Value::String(project_id.clone()),
688 );
689 if project_id.starts_with("channel-public::") {
690 obj.insert(
691 "__memory_max_visible_scope".to_string(),
692 Value::String("project".to_string()),
693 );
694 }
695 }
696 }
697 let tool_context = self.resolve_tool_execution_context(session_id).await;
698 if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
699 args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
700 if let Some(obj) = args.as_object_mut() {
701 obj.insert(
702 "__workspace_root".to_string(),
703 Value::String(workspace_root.clone()),
704 );
705 obj.insert(
706 "__effective_cwd".to_string(),
707 Value::String(effective_cwd.clone()),
708 );
709 obj.insert(
710 "__session_id".to_string(),
711 Value::String(session_id.to_string()),
712 );
713 if let Some(project_id) = project_id.clone() {
714 obj.insert("__project_id".to_string(), Value::String(project_id));
715 }
716 }
717 tracing::info!(
718 "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
719 session_id,
720 tool,
721 workspace_root,
722 effective_cwd,
723 project_id.clone().unwrap_or_default()
724 );
725 }
726 let mut invoke_part =
727 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
728 if let Some(call_id) = tool_call_id.clone() {
729 invoke_part.id = Some(call_id);
730 }
731 let invoke_part_id = invoke_part.id.clone();
732 self.event_bus.publish(EngineEvent::new(
733 "message.part.updated",
734 json!({"part": invoke_part}),
735 ));
736 let args_for_side_events = args.clone();
737 let mutation_checkpoint = prepare_mutation_checkpoint(&tool, &args_for_side_events);
738 let progress_sink: SharedToolProgressSink = std::sync::Arc::new(EngineToolProgressSink {
739 event_bus: self.event_bus.clone(),
740 session_id: session_id.to_string(),
741 message_id: message_id.to_string(),
742 tool_call_id: invoke_part_id.clone(),
743 source_tool: tool.clone(),
744 });
745 publish_tool_effect(
746 invoke_part_id.as_deref(),
747 ToolEffectLedgerPhase::Invocation,
748 ToolEffectLedgerStatus::Started,
749 &args_for_side_events,
750 None,
751 None,
752 None,
753 );
754 let publish_mutation_checkpoint =
755 |tool_call_id: Option<&str>, outcome: MutationCheckpointOutcome| {
756 if let Some(baseline) = mutation_checkpoint.as_ref() {
757 self.event_bus.publish(mutation_checkpoint_event(
758 finalize_mutation_checkpoint_record(
759 session_id,
760 message_id,
761 tool_call_id,
762 baseline,
763 outcome,
764 ),
765 ));
766 }
767 };
768 if tool == "spawn_agent" {
769 let hook = self.spawn_agent_hook.read().await.clone();
770 if let Some(hook) = hook {
771 let spawned = hook
772 .spawn_agent(SpawnAgentToolContext {
773 session_id: session_id.to_string(),
774 message_id: message_id.to_string(),
775 tool_call_id: invoke_part_id.clone(),
776 args: args_for_side_events.clone(),
777 })
778 .await?;
779 let output = self.plugins.transform_tool_output(spawned.output).await;
780 let output = truncate_text(&output, 16_000);
781 emit_tool_side_events(
782 self.storage.clone(),
783 &self.event_bus,
784 ToolSideEventContext {
785 session_id,
786 message_id,
787 tool: &tool,
788 args: &args_for_side_events,
789 metadata: &spawned.metadata,
790 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
791 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
792 },
793 )
794 .await;
795 let mut result_part = WireMessagePart::tool_result(
796 session_id,
797 message_id,
798 tool.clone(),
799 Some(args_for_side_events.clone()),
800 json!(output.clone()),
801 );
802 result_part.id = invoke_part_id.clone();
803 self.event_bus.publish(EngineEvent::new(
804 "message.part.updated",
805 json!({"part": result_part}),
806 ));
807 publish_tool_effect(
808 invoke_part_id.as_deref(),
809 ToolEffectLedgerPhase::Outcome,
810 ToolEffectLedgerStatus::Succeeded,
811 &args_for_side_events,
812 Some(&spawned.metadata),
813 Some(&output),
814 None,
815 );
816 publish_mutation_checkpoint(
817 invoke_part_id.as_deref(),
818 MutationCheckpointOutcome::Succeeded,
819 );
820 return Ok(Some(truncate_text(
821 &format!("Tool `{tool}` result:\n{output}"),
822 16_000,
823 )));
824 }
825 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
826 let mut failed_part = WireMessagePart::tool_result(
827 session_id,
828 message_id,
829 tool.clone(),
830 Some(args_for_side_events.clone()),
831 json!(null),
832 );
833 failed_part.id = invoke_part_id.clone();
834 failed_part.state = Some("failed".to_string());
835 failed_part.error = Some(output.to_string());
836 self.event_bus.publish(EngineEvent::new(
837 "message.part.updated",
838 json!({"part": failed_part}),
839 ));
840 publish_tool_effect(
841 invoke_part_id.as_deref(),
842 ToolEffectLedgerPhase::Outcome,
843 ToolEffectLedgerStatus::Failed,
844 &args_for_side_events,
845 None,
846 None,
847 Some(output),
848 );
849 publish_mutation_checkpoint(
850 invoke_part_id.as_deref(),
851 MutationCheckpointOutcome::Failed,
852 );
853 return Ok(Some(output.to_string()));
854 }
855 if tool == "batch" {
862 let allowed_tools = self
863 .session_allowed_tools
864 .read()
865 .await
866 .get(session_id)
867 .cloned()
868 .unwrap_or_default();
869
870 let ctx_workspace_root = args
872 .get("__workspace_root")
873 .and_then(|v| v.as_str())
874 .map(ToString::to_string);
875 let ctx_effective_cwd = args
876 .get("__effective_cwd")
877 .and_then(|v| v.as_str())
878 .map(ToString::to_string);
879 let ctx_session_id = args
880 .get("__session_id")
881 .and_then(|v| v.as_str())
882 .map(ToString::to_string);
883 let ctx_project_id = args
884 .get("__project_id")
885 .and_then(|v| v.as_str())
886 .map(ToString::to_string);
887
888 let raw_calls = args
890 .get("tool_calls")
891 .and_then(|v| v.as_array())
892 .cloned()
893 .unwrap_or_default();
894
895 let mut governed_calls: Vec<Value> = Vec::new();
896 for mut call in raw_calls {
897 let (sub_tool, mut sub_args) = {
898 let obj = match call.as_object() {
899 Some(o) => o,
900 None => {
901 governed_calls.push(call);
902 continue;
903 }
904 };
905 let tool_raw = non_empty_string_at(obj, "tool")
906 .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
907 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
908 .or_else(|| non_empty_string_at(obj, "name"));
909 let sub_tool = match tool_raw {
910 Some(t) => normalize_tool_name(t),
911 None => {
912 governed_calls.push(call);
913 continue;
914 }
915 };
916 let sub_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
917 (sub_tool, sub_args)
918 };
919
920 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &sub_tool) {
922 if let Some(obj) = call.as_object_mut() {
924 obj.insert(
925 "_blocked".to_string(),
926 Value::String(format!(
927 "batch sub-call skipped: tool `{sub_tool}` is not in the allowed list for this run"
928 )),
929 );
930 }
931 governed_calls.push(call);
932 continue;
933 }
934
935 if let Some(violation) = self
937 .workspace_sandbox_violation(session_id, &sub_tool, &sub_args)
938 .await
939 {
940 if let Some(obj) = call.as_object_mut() {
941 obj.insert(
942 "_blocked".to_string(),
943 Value::String(format!("batch sub-call skipped: {violation}")),
944 );
945 }
946 governed_calls.push(call);
947 continue;
948 }
949
950 if let Some(sub_obj) = sub_args.as_object_mut() {
952 if let Some(ref v) = ctx_workspace_root {
953 sub_obj
954 .entry("__workspace_root")
955 .or_insert_with(|| Value::String(v.clone()));
956 }
957 if let Some(ref v) = ctx_effective_cwd {
958 sub_obj
959 .entry("__effective_cwd")
960 .or_insert_with(|| Value::String(v.clone()));
961 }
962 if let Some(ref v) = ctx_session_id {
963 sub_obj
964 .entry("__session_id")
965 .or_insert_with(|| Value::String(v.clone()));
966 }
967 if let Some(ref v) = ctx_project_id {
968 sub_obj
969 .entry("__project_id")
970 .or_insert_with(|| Value::String(v.clone()));
971 }
972 }
973
974 if let Some(obj) = call.as_object_mut() {
976 obj.insert("args".to_string(), sub_args);
977 }
978 governed_calls.push(call);
979 }
980
981 if let Some(obj) = args.as_object_mut() {
983 obj.insert("tool_calls".to_string(), Value::Array(governed_calls));
984 }
985 }
986 let result = match self
987 .execute_tool_with_timeout(&tool, args, cancel.clone(), Some(progress_sink))
988 .await
989 {
990 Ok(result) => result,
991 Err(err) => {
992 let err_text = err.to_string();
993 if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
994 let timeout_ms = tool_exec_timeout_ms();
995 let timeout_output = format!(
996 "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
997 );
998 let mut failed_part = WireMessagePart::tool_result(
999 session_id,
1000 message_id,
1001 tool.clone(),
1002 Some(args_for_side_events.clone()),
1003 json!(null),
1004 );
1005 failed_part.id = invoke_part_id.clone();
1006 failed_part.state = Some("failed".to_string());
1007 failed_part.error = Some(timeout_output.clone());
1008 self.event_bus.publish(EngineEvent::new(
1009 "message.part.updated",
1010 json!({"part": failed_part}),
1011 ));
1012 publish_tool_effect(
1013 invoke_part_id.as_deref(),
1014 ToolEffectLedgerPhase::Outcome,
1015 ToolEffectLedgerStatus::Failed,
1016 &args_for_side_events,
1017 None,
1018 None,
1019 Some(&timeout_output),
1020 );
1021 publish_mutation_checkpoint(
1022 invoke_part_id.as_deref(),
1023 MutationCheckpointOutcome::Failed,
1024 );
1025 return Ok(Some(timeout_output));
1026 }
1027 if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1028 self.event_bus.publish(EngineEvent::new(
1029 "mcp.auth.required",
1030 json!({
1031 "sessionID": session_id,
1032 "messageID": message_id,
1033 "tool": tool.clone(),
1034 "server": auth.server,
1035 "authorizationUrl": auth.authorization_url,
1036 "message": auth.message,
1037 "challengeId": auth.challenge_id
1038 }),
1039 ));
1040 let auth_output = format!(
1041 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1042 tool, auth.message, auth.authorization_url
1043 );
1044 let mut result_part = WireMessagePart::tool_result(
1045 session_id,
1046 message_id,
1047 tool.clone(),
1048 Some(args_for_side_events.clone()),
1049 json!(auth_output.clone()),
1050 );
1051 result_part.id = invoke_part_id.clone();
1052 self.event_bus.publish(EngineEvent::new(
1053 "message.part.updated",
1054 json!({"part": result_part}),
1055 ));
1056 publish_tool_effect(
1057 invoke_part_id.as_deref(),
1058 ToolEffectLedgerPhase::Outcome,
1059 ToolEffectLedgerStatus::Blocked,
1060 &args_for_side_events,
1061 None,
1062 Some(&auth_output),
1063 Some(&auth.message),
1064 );
1065 publish_mutation_checkpoint(
1066 invoke_part_id.as_deref(),
1067 MutationCheckpointOutcome::Blocked,
1068 );
1069 return Ok(Some(truncate_text(
1070 &format!("Tool `{tool}` result:\n{auth_output}"),
1071 16_000,
1072 )));
1073 }
1074 let mut failed_part = WireMessagePart::tool_result(
1075 session_id,
1076 message_id,
1077 tool.clone(),
1078 Some(args_for_side_events.clone()),
1079 json!(null),
1080 );
1081 failed_part.id = invoke_part_id.clone();
1082 failed_part.state = Some("failed".to_string());
1083 failed_part.error = Some(err_text.clone());
1084 self.event_bus.publish(EngineEvent::new(
1085 "message.part.updated",
1086 json!({"part": failed_part}),
1087 ));
1088 publish_tool_effect(
1089 invoke_part_id.as_deref(),
1090 ToolEffectLedgerPhase::Outcome,
1091 ToolEffectLedgerStatus::Failed,
1092 &args_for_side_events,
1093 None,
1094 None,
1095 Some(&err_text),
1096 );
1097 publish_mutation_checkpoint(
1098 invoke_part_id.as_deref(),
1099 MutationCheckpointOutcome::Failed,
1100 );
1101 return Err(err);
1102 }
1103 };
1104 if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1105 let event_name = if auth.pending && auth.blocked {
1106 "mcp.auth.pending"
1107 } else {
1108 "mcp.auth.required"
1109 };
1110 self.event_bus.publish(EngineEvent::new(
1111 event_name,
1112 json!({
1113 "sessionID": session_id,
1114 "messageID": message_id,
1115 "tool": tool.clone(),
1116 "server": auth.server,
1117 "authorizationUrl": auth.authorization_url,
1118 "message": auth.message,
1119 "challengeId": auth.challenge_id,
1120 "pending": auth.pending,
1121 "blocked": auth.blocked,
1122 "retryAfterMs": auth.retry_after_ms
1123 }),
1124 ));
1125 }
1126 emit_tool_side_events(
1127 self.storage.clone(),
1128 &self.event_bus,
1129 ToolSideEventContext {
1130 session_id,
1131 message_id,
1132 tool: &tool,
1133 args: &args_for_side_events,
1134 metadata: &result.metadata,
1135 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1136 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1137 },
1138 )
1139 .await;
1140 let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1141 if auth.pending && auth.blocked {
1142 let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1143 format!(
1144 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1145 tool, auth.message, auth.authorization_url, retry_after_secs
1146 )
1147 } else {
1148 format!(
1149 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1150 tool, auth.message, auth.authorization_url
1151 )
1152 }
1153 } else {
1154 self.plugins.transform_tool_output(result.output).await
1155 };
1156 let output = truncate_text(&output, 16_000);
1157 let mut result_part = WireMessagePart::tool_result(
1158 session_id,
1159 message_id,
1160 tool.clone(),
1161 Some(args_for_side_events.clone()),
1162 json!(output.clone()),
1163 );
1164 result_part.id = invoke_part_id.clone();
1165 self.event_bus.publish(EngineEvent::new(
1166 "message.part.updated",
1167 json!({"part": result_part}),
1168 ));
1169 publish_tool_effect(
1170 invoke_part_id.as_deref(),
1171 ToolEffectLedgerPhase::Outcome,
1172 ToolEffectLedgerStatus::Succeeded,
1173 &args_for_side_events,
1174 Some(&result.metadata),
1175 Some(&output),
1176 None,
1177 );
1178 publish_mutation_checkpoint(
1179 invoke_part_id.as_deref(),
1180 MutationCheckpointOutcome::Succeeded,
1181 );
1182 Ok(Some(truncate_text(
1183 &format!("Tool `{tool}` result:\n{output}"),
1184 16_000,
1185 )))
1186 }
1187}
1188
1189#[cfg(test)]
1190mod tests;