1use chrono::Utc;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
5use std::hash::{Hash, Hasher};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12 ContextMode, EngineEvent, HostRuntimeContext, Message, MessagePart, MessagePartInput,
13 MessageRole, ModelSpec, SendMessageRequest, SharedToolProgressSink, ToolMode, ToolSchema,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19mod loop_guards;
20mod loop_tuning;
21mod prewrite_gate;
22mod prewrite_mode;
23mod prompt_context;
24mod prompt_execution;
25mod prompt_helpers;
26mod prompt_runtime;
27mod tool_execution;
28mod tool_output;
29mod tool_parsing;
30mod types;
31
32use loop_guards::{
33 duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
34};
35use loop_tuning::{
36 max_tool_iterations, permission_wait_timeout_ms, prompt_context_hook_timeout_ms,
37 provider_stream_connect_timeout_ms, provider_stream_idle_timeout_ms,
38 strict_write_retry_max_attempts, tool_exec_timeout_ms,
39};
40use prewrite_gate::{evaluate_prewrite_gate, PrewriteProgress};
41use prewrite_mode::*;
42use prompt_context::{
43 format_context_mode, mcp_catalog_in_system_prompt_enabled, semantic_tool_retrieval_enabled,
44 semantic_tool_retrieval_k, tandem_runtime_system_prompt,
45};
46use prompt_helpers::*;
47use prompt_runtime::*;
48use tool_output::*;
49use tool_parsing::*;
50use types::{EngineToolProgressSink, StreamedToolCall, WritePathRecoveryMode};
51
52pub use prewrite_mode::prewrite_repair_retry_max_attempts;
53pub use types::{
54 KnowledgebaseGroundingPolicy, PromptContextHook, PromptContextHookContext, SpawnAgentHook,
55 SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext, ToolPolicyDecision,
56 ToolPolicyHook,
57};
58
59use crate::tool_router::{
60 classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
61 should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
62};
63use crate::{
64 any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
65 tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
66 PermissionAction, PermissionManager, PluginRegistry, Storage,
67};
68use crate::{
69 build_tool_effect_ledger_record, finalize_mutation_checkpoint_record,
70 mutation_checkpoint_event, prepare_mutation_checkpoint, tool_effect_ledger_event,
71 MutationCheckpointOutcome, ToolEffectLedgerPhase, ToolEffectLedgerStatus,
72};
73use tokio::sync::RwLock;
74
/// Engine that runs prompts and executes tool calls on behalf of sessions.
///
/// `Clone` is derived. The per-session state maps and hook slots are held
/// behind `std::sync::Arc`, so clones share them rather than copying them.
#[derive(Clone)]
pub struct EngineLoop {
    /// Storage backend; consulted for session metadata (e.g. the project id)
    /// before a tool runs.
    storage: std::sync::Arc<Storage>,
    /// Bus on which engine events (message parts, tool-effect ledger entries,
    /// permission and workspace-override events) are published.
    event_bus: EventBus,
    /// Provider registry used for model completions (e.g. `run_oneshot`).
    providers: ProviderRegistry,
    /// Plugin registry; can override tool permissions, inject tool arguments
    /// and transform tool output.
    plugins: PluginRegistry,
    /// Registry of agent definitions.
    agents: AgentRegistry,
    /// Evaluates per-tool permission rules and manages interactive approval
    /// requests.
    permissions: PermissionManager,
    /// Registry of executable tools.
    tools: ToolRegistry,
    /// Cancellation registry supplied at construction.
    cancellations: CancellationRegistry,
    /// Host runtime context supplied at construction.
    host_runtime_context: HostRuntimeContext,
    /// Session id -> workspace override expiry, in Unix epoch milliseconds.
    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
    /// Session id -> normalized tool allow-list. A missing or empty entry
    /// places no allow-list restriction on the session.
    session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Session id -> knowledgebase grounding policy. Only required policies
    /// with at least one tool pattern are stored.
    session_kb_grounding_policies:
        std::sync::Arc<RwLock<HashMap<String, KnowledgebaseGroundingPolicy>>>,
    /// Sessions whose `Ask` permission prompts are auto-approved; only `true`
    /// entries are ever inserted.
    session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
    /// Optional hook that services the `spawn_agent` tool; without it the
    /// tool reports itself as unavailable.
    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
    /// Optional hook that may deny individual tool calls before they run.
    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
    /// Optional prompt-context hook, installed via `set_prompt_context_hook`.
    prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
}
95
96impl EngineLoop {
97 #[allow(clippy::too_many_arguments)]
98 pub fn new(
99 storage: std::sync::Arc<Storage>,
100 event_bus: EventBus,
101 providers: ProviderRegistry,
102 plugins: PluginRegistry,
103 agents: AgentRegistry,
104 permissions: PermissionManager,
105 tools: ToolRegistry,
106 cancellations: CancellationRegistry,
107 host_runtime_context: HostRuntimeContext,
108 ) -> Self {
109 Self {
110 storage,
111 event_bus,
112 providers,
113 plugins,
114 agents,
115 permissions,
116 tools,
117 cancellations,
118 host_runtime_context,
119 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
120 session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
121 session_kb_grounding_policies: std::sync::Arc::new(RwLock::new(HashMap::new())),
122 session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
123 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
124 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
125 prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
126 }
127 }
128
129 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
130 *self.spawn_agent_hook.write().await = Some(hook);
131 }
132
133 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
134 *self.tool_policy_hook.write().await = Some(hook);
135 }
136
137 pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
138 *self.prompt_context_hook.write().await = Some(hook);
139 }
140
141 pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
142 let normalized = allowed_tools
143 .into_iter()
144 .map(|tool| normalize_tool_name(&tool))
145 .filter(|tool| !tool.trim().is_empty())
146 .collect::<Vec<_>>();
147 self.session_allowed_tools
148 .write()
149 .await
150 .insert(session_id.to_string(), normalized);
151 }
152
153 pub async fn clear_session_allowed_tools(&self, session_id: &str) {
154 self.session_allowed_tools.write().await.remove(session_id);
155 }
156
157 pub async fn get_session_allowed_tools(&self, session_id: &str) -> Vec<String> {
158 self.session_allowed_tools
159 .read()
160 .await
161 .get(session_id)
162 .cloned()
163 .unwrap_or_default()
164 }
165
166 pub async fn set_session_kb_grounding_policy(
167 &self,
168 session_id: &str,
169 policy: KnowledgebaseGroundingPolicy,
170 ) {
171 let mut seen_servers = HashSet::new();
172 let server_names = policy
173 .server_names
174 .into_iter()
175 .map(|server| server.trim().to_ascii_lowercase())
176 .filter(|server| !server.is_empty())
177 .filter(|server| seen_servers.insert(server.clone()))
178 .collect::<Vec<_>>();
179 let mut seen_patterns = HashSet::new();
180 let tool_patterns = policy
181 .tool_patterns
182 .into_iter()
183 .map(|tool| normalize_tool_name(&tool))
184 .filter(|tool| !tool.trim().is_empty())
185 .filter(|tool| seen_patterns.insert(tool.clone()))
186 .collect::<Vec<_>>();
187 if !policy.required || tool_patterns.is_empty() {
188 self.clear_session_kb_grounding_policy(session_id).await;
189 return;
190 }
191 self.session_kb_grounding_policies.write().await.insert(
192 session_id.to_string(),
193 KnowledgebaseGroundingPolicy {
194 required: true,
195 strict: policy.strict,
196 server_names,
197 tool_patterns,
198 },
199 );
200 }
201
202 pub async fn clear_session_kb_grounding_policy(&self, session_id: &str) {
203 self.session_kb_grounding_policies
204 .write()
205 .await
206 .remove(session_id);
207 }
208
209 pub async fn get_session_kb_grounding_policy(
210 &self,
211 session_id: &str,
212 ) -> Option<KnowledgebaseGroundingPolicy> {
213 self.session_kb_grounding_policies
214 .read()
215 .await
216 .get(session_id)
217 .cloned()
218 }
219
220 pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
221 if enabled {
222 self.session_auto_approve_permissions
223 .write()
224 .await
225 .insert(session_id.to_string(), true);
226 } else {
227 self.session_auto_approve_permissions
228 .write()
229 .await
230 .remove(session_id);
231 }
232 }
233
234 pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
235 self.session_auto_approve_permissions
236 .write()
237 .await
238 .remove(session_id);
239 }
240
241 pub async fn grant_workspace_override_for_session(
242 &self,
243 session_id: &str,
244 ttl_seconds: u64,
245 ) -> u64 {
246 const MAX_WORKSPACE_OVERRIDE_TTL_SECONDS: u64 = 600; let capped_ttl = ttl_seconds.min(MAX_WORKSPACE_OVERRIDE_TTL_SECONDS);
249 if capped_ttl < ttl_seconds {
250 tracing::warn!(
251 session_id = %session_id,
252 requested_ttl_s = %ttl_seconds,
253 capped_ttl_s = %capped_ttl,
254 "workspace override TTL capped to maximum allowed value"
255 );
256 }
257 let expires_at = chrono::Utc::now()
258 .timestamp_millis()
259 .max(0)
260 .saturating_add((capped_ttl as i64).saturating_mul(1000))
261 as u64;
262 self.workspace_overrides
263 .write()
264 .await
265 .insert(session_id.to_string(), expires_at);
266 self.event_bus.publish(EngineEvent::new(
267 "workspace.override.activated",
268 json!({
269 "sessionID": session_id,
270 "requestedTtlSeconds": ttl_seconds,
271 "cappedTtlSeconds": capped_ttl,
272 "expiresAt": expires_at,
273 }),
274 ));
275 expires_at
276 }
277
278 pub async fn run_prompt_async(
279 &self,
280 session_id: String,
281 req: SendMessageRequest,
282 ) -> anyhow::Result<()> {
283 self.run_prompt_async_with_context(session_id, req, None)
284 .await
285 }
286
287 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
288 self.providers.default_complete(&prompt).await
289 }
290
291 pub async fn run_oneshot_for_provider(
292 &self,
293 prompt: String,
294 provider_id: Option<&str>,
295 ) -> anyhow::Result<String> {
296 self.providers
297 .complete_for_provider(provider_id, &prompt, None)
298 .await
299 }
300
301 #[allow(clippy::too_many_arguments)]
302 async fn execute_tool_with_permission(
303 &self,
304 session_id: &str,
305 message_id: &str,
306 tool: String,
307 args: Value,
308 initial_tool_call_id: Option<String>,
309 equipped_skills: Option<&[String]>,
310 latest_user_text: &str,
311 write_required: bool,
312 latest_assistant_context: Option<&str>,
313 cancel: CancellationToken,
314 ) -> anyhow::Result<Option<String>> {
315 let tool = normalize_tool_name(&tool);
316 let raw_args = args.clone();
317 let publish_tool_effect = |tool_call_id: Option<&str>,
318 phase: ToolEffectLedgerPhase,
319 status: ToolEffectLedgerStatus,
320 args: &Value,
321 metadata: Option<&Value>,
322 output: Option<&str>,
323 error: Option<&str>| {
324 self.event_bus
325 .publish(tool_effect_ledger_event(build_tool_effect_ledger_record(
326 session_id,
327 message_id,
328 tool_call_id,
329 &tool,
330 phase,
331 status,
332 args,
333 metadata,
334 output,
335 error,
336 )));
337 };
338 let normalized = normalize_tool_args_with_mode(
339 &tool,
340 args,
341 latest_user_text,
342 latest_assistant_context.unwrap_or_default(),
343 if write_required {
344 WritePathRecoveryMode::OutputTargetOnly
345 } else {
346 WritePathRecoveryMode::Heuristic
347 },
348 );
349 let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
350 let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
351 self.event_bus.publish(EngineEvent::new(
352 "tool.args.normalized",
353 json!({
354 "sessionID": session_id,
355 "messageID": message_id,
356 "tool": tool,
357 "argsSource": normalized.args_source,
358 "argsIntegrity": normalized.args_integrity,
359 "rawArgsState": normalized.raw_args_state.as_str(),
360 "rawArgsPreview": raw_args_preview,
361 "normalizedArgsPreview": normalized_args_preview,
362 "query": normalized.query,
363 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
364 "requestID": Value::Null
365 }),
366 ));
367 if normalized.args_integrity == "recovered" {
368 self.event_bus.publish(EngineEvent::new(
369 "tool.args.recovered",
370 json!({
371 "sessionID": session_id,
372 "messageID": message_id,
373 "tool": tool,
374 "argsSource": normalized.args_source,
375 "rawArgsPreview": raw_args_preview,
376 "normalizedArgsPreview": normalized_args_preview,
377 "query": normalized.query,
378 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
379 "requestID": Value::Null
380 }),
381 ));
382 }
383 if normalized.missing_terminal {
384 let missing_reason = normalized
385 .missing_terminal_reason
386 .clone()
387 .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
388 let latest_user_preview = truncate_text(latest_user_text, 500);
389 let latest_assistant_preview =
390 truncate_text(latest_assistant_context.unwrap_or_default(), 500);
391 self.event_bus.publish(EngineEvent::new(
392 "tool.args.missing_terminal",
393 json!({
394 "sessionID": session_id,
395 "messageID": message_id,
396 "tool": tool,
397 "argsSource": normalized.args_source,
398 "argsIntegrity": normalized.args_integrity,
399 "rawArgsState": normalized.raw_args_state.as_str(),
400 "requestID": Value::Null,
401 "error": missing_reason,
402 "rawArgsPreview": raw_args_preview,
403 "normalizedArgsPreview": normalized_args_preview,
404 "latestUserPreview": latest_user_preview,
405 "latestAssistantPreview": latest_assistant_preview,
406 }),
407 ));
408 if tool == "write" {
409 tracing::warn!(
410 session_id = %session_id,
411 message_id = %message_id,
412 tool = %tool,
413 reason = %missing_reason,
414 args_source = %normalized.args_source,
415 args_integrity = %normalized.args_integrity,
416 raw_args_state = %normalized.raw_args_state.as_str(),
417 raw_args = %raw_args_preview,
418 normalized_args = %normalized_args_preview,
419 latest_user = %latest_user_preview,
420 latest_assistant = %latest_assistant_preview,
421 "write tool arguments missing terminal field"
422 );
423 }
424 let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
425 let mut failed_part = WireMessagePart::tool_result(
426 session_id,
427 message_id,
428 tool.clone(),
429 Some(best_effort_args),
430 json!(null),
431 );
432 failed_part.state = Some("failed".to_string());
433 let surfaced_reason =
434 provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
435 .unwrap_or_else(|| missing_reason.clone());
436 failed_part.error = Some(surfaced_reason.clone());
437 self.event_bus.publish(EngineEvent::new(
438 "message.part.updated",
439 json!({"part": failed_part}),
440 ));
441 publish_tool_effect(
442 None,
443 ToolEffectLedgerPhase::Outcome,
444 ToolEffectLedgerStatus::Blocked,
445 &normalized.args,
446 None,
447 None,
448 Some(&surfaced_reason),
449 );
450 return Ok(Some(surfaced_reason));
451 }
452
453 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
454 Ok(args) => args,
455 Err(message) => {
456 publish_tool_effect(
457 None,
458 ToolEffectLedgerPhase::Outcome,
459 ToolEffectLedgerStatus::Blocked,
460 &raw_args,
461 None,
462 None,
463 Some(&message),
464 );
465 return Ok(Some(message));
466 }
467 };
468 if let Some(allowed_tools) = self
469 .session_allowed_tools
470 .read()
471 .await
472 .get(session_id)
473 .cloned()
474 {
475 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
476 let reason = format!("Tool `{tool}` is not allowed for this run.");
477 publish_tool_effect(
478 None,
479 ToolEffectLedgerPhase::Outcome,
480 ToolEffectLedgerStatus::Blocked,
481 &args,
482 None,
483 None,
484 Some(&reason),
485 );
486 return Ok(Some(reason));
487 }
488 }
489 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
490 let decision = hook
491 .evaluate_tool(ToolPolicyContext {
492 session_id: session_id.to_string(),
493 message_id: message_id.to_string(),
494 tool: tool.clone(),
495 args: args.clone(),
496 })
497 .await?;
498 if !decision.allowed {
499 let reason = decision
500 .reason
501 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
502 let mut blocked_part = WireMessagePart::tool_result(
503 session_id,
504 message_id,
505 tool.clone(),
506 Some(args.clone()),
507 json!(null),
508 );
509 blocked_part.state = Some("failed".to_string());
510 blocked_part.error = Some(reason.clone());
511 self.event_bus.publish(EngineEvent::new(
512 "message.part.updated",
513 json!({"part": blocked_part}),
514 ));
515 publish_tool_effect(
516 None,
517 ToolEffectLedgerPhase::Outcome,
518 ToolEffectLedgerStatus::Blocked,
519 &args,
520 None,
521 None,
522 Some(&reason),
523 );
524 return Ok(Some(reason));
525 }
526 }
527 let mut tool_call_id: Option<String> = initial_tool_call_id;
528 if let Some(violation) = self
529 .workspace_sandbox_violation(session_id, &tool, &args)
530 .await
531 {
532 let mut blocked_part = WireMessagePart::tool_result(
533 session_id,
534 message_id,
535 tool.clone(),
536 Some(args.clone()),
537 json!(null),
538 );
539 blocked_part.state = Some("failed".to_string());
540 blocked_part.error = Some(violation.clone());
541 self.event_bus.publish(EngineEvent::new(
542 "message.part.updated",
543 json!({"part": blocked_part}),
544 ));
545 publish_tool_effect(
546 tool_call_id.as_deref(),
547 ToolEffectLedgerPhase::Outcome,
548 ToolEffectLedgerStatus::Blocked,
549 &args,
550 None,
551 None,
552 Some(&violation),
553 );
554 return Ok(Some(violation));
555 }
556 let rule = self
557 .plugins
558 .permission_override(&tool)
559 .await
560 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
561 if matches!(rule, PermissionAction::Deny) {
562 let reason = format!("Permission denied for tool `{tool}` by policy.");
563 publish_tool_effect(
564 tool_call_id.as_deref(),
565 ToolEffectLedgerPhase::Outcome,
566 ToolEffectLedgerStatus::Blocked,
567 &args,
568 None,
569 None,
570 Some(&reason),
571 );
572 return Ok(Some(reason));
573 }
574
575 let mut effective_args = args.clone();
576 if matches!(rule, PermissionAction::Ask) {
577 let auto_approve_permissions = self
578 .session_auto_approve_permissions
579 .read()
580 .await
581 .get(session_id)
582 .copied()
583 .unwrap_or(false);
584 if auto_approve_permissions {
585 if normalized.args_integrity == "recovered" && is_workspace_write_tool(&tool) {
590 tracing::warn!(
591 session_id = %session_id,
592 message_id = %message_id,
593 tool = %tool,
594 args_source = %normalized.args_source,
595 "auto-approve granted for mutating tool with recovered args; verify intent"
596 );
597 self.event_bus.publish(EngineEvent::new(
598 "tool.args.recovered_write_auto_approved",
599 json!({
600 "sessionID": session_id,
601 "messageID": message_id,
602 "tool": tool,
603 "argsSource": normalized.args_source,
604 "argsIntegrity": normalized.args_integrity,
605 }),
606 ));
607 }
608 self.event_bus.publish(EngineEvent::new(
609 "permission.auto_approved",
610 json!({
611 "sessionID": session_id,
612 "messageID": message_id,
613 "tool": tool,
614 }),
615 ));
616 effective_args = args;
617 } else {
618 let pending = self
619 .permissions
620 .ask_for_session_with_context(
621 Some(session_id),
622 &tool,
623 args.clone(),
624 Some(crate::PermissionArgsContext {
625 args_source: normalized.args_source.clone(),
626 args_integrity: normalized.args_integrity.clone(),
627 query: normalized.query.clone(),
628 }),
629 )
630 .await;
631 let mut pending_part = WireMessagePart::tool_invocation(
632 session_id,
633 message_id,
634 tool.clone(),
635 args.clone(),
636 );
637 pending_part.id = Some(pending.id.clone());
638 tool_call_id = Some(pending.id.clone());
639 pending_part.state = Some("pending".to_string());
640 self.event_bus.publish(EngineEvent::new(
641 "message.part.updated",
642 json!({"part": pending_part}),
643 ));
644 let reply = self
645 .permissions
646 .wait_for_reply_with_timeout(
647 &pending.id,
648 cancel.clone(),
649 Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
650 )
651 .await;
652 let (reply, timed_out) = reply;
653 if cancel.is_cancelled() {
654 return Ok(None);
655 }
656 if timed_out {
657 let timeout_ms = permission_wait_timeout_ms();
658 self.event_bus.publish(EngineEvent::new(
659 "permission.wait.timeout",
660 json!({
661 "sessionID": session_id,
662 "messageID": message_id,
663 "tool": tool,
664 "requestID": pending.id,
665 "timeoutMs": timeout_ms,
666 }),
667 ));
668 let mut timeout_part = WireMessagePart::tool_result(
669 session_id,
670 message_id,
671 tool.clone(),
672 Some(args.clone()),
673 json!(null),
674 );
675 timeout_part.id = Some(pending.id);
676 timeout_part.state = Some("failed".to_string());
677 timeout_part.error = Some(format!(
678 "Permission request timed out after {} ms",
679 timeout_ms
680 ));
681 self.event_bus.publish(EngineEvent::new(
682 "message.part.updated",
683 json!({"part": timeout_part}),
684 ));
685 let timeout_reason = format!(
686 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
687 );
688 publish_tool_effect(
689 tool_call_id.as_deref(),
690 ToolEffectLedgerPhase::Outcome,
691 ToolEffectLedgerStatus::Blocked,
692 &args,
693 None,
694 None,
695 Some(&timeout_reason),
696 );
697 return Ok(Some(format!(
698 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
699 )));
700 }
701 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
702 if !approved {
703 let mut denied_part = WireMessagePart::tool_result(
704 session_id,
705 message_id,
706 tool.clone(),
707 Some(args.clone()),
708 json!(null),
709 );
710 denied_part.id = Some(pending.id);
711 denied_part.state = Some("denied".to_string());
712 denied_part.error = Some("Permission denied by user".to_string());
713 self.event_bus.publish(EngineEvent::new(
714 "message.part.updated",
715 json!({"part": denied_part}),
716 ));
717 let denied_reason = format!("Permission denied for tool `{tool}` by user.");
718 publish_tool_effect(
719 tool_call_id.as_deref(),
720 ToolEffectLedgerPhase::Outcome,
721 ToolEffectLedgerStatus::Blocked,
722 &args,
723 None,
724 None,
725 Some(&denied_reason),
726 );
727 return Ok(Some(format!(
728 "Permission denied for tool `{tool}` by user."
729 )));
730 }
731 effective_args = args;
732 }
733 }
734
735 let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
736 let session = self.storage.get_session(session_id).await;
737 if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
738 obj.insert(
739 "__session_id".to_string(),
740 Value::String(session_id.to_string()),
741 );
742 if let Some(project_id) = session.project_id.clone() {
743 obj.insert(
744 "__project_id".to_string(),
745 Value::String(project_id.clone()),
746 );
747 if project_id.starts_with("channel-public::") {
748 obj.insert(
749 "__memory_max_visible_scope".to_string(),
750 Value::String("project".to_string()),
751 );
752 }
753 }
754 }
755 let tool_context = self.resolve_tool_execution_context(session_id).await;
756 if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
757 args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
758 if let Some(obj) = args.as_object_mut() {
759 obj.insert(
760 "__workspace_root".to_string(),
761 Value::String(workspace_root.clone()),
762 );
763 obj.insert(
764 "__effective_cwd".to_string(),
765 Value::String(effective_cwd.clone()),
766 );
767 obj.insert(
768 "__session_id".to_string(),
769 Value::String(session_id.to_string()),
770 );
771 if let Some(project_id) = project_id.clone() {
772 obj.insert("__project_id".to_string(), Value::String(project_id));
773 }
774 }
775 tracing::info!(
776 "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
777 session_id,
778 tool,
779 workspace_root,
780 effective_cwd,
781 project_id.clone().unwrap_or_default()
782 );
783 }
784 let mut invoke_part =
785 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
786 if let Some(call_id) = tool_call_id.clone() {
787 invoke_part.id = Some(call_id);
788 }
789 let invoke_part_id = invoke_part.id.clone();
790 self.event_bus.publish(EngineEvent::new(
791 "message.part.updated",
792 json!({"part": invoke_part}),
793 ));
794 let args_for_side_events = args.clone();
795 let mutation_checkpoint = prepare_mutation_checkpoint(&tool, &args_for_side_events);
796 let progress_sink: SharedToolProgressSink = std::sync::Arc::new(EngineToolProgressSink {
797 event_bus: self.event_bus.clone(),
798 session_id: session_id.to_string(),
799 message_id: message_id.to_string(),
800 tool_call_id: invoke_part_id.clone(),
801 source_tool: tool.clone(),
802 });
803 publish_tool_effect(
804 invoke_part_id.as_deref(),
805 ToolEffectLedgerPhase::Invocation,
806 ToolEffectLedgerStatus::Started,
807 &args_for_side_events,
808 None,
809 None,
810 None,
811 );
812 let publish_mutation_checkpoint =
813 |tool_call_id: Option<&str>, outcome: MutationCheckpointOutcome| {
814 if let Some(baseline) = mutation_checkpoint.as_ref() {
815 self.event_bus.publish(mutation_checkpoint_event(
816 finalize_mutation_checkpoint_record(
817 session_id,
818 message_id,
819 tool_call_id,
820 baseline,
821 outcome,
822 ),
823 ));
824 }
825 };
826 if tool == "spawn_agent" {
827 let hook = self.spawn_agent_hook.read().await.clone();
828 if let Some(hook) = hook {
829 let spawned = hook
830 .spawn_agent(SpawnAgentToolContext {
831 session_id: session_id.to_string(),
832 message_id: message_id.to_string(),
833 tool_call_id: invoke_part_id.clone(),
834 args: args_for_side_events.clone(),
835 })
836 .await?;
837 let output = self.plugins.transform_tool_output(spawned.output).await;
838 let output = truncate_text(&output, 16_000);
839 emit_tool_side_events(
840 self.storage.clone(),
841 &self.event_bus,
842 ToolSideEventContext {
843 session_id,
844 message_id,
845 tool: &tool,
846 args: &args_for_side_events,
847 metadata: &spawned.metadata,
848 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
849 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
850 },
851 )
852 .await;
853 let mut result_part = WireMessagePart::tool_result(
854 session_id,
855 message_id,
856 tool.clone(),
857 Some(args_for_side_events.clone()),
858 json!(output.clone()),
859 );
860 result_part.id = invoke_part_id.clone();
861 self.event_bus.publish(EngineEvent::new(
862 "message.part.updated",
863 json!({"part": result_part}),
864 ));
865 publish_tool_effect(
866 invoke_part_id.as_deref(),
867 ToolEffectLedgerPhase::Outcome,
868 ToolEffectLedgerStatus::Succeeded,
869 &args_for_side_events,
870 Some(&spawned.metadata),
871 Some(&output),
872 None,
873 );
874 publish_mutation_checkpoint(
875 invoke_part_id.as_deref(),
876 MutationCheckpointOutcome::Succeeded,
877 );
878 return Ok(Some(truncate_text(
879 &format!("Tool `{tool}` result:\n{output}"),
880 16_000,
881 )));
882 }
883 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
884 let mut failed_part = WireMessagePart::tool_result(
885 session_id,
886 message_id,
887 tool.clone(),
888 Some(args_for_side_events.clone()),
889 json!(null),
890 );
891 failed_part.id = invoke_part_id.clone();
892 failed_part.state = Some("failed".to_string());
893 failed_part.error = Some(output.to_string());
894 self.event_bus.publish(EngineEvent::new(
895 "message.part.updated",
896 json!({"part": failed_part}),
897 ));
898 publish_tool_effect(
899 invoke_part_id.as_deref(),
900 ToolEffectLedgerPhase::Outcome,
901 ToolEffectLedgerStatus::Failed,
902 &args_for_side_events,
903 None,
904 None,
905 Some(output),
906 );
907 publish_mutation_checkpoint(
908 invoke_part_id.as_deref(),
909 MutationCheckpointOutcome::Failed,
910 );
911 return Ok(Some(output.to_string()));
912 }
913 if tool == "batch" {
920 let allowed_tools = self
921 .session_allowed_tools
922 .read()
923 .await
924 .get(session_id)
925 .cloned()
926 .unwrap_or_default();
927
928 let ctx_workspace_root = args
930 .get("__workspace_root")
931 .and_then(|v| v.as_str())
932 .map(ToString::to_string);
933 let ctx_effective_cwd = args
934 .get("__effective_cwd")
935 .and_then(|v| v.as_str())
936 .map(ToString::to_string);
937 let ctx_session_id = args
938 .get("__session_id")
939 .and_then(|v| v.as_str())
940 .map(ToString::to_string);
941 let ctx_project_id = args
942 .get("__project_id")
943 .and_then(|v| v.as_str())
944 .map(ToString::to_string);
945
946 let raw_calls = args
948 .get("tool_calls")
949 .and_then(|v| v.as_array())
950 .cloned()
951 .unwrap_or_default();
952
953 let mut governed_calls: Vec<Value> = Vec::new();
954 for mut call in raw_calls {
955 let (sub_tool, mut sub_args) = {
956 let obj = match call.as_object() {
957 Some(o) => o,
958 None => {
959 governed_calls.push(call);
960 continue;
961 }
962 };
963 let tool_raw = non_empty_string_at(obj, "tool")
964 .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
965 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
966 .or_else(|| non_empty_string_at(obj, "name"));
967 let sub_tool = match tool_raw {
968 Some(t) => normalize_tool_name(t),
969 None => {
970 governed_calls.push(call);
971 continue;
972 }
973 };
974 let sub_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
975 (sub_tool, sub_args)
976 };
977
978 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &sub_tool) {
980 if let Some(obj) = call.as_object_mut() {
982 obj.insert(
983 "_blocked".to_string(),
984 Value::String(format!(
985 "batch sub-call skipped: tool `{sub_tool}` is not in the allowed list for this run"
986 )),
987 );
988 }
989 governed_calls.push(call);
990 continue;
991 }
992
993 if let Some(violation) = self
995 .workspace_sandbox_violation(session_id, &sub_tool, &sub_args)
996 .await
997 {
998 if let Some(obj) = call.as_object_mut() {
999 obj.insert(
1000 "_blocked".to_string(),
1001 Value::String(format!("batch sub-call skipped: {violation}")),
1002 );
1003 }
1004 governed_calls.push(call);
1005 continue;
1006 }
1007
1008 if let Some(sub_obj) = sub_args.as_object_mut() {
1010 if let Some(ref v) = ctx_workspace_root {
1011 sub_obj
1012 .entry("__workspace_root")
1013 .or_insert_with(|| Value::String(v.clone()));
1014 }
1015 if let Some(ref v) = ctx_effective_cwd {
1016 sub_obj
1017 .entry("__effective_cwd")
1018 .or_insert_with(|| Value::String(v.clone()));
1019 }
1020 if let Some(ref v) = ctx_session_id {
1021 sub_obj
1022 .entry("__session_id")
1023 .or_insert_with(|| Value::String(v.clone()));
1024 }
1025 if let Some(ref v) = ctx_project_id {
1026 sub_obj
1027 .entry("__project_id")
1028 .or_insert_with(|| Value::String(v.clone()));
1029 }
1030 }
1031
1032 if let Some(obj) = call.as_object_mut() {
1034 obj.insert("args".to_string(), sub_args);
1035 }
1036 governed_calls.push(call);
1037 }
1038
1039 if let Some(obj) = args.as_object_mut() {
1041 obj.insert("tool_calls".to_string(), Value::Array(governed_calls));
1042 }
1043 }
1044 let result = match self
1045 .execute_tool_with_timeout(&tool, args, cancel.clone(), Some(progress_sink))
1046 .await
1047 {
1048 Ok(result) => result,
1049 Err(err) => {
1050 let err_text = err.to_string();
1051 if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
1052 let timeout_ms = tool_exec_timeout_ms();
1053 let timeout_output = format!(
1054 "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
1055 );
1056 let mut failed_part = WireMessagePart::tool_result(
1057 session_id,
1058 message_id,
1059 tool.clone(),
1060 Some(args_for_side_events.clone()),
1061 json!(null),
1062 );
1063 failed_part.id = invoke_part_id.clone();
1064 failed_part.state = Some("failed".to_string());
1065 failed_part.error = Some(timeout_output.clone());
1066 self.event_bus.publish(EngineEvent::new(
1067 "message.part.updated",
1068 json!({"part": failed_part}),
1069 ));
1070 publish_tool_effect(
1071 invoke_part_id.as_deref(),
1072 ToolEffectLedgerPhase::Outcome,
1073 ToolEffectLedgerStatus::Failed,
1074 &args_for_side_events,
1075 None,
1076 None,
1077 Some(&timeout_output),
1078 );
1079 publish_mutation_checkpoint(
1080 invoke_part_id.as_deref(),
1081 MutationCheckpointOutcome::Failed,
1082 );
1083 return Ok(Some(timeout_output));
1084 }
1085 if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1086 self.event_bus.publish(EngineEvent::new(
1087 "mcp.auth.required",
1088 json!({
1089 "sessionID": session_id,
1090 "messageID": message_id,
1091 "tool": tool.clone(),
1092 "server": auth.server,
1093 "authorizationUrl": auth.authorization_url,
1094 "message": auth.message,
1095 "challengeId": auth.challenge_id
1096 }),
1097 ));
1098 let auth_output = format!(
1099 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1100 tool, auth.message, auth.authorization_url
1101 );
1102 let mut result_part = WireMessagePart::tool_result(
1103 session_id,
1104 message_id,
1105 tool.clone(),
1106 Some(args_for_side_events.clone()),
1107 json!(auth_output.clone()),
1108 );
1109 result_part.id = invoke_part_id.clone();
1110 self.event_bus.publish(EngineEvent::new(
1111 "message.part.updated",
1112 json!({"part": result_part}),
1113 ));
1114 publish_tool_effect(
1115 invoke_part_id.as_deref(),
1116 ToolEffectLedgerPhase::Outcome,
1117 ToolEffectLedgerStatus::Blocked,
1118 &args_for_side_events,
1119 None,
1120 Some(&auth_output),
1121 Some(&auth.message),
1122 );
1123 publish_mutation_checkpoint(
1124 invoke_part_id.as_deref(),
1125 MutationCheckpointOutcome::Blocked,
1126 );
1127 return Ok(Some(truncate_text(
1128 &format!("Tool `{tool}` result:\n{auth_output}"),
1129 16_000,
1130 )));
1131 }
1132 let mut failed_part = WireMessagePart::tool_result(
1133 session_id,
1134 message_id,
1135 tool.clone(),
1136 Some(args_for_side_events.clone()),
1137 json!(null),
1138 );
1139 failed_part.id = invoke_part_id.clone();
1140 failed_part.state = Some("failed".to_string());
1141 failed_part.error = Some(err_text.clone());
1142 self.event_bus.publish(EngineEvent::new(
1143 "message.part.updated",
1144 json!({"part": failed_part}),
1145 ));
1146 publish_tool_effect(
1147 invoke_part_id.as_deref(),
1148 ToolEffectLedgerPhase::Outcome,
1149 ToolEffectLedgerStatus::Failed,
1150 &args_for_side_events,
1151 None,
1152 None,
1153 Some(&err_text),
1154 );
1155 publish_mutation_checkpoint(
1156 invoke_part_id.as_deref(),
1157 MutationCheckpointOutcome::Failed,
1158 );
1159 return Err(err);
1160 }
1161 };
1162 if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1163 let event_name = if auth.pending && auth.blocked {
1164 "mcp.auth.pending"
1165 } else {
1166 "mcp.auth.required"
1167 };
1168 self.event_bus.publish(EngineEvent::new(
1169 event_name,
1170 json!({
1171 "sessionID": session_id,
1172 "messageID": message_id,
1173 "tool": tool.clone(),
1174 "server": auth.server,
1175 "authorizationUrl": auth.authorization_url,
1176 "message": auth.message,
1177 "challengeId": auth.challenge_id,
1178 "pending": auth.pending,
1179 "blocked": auth.blocked,
1180 "retryAfterMs": auth.retry_after_ms
1181 }),
1182 ));
1183 }
1184 emit_tool_side_events(
1185 self.storage.clone(),
1186 &self.event_bus,
1187 ToolSideEventContext {
1188 session_id,
1189 message_id,
1190 tool: &tool,
1191 args: &args_for_side_events,
1192 metadata: &result.metadata,
1193 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1194 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1195 },
1196 )
1197 .await;
1198 let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1199 if auth.pending && auth.blocked {
1200 let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1201 format!(
1202 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1203 tool, auth.message, auth.authorization_url, retry_after_secs
1204 )
1205 } else {
1206 format!(
1207 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1208 tool, auth.message, auth.authorization_url
1209 )
1210 }
1211 } else {
1212 self.plugins.transform_tool_output(result.output).await
1213 };
1214 let output = truncate_text(&output, 16_000);
1215 let mut result_part = WireMessagePart::tool_result(
1216 session_id,
1217 message_id,
1218 tool.clone(),
1219 Some(args_for_side_events.clone()),
1220 json!(output.clone()),
1221 );
1222 result_part.id = invoke_part_id.clone();
1223 self.event_bus.publish(EngineEvent::new(
1224 "message.part.updated",
1225 json!({"part": result_part}),
1226 ));
1227 publish_tool_effect(
1228 invoke_part_id.as_deref(),
1229 ToolEffectLedgerPhase::Outcome,
1230 ToolEffectLedgerStatus::Succeeded,
1231 &args_for_side_events,
1232 Some(&result.metadata),
1233 Some(&output),
1234 None,
1235 );
1236 publish_mutation_checkpoint(
1237 invoke_part_id.as_deref(),
1238 MutationCheckpointOutcome::Succeeded,
1239 );
1240 Ok(Some(truncate_text(
1241 &format!("Tool `{tool}` result:\n{output}"),
1242 16_000,
1243 )))
1244 }
1245}
1246
1247#[cfg(test)]
1248mod tests;