1use chrono::Utc;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
5use std::hash::{Hash, Hasher};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12 ContextMode, EngineEvent, HostRuntimeContext, Message, MessagePart, MessagePartInput,
13 MessageRole, ModelSpec, SendMessageRequest, SharedToolProgressSink, ToolMode, ToolSchema,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19mod loop_guards;
20mod loop_tuning;
21mod prewrite_gate;
22mod prewrite_mode;
23mod prompt_context;
24mod prompt_execution;
25mod prompt_helpers;
26mod prompt_runtime;
27mod tool_execution;
28mod tool_output;
29mod tool_parsing;
30mod types;
31mod write_targets;
32
33use loop_guards::{
34 duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
35};
36use loop_tuning::{
37 max_tool_iterations, permission_wait_timeout_ms, prompt_context_hook_timeout_ms,
38 provider_stream_connect_timeout_ms, provider_stream_decode_retry_attempts,
39 provider_stream_idle_timeout_ms, strict_write_retry_max_attempts, tool_exec_timeout_ms,
40};
41use prewrite_gate::{evaluate_prewrite_gate, PrewriteProgress};
42use prewrite_mode::*;
43use prompt_context::{
44 format_context_mode, mcp_catalog_in_system_prompt_enabled, semantic_tool_retrieval_enabled,
45 semantic_tool_retrieval_k, tandem_runtime_system_prompt,
46};
47use prompt_helpers::*;
48use prompt_runtime::*;
49use tool_output::*;
50use tool_parsing::*;
51use types::{EngineToolProgressSink, StreamedToolCall, WritePathRecoveryMode};
52
53pub use prewrite_mode::prewrite_repair_retry_max_attempts;
54pub use types::{
55 KnowledgebaseGroundingPolicy, PromptContextHook, PromptContextHookContext, SpawnAgentHook,
56 SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext, ToolPolicyDecision,
57 ToolPolicyHook,
58};
59
/// Write-scope mode carried by a [`SessionWritePolicy`].
///
/// NOTE(review): enforcement of each variant happens outside this definition
/// (see `session_write_policy_violation`); the per-variant notes below are
/// inferred from the variant names and should be confirmed against that code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionWritePolicyMode {
    /// Presumably limits writes to run artifacts — TODO confirm.
    ArtifactOnly,
    /// Presumably limits writes to the policy's `allowed_paths` — TODO confirm.
    ExplicitTargets,
    /// Presumably allows edits across the repository — TODO confirm.
    RepoEdit,
}
66
/// Per-session write policy stored by `EngineLoop::set_session_write_policy`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionWritePolicy {
    /// Which write-scope mode applies to the session.
    pub mode: SessionWritePolicyMode,
    /// Paths associated with the policy. `set_session_write_policy` trims each
    /// entry, drops blank ones and removes duplicates before storing.
    pub allowed_paths: Vec<String>,
    /// Free-form explanation of the policy; stored as given.
    pub reason: String,
}
73
74use crate::tool_router::{
75 classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
76 should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
77};
78use crate::{
79 any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
80 tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
81 PermissionAction, PermissionManager, PluginRegistry, Storage,
82};
83use crate::{
84 build_tool_effect_ledger_record, finalize_mutation_checkpoint_record,
85 mutation_checkpoint_event, prepare_mutation_checkpoint, tool_effect_ledger_event,
86 MutationCheckpointOutcome, ToolEffectLedgerPhase, ToolEffectLedgerStatus,
87};
88use tokio::sync::RwLock;
89
/// Engine loop state: the collaborating services plus per-session registries
/// and optional runtime hooks.
///
/// The registries and hooks live behind `Arc<RwLock<_>>`, so clones of an
/// `EngineLoop` share them.
#[derive(Clone)]
pub struct EngineLoop {
    /// Session storage (used to look sessions up by id).
    storage: std::sync::Arc<Storage>,
    /// Bus on which `EngineEvent`s are published.
    event_bus: EventBus,
    /// Provider registry used for completions.
    providers: ProviderRegistry,
    /// Plugin registry (permission overrides, tool-arg injection, output transforms).
    plugins: PluginRegistry,
    /// Agent registry — NOTE(review): not exercised in the visible part of this file.
    agents: AgentRegistry,
    /// Permission rules and interactive permission requests.
    permissions: PermissionManager,
    /// Tool registry — NOTE(review): usage is outside the visible part of this file.
    tools: ToolRegistry,
    /// Cancellation registry — NOTE(review): usage is outside the visible part of this file.
    cancellations: CancellationRegistry,
    /// Host runtime context supplied at construction.
    host_runtime_context: HostRuntimeContext,
    /// Session id -> workspace override expiry, as a Unix timestamp in milliseconds.
    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
    /// Session id -> normalized tool-name allow-list.
    session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Session id -> write policy.
    session_write_policies: std::sync::Arc<RwLock<HashMap<String, SessionWritePolicy>>>,
    /// Session id -> knowledgebase grounding policy (only `required` policies are stored).
    session_kb_grounding_policies:
        std::sync::Arc<RwLock<HashMap<String, KnowledgebaseGroundingPolicy>>>,
    /// Session id -> auto-approve flag; an entry is present only while enabled.
    session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
    /// Optional hook that services `spawn_agent` tool calls.
    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
    /// Optional hook consulted before a tool is allowed to run.
    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
    /// Optional prompt-context hook — NOTE(review): its call site is outside the visible part of this file.
    prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
}
111
112impl EngineLoop {
113 #[allow(clippy::too_many_arguments)]
114 pub fn new(
115 storage: std::sync::Arc<Storage>,
116 event_bus: EventBus,
117 providers: ProviderRegistry,
118 plugins: PluginRegistry,
119 agents: AgentRegistry,
120 permissions: PermissionManager,
121 tools: ToolRegistry,
122 cancellations: CancellationRegistry,
123 host_runtime_context: HostRuntimeContext,
124 ) -> Self {
125 Self {
126 storage,
127 event_bus,
128 providers,
129 plugins,
130 agents,
131 permissions,
132 tools,
133 cancellations,
134 host_runtime_context,
135 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
136 session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
137 session_write_policies: std::sync::Arc::new(RwLock::new(HashMap::new())),
138 session_kb_grounding_policies: std::sync::Arc::new(RwLock::new(HashMap::new())),
139 session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
140 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
141 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
142 prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
143 }
144 }
145
146 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
147 *self.spawn_agent_hook.write().await = Some(hook);
148 }
149
150 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
151 *self.tool_policy_hook.write().await = Some(hook);
152 }
153
154 pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
155 *self.prompt_context_hook.write().await = Some(hook);
156 }
157
158 pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
159 let normalized = allowed_tools
160 .into_iter()
161 .map(|tool| normalize_tool_name(&tool))
162 .filter(|tool| !tool.trim().is_empty())
163 .collect::<Vec<_>>();
164 self.session_allowed_tools
165 .write()
166 .await
167 .insert(session_id.to_string(), normalized);
168 }
169
170 pub async fn clear_session_allowed_tools(&self, session_id: &str) {
171 self.session_allowed_tools.write().await.remove(session_id);
172 }
173
174 pub async fn get_session_allowed_tools(&self, session_id: &str) -> Vec<String> {
175 self.session_allowed_tools
176 .read()
177 .await
178 .get(session_id)
179 .cloned()
180 .unwrap_or_default()
181 }
182
183 pub async fn set_session_write_policy(&self, session_id: &str, policy: SessionWritePolicy) {
184 let mut seen = HashSet::new();
185 let allowed_paths = policy
186 .allowed_paths
187 .into_iter()
188 .map(|path| path.trim().to_string())
189 .filter(|path| !path.is_empty())
190 .filter(|path| seen.insert(path.clone()))
191 .collect::<Vec<_>>();
192 self.session_write_policies.write().await.insert(
193 session_id.to_string(),
194 SessionWritePolicy {
195 mode: policy.mode,
196 allowed_paths,
197 reason: policy.reason,
198 },
199 );
200 }
201
202 pub async fn clear_session_write_policy(&self, session_id: &str) {
203 self.session_write_policies.write().await.remove(session_id);
204 }
205
206 pub async fn get_session_write_policy(&self, session_id: &str) -> Option<SessionWritePolicy> {
207 self.session_write_policies
208 .read()
209 .await
210 .get(session_id)
211 .cloned()
212 }
213
214 pub async fn set_session_kb_grounding_policy(
215 &self,
216 session_id: &str,
217 policy: KnowledgebaseGroundingPolicy,
218 ) {
219 let mut seen_servers = HashSet::new();
220 let server_names = policy
221 .server_names
222 .into_iter()
223 .map(|server| server.trim().to_ascii_lowercase())
224 .filter(|server| !server.is_empty())
225 .filter(|server| seen_servers.insert(server.clone()))
226 .collect::<Vec<_>>();
227 let mut seen_patterns = HashSet::new();
228 let tool_patterns = policy
229 .tool_patterns
230 .into_iter()
231 .map(|tool| normalize_tool_name(&tool))
232 .filter(|tool| !tool.trim().is_empty())
233 .filter(|tool| seen_patterns.insert(tool.clone()))
234 .collect::<Vec<_>>();
235 if !policy.required || tool_patterns.is_empty() {
236 self.clear_session_kb_grounding_policy(session_id).await;
237 return;
238 }
239 self.session_kb_grounding_policies.write().await.insert(
240 session_id.to_string(),
241 KnowledgebaseGroundingPolicy {
242 required: true,
243 strict: policy.strict,
244 server_names,
245 tool_patterns,
246 },
247 );
248 }
249
250 pub async fn clear_session_kb_grounding_policy(&self, session_id: &str) {
251 self.session_kb_grounding_policies
252 .write()
253 .await
254 .remove(session_id);
255 }
256
257 pub async fn get_session_kb_grounding_policy(
258 &self,
259 session_id: &str,
260 ) -> Option<KnowledgebaseGroundingPolicy> {
261 self.session_kb_grounding_policies
262 .read()
263 .await
264 .get(session_id)
265 .cloned()
266 }
267
268 pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
269 if enabled {
270 self.session_auto_approve_permissions
271 .write()
272 .await
273 .insert(session_id.to_string(), true);
274 } else {
275 self.session_auto_approve_permissions
276 .write()
277 .await
278 .remove(session_id);
279 }
280 }
281
282 pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
283 self.session_auto_approve_permissions
284 .write()
285 .await
286 .remove(session_id);
287 }
288
289 pub async fn grant_workspace_override_for_session(
290 &self,
291 session_id: &str,
292 ttl_seconds: u64,
293 ) -> u64 {
294 const MAX_WORKSPACE_OVERRIDE_TTL_SECONDS: u64 = 600; let capped_ttl = ttl_seconds.min(MAX_WORKSPACE_OVERRIDE_TTL_SECONDS);
297 if capped_ttl < ttl_seconds {
298 tracing::warn!(
299 session_id = %session_id,
300 requested_ttl_s = %ttl_seconds,
301 capped_ttl_s = %capped_ttl,
302 "workspace override TTL capped to maximum allowed value"
303 );
304 }
305 let expires_at = chrono::Utc::now()
306 .timestamp_millis()
307 .max(0)
308 .saturating_add((capped_ttl as i64).saturating_mul(1000))
309 as u64;
310 self.workspace_overrides
311 .write()
312 .await
313 .insert(session_id.to_string(), expires_at);
314 self.event_bus.publish(EngineEvent::new(
315 "workspace.override.activated",
316 json!({
317 "sessionID": session_id,
318 "requestedTtlSeconds": ttl_seconds,
319 "cappedTtlSeconds": capped_ttl,
320 "expiresAt": expires_at,
321 }),
322 ));
323 expires_at
324 }
325
326 pub async fn run_prompt_async(
327 &self,
328 session_id: String,
329 req: SendMessageRequest,
330 ) -> anyhow::Result<()> {
331 self.run_prompt_async_with_context(session_id, req, None)
332 .await
333 }
334
335 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
336 self.providers.default_complete(&prompt).await
337 }
338
339 pub async fn run_oneshot_for_provider(
340 &self,
341 prompt: String,
342 provider_id: Option<&str>,
343 ) -> anyhow::Result<String> {
344 self.providers
345 .complete_for_provider(provider_id, &prompt, None)
346 .await
347 }
348
349 #[allow(clippy::too_many_arguments)]
350 async fn execute_tool_with_permission(
351 &self,
352 session_id: &str,
353 message_id: &str,
354 tool: String,
355 args: Value,
356 initial_tool_call_id: Option<String>,
357 equipped_skills: Option<&[String]>,
358 latest_user_text: &str,
359 write_required: bool,
360 latest_assistant_context: Option<&str>,
361 cancel: CancellationToken,
362 ) -> anyhow::Result<Option<String>> {
363 let tool = normalize_tool_name(&tool);
364 let raw_args = args.clone();
365 let publish_tool_effect = |tool_call_id: Option<&str>,
366 phase: ToolEffectLedgerPhase,
367 status: ToolEffectLedgerStatus,
368 args: &Value,
369 metadata: Option<&Value>,
370 output: Option<&str>,
371 error: Option<&str>| {
372 self.event_bus
373 .publish(tool_effect_ledger_event(build_tool_effect_ledger_record(
374 session_id,
375 message_id,
376 tool_call_id,
377 &tool,
378 phase,
379 status,
380 args,
381 metadata,
382 output,
383 error,
384 )));
385 };
386 let normalized = normalize_tool_args_with_mode(
387 &tool,
388 args,
389 latest_user_text,
390 latest_assistant_context.unwrap_or_default(),
391 if write_required {
392 WritePathRecoveryMode::OutputTargetOnly
393 } else {
394 WritePathRecoveryMode::Heuristic
395 },
396 );
397 let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
398 let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
399 self.event_bus.publish(EngineEvent::new(
400 "tool.args.normalized",
401 json!({
402 "sessionID": session_id,
403 "messageID": message_id,
404 "tool": tool,
405 "argsSource": normalized.args_source,
406 "argsIntegrity": normalized.args_integrity,
407 "rawArgsState": normalized.raw_args_state.as_str(),
408 "rawArgsPreview": raw_args_preview,
409 "normalizedArgsPreview": normalized_args_preview,
410 "query": normalized.query,
411 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
412 "requestID": Value::Null
413 }),
414 ));
415 if normalized.args_integrity == "recovered" {
416 self.event_bus.publish(EngineEvent::new(
417 "tool.args.recovered",
418 json!({
419 "sessionID": session_id,
420 "messageID": message_id,
421 "tool": tool,
422 "argsSource": normalized.args_source,
423 "rawArgsPreview": raw_args_preview,
424 "normalizedArgsPreview": normalized_args_preview,
425 "query": normalized.query,
426 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
427 "requestID": Value::Null
428 }),
429 ));
430 }
431 if normalized.missing_terminal {
432 let missing_reason = normalized
433 .missing_terminal_reason
434 .clone()
435 .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
436 let latest_user_preview = truncate_text(latest_user_text, 500);
437 let latest_assistant_preview =
438 truncate_text(latest_assistant_context.unwrap_or_default(), 500);
439 self.event_bus.publish(EngineEvent::new(
440 "tool.args.missing_terminal",
441 json!({
442 "sessionID": session_id,
443 "messageID": message_id,
444 "tool": tool,
445 "argsSource": normalized.args_source,
446 "argsIntegrity": normalized.args_integrity,
447 "rawArgsState": normalized.raw_args_state.as_str(),
448 "requestID": Value::Null,
449 "error": missing_reason,
450 "rawArgsPreview": raw_args_preview,
451 "normalizedArgsPreview": normalized_args_preview,
452 "latestUserPreview": latest_user_preview,
453 "latestAssistantPreview": latest_assistant_preview,
454 }),
455 ));
456 if tool == "write" {
457 tracing::warn!(
458 session_id = %session_id,
459 message_id = %message_id,
460 tool = %tool,
461 reason = %missing_reason,
462 args_source = %normalized.args_source,
463 args_integrity = %normalized.args_integrity,
464 raw_args_state = %normalized.raw_args_state.as_str(),
465 raw_args = %raw_args_preview,
466 normalized_args = %normalized_args_preview,
467 latest_user = %latest_user_preview,
468 latest_assistant = %latest_assistant_preview,
469 "write tool arguments missing terminal field"
470 );
471 }
472 let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
473 let mut failed_part = WireMessagePart::tool_result(
474 session_id,
475 message_id,
476 tool.clone(),
477 Some(best_effort_args),
478 json!(null),
479 );
480 failed_part.state = Some("failed".to_string());
481 let surfaced_reason =
482 provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
483 .unwrap_or_else(|| missing_reason.clone());
484 failed_part.error = Some(surfaced_reason.clone());
485 self.event_bus.publish(EngineEvent::new(
486 "message.part.updated",
487 json!({"part": failed_part}),
488 ));
489 publish_tool_effect(
490 None,
491 ToolEffectLedgerPhase::Outcome,
492 ToolEffectLedgerStatus::Blocked,
493 &normalized.args,
494 None,
495 None,
496 Some(&surfaced_reason),
497 );
498 return Ok(Some(surfaced_reason));
499 }
500
501 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
502 Ok(args) => args,
503 Err(message) => {
504 publish_tool_effect(
505 None,
506 ToolEffectLedgerPhase::Outcome,
507 ToolEffectLedgerStatus::Blocked,
508 &raw_args,
509 None,
510 None,
511 Some(&message),
512 );
513 return Ok(Some(message));
514 }
515 };
516 if let Some(allowed_tools) = self
517 .session_allowed_tools
518 .read()
519 .await
520 .get(session_id)
521 .cloned()
522 {
523 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
524 let reason = format!("Tool `{tool}` is not allowed for this run.");
525 publish_tool_effect(
526 None,
527 ToolEffectLedgerPhase::Outcome,
528 ToolEffectLedgerStatus::Blocked,
529 &args,
530 None,
531 None,
532 Some(&reason),
533 );
534 return Ok(Some(reason));
535 }
536 }
537 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
538 let decision = hook
539 .evaluate_tool(ToolPolicyContext {
540 session_id: session_id.to_string(),
541 message_id: message_id.to_string(),
542 tool: tool.clone(),
543 args: args.clone(),
544 })
545 .await?;
546 if !decision.allowed {
547 let reason = decision
548 .reason
549 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
550 let mut blocked_part = WireMessagePart::tool_result(
551 session_id,
552 message_id,
553 tool.clone(),
554 Some(args.clone()),
555 json!(null),
556 );
557 blocked_part.state = Some("failed".to_string());
558 blocked_part.error = Some(reason.clone());
559 self.event_bus.publish(EngineEvent::new(
560 "message.part.updated",
561 json!({"part": blocked_part}),
562 ));
563 publish_tool_effect(
564 None,
565 ToolEffectLedgerPhase::Outcome,
566 ToolEffectLedgerStatus::Blocked,
567 &args,
568 None,
569 None,
570 Some(&reason),
571 );
572 return Ok(Some(reason));
573 }
574 }
575 let mut tool_call_id: Option<String> = initial_tool_call_id;
576 if let Some(violation) = self
577 .session_write_policy_violation(session_id, &tool, &args)
578 .await
579 {
580 let mut blocked_part = WireMessagePart::tool_result(
581 session_id,
582 message_id,
583 tool.clone(),
584 Some(args.clone()),
585 json!(null),
586 );
587 blocked_part.state = Some("failed".to_string());
588 blocked_part.error = Some(violation.clone());
589 self.event_bus.publish(EngineEvent::new(
590 "message.part.updated",
591 json!({"part": blocked_part}),
592 ));
593 self.event_bus.publish(EngineEvent::new(
594 "tool.call.rejected_write_policy",
595 json!({
596 "sessionID": session_id,
597 "messageID": message_id,
598 "tool": tool,
599 "error": violation.clone(),
600 }),
601 ));
602 publish_tool_effect(
603 tool_call_id.as_deref(),
604 ToolEffectLedgerPhase::Outcome,
605 ToolEffectLedgerStatus::Blocked,
606 &args,
607 None,
608 None,
609 Some(&violation),
610 );
611 return Ok(Some(violation));
612 }
613 if let Some(violation) = self
614 .workspace_sandbox_violation(session_id, &tool, &args)
615 .await
616 {
617 let mut blocked_part = WireMessagePart::tool_result(
618 session_id,
619 message_id,
620 tool.clone(),
621 Some(args.clone()),
622 json!(null),
623 );
624 blocked_part.state = Some("failed".to_string());
625 blocked_part.error = Some(violation.clone());
626 self.event_bus.publish(EngineEvent::new(
627 "message.part.updated",
628 json!({"part": blocked_part}),
629 ));
630 publish_tool_effect(
631 tool_call_id.as_deref(),
632 ToolEffectLedgerPhase::Outcome,
633 ToolEffectLedgerStatus::Blocked,
634 &args,
635 None,
636 None,
637 Some(&violation),
638 );
639 return Ok(Some(violation));
640 }
641 let rule = self
642 .plugins
643 .permission_override(&tool)
644 .await
645 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
646 if matches!(rule, PermissionAction::Deny) {
647 let reason = format!("Permission denied for tool `{tool}` by policy.");
648 publish_tool_effect(
649 tool_call_id.as_deref(),
650 ToolEffectLedgerPhase::Outcome,
651 ToolEffectLedgerStatus::Blocked,
652 &args,
653 None,
654 None,
655 Some(&reason),
656 );
657 return Ok(Some(reason));
658 }
659
660 let mut effective_args = args.clone();
661 if matches!(rule, PermissionAction::Ask) {
662 let auto_approve_permissions = self
663 .session_auto_approve_permissions
664 .read()
665 .await
666 .get(session_id)
667 .copied()
668 .unwrap_or(false);
669 if auto_approve_permissions {
670 if normalized.args_integrity == "recovered" && is_workspace_write_tool(&tool) {
675 tracing::warn!(
676 session_id = %session_id,
677 message_id = %message_id,
678 tool = %tool,
679 args_source = %normalized.args_source,
680 "auto-approve granted for mutating tool with recovered args; verify intent"
681 );
682 self.event_bus.publish(EngineEvent::new(
683 "tool.args.recovered_write_auto_approved",
684 json!({
685 "sessionID": session_id,
686 "messageID": message_id,
687 "tool": tool,
688 "argsSource": normalized.args_source,
689 "argsIntegrity": normalized.args_integrity,
690 }),
691 ));
692 }
693 self.event_bus.publish(EngineEvent::new(
694 "permission.auto_approved",
695 json!({
696 "sessionID": session_id,
697 "messageID": message_id,
698 "tool": tool,
699 }),
700 ));
701 effective_args = args;
702 } else {
703 let pending = self
704 .permissions
705 .ask_for_session_with_context(
706 Some(session_id),
707 &tool,
708 args.clone(),
709 Some(crate::PermissionArgsContext {
710 args_source: normalized.args_source.clone(),
711 args_integrity: normalized.args_integrity.clone(),
712 query: normalized.query.clone(),
713 }),
714 )
715 .await;
716 let mut pending_part = WireMessagePart::tool_invocation(
717 session_id,
718 message_id,
719 tool.clone(),
720 args.clone(),
721 );
722 pending_part.id = Some(pending.id.clone());
723 tool_call_id = Some(pending.id.clone());
724 pending_part.state = Some("pending".to_string());
725 self.event_bus.publish(EngineEvent::new(
726 "message.part.updated",
727 json!({"part": pending_part}),
728 ));
729 let reply = self
730 .permissions
731 .wait_for_reply_with_timeout(
732 &pending.id,
733 cancel.clone(),
734 Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
735 )
736 .await;
737 let (reply, timed_out) = reply;
738 if cancel.is_cancelled() {
739 return Ok(None);
740 }
741 if timed_out {
742 let timeout_ms = permission_wait_timeout_ms();
743 self.event_bus.publish(EngineEvent::new(
744 "permission.wait.timeout",
745 json!({
746 "sessionID": session_id,
747 "messageID": message_id,
748 "tool": tool,
749 "requestID": pending.id,
750 "timeoutMs": timeout_ms,
751 }),
752 ));
753 let mut timeout_part = WireMessagePart::tool_result(
754 session_id,
755 message_id,
756 tool.clone(),
757 Some(args.clone()),
758 json!(null),
759 );
760 timeout_part.id = Some(pending.id);
761 timeout_part.state = Some("failed".to_string());
762 timeout_part.error = Some(format!(
763 "Permission request timed out after {} ms",
764 timeout_ms
765 ));
766 self.event_bus.publish(EngineEvent::new(
767 "message.part.updated",
768 json!({"part": timeout_part}),
769 ));
770 let timeout_reason = format!(
771 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
772 );
773 publish_tool_effect(
774 tool_call_id.as_deref(),
775 ToolEffectLedgerPhase::Outcome,
776 ToolEffectLedgerStatus::Blocked,
777 &args,
778 None,
779 None,
780 Some(&timeout_reason),
781 );
782 return Ok(Some(format!(
783 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
784 )));
785 }
786 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
787 if !approved {
788 let mut denied_part = WireMessagePart::tool_result(
789 session_id,
790 message_id,
791 tool.clone(),
792 Some(args.clone()),
793 json!(null),
794 );
795 denied_part.id = Some(pending.id);
796 denied_part.state = Some("denied".to_string());
797 denied_part.error = Some("Permission denied by user".to_string());
798 self.event_bus.publish(EngineEvent::new(
799 "message.part.updated",
800 json!({"part": denied_part}),
801 ));
802 let denied_reason = format!("Permission denied for tool `{tool}` by user.");
803 publish_tool_effect(
804 tool_call_id.as_deref(),
805 ToolEffectLedgerPhase::Outcome,
806 ToolEffectLedgerStatus::Blocked,
807 &args,
808 None,
809 None,
810 Some(&denied_reason),
811 );
812 return Ok(Some(format!(
813 "Permission denied for tool `{tool}` by user."
814 )));
815 }
816 effective_args = args;
817 }
818 }
819
820 let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
821 let session = self.storage.get_session(session_id).await;
822 if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
823 obj.insert(
824 "__session_id".to_string(),
825 Value::String(session_id.to_string()),
826 );
827 if let Some(project_id) = session.project_id.clone() {
828 obj.insert(
829 "__project_id".to_string(),
830 Value::String(project_id.clone()),
831 );
832 if project_id.starts_with("channel-public::") {
833 obj.insert(
834 "__memory_max_visible_scope".to_string(),
835 Value::String("project".to_string()),
836 );
837 }
838 }
839 }
840 let tool_context = self.resolve_tool_execution_context(session_id).await;
841 if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
842 args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
843 if let Some(obj) = args.as_object_mut() {
844 obj.insert(
845 "__workspace_root".to_string(),
846 Value::String(workspace_root.clone()),
847 );
848 obj.insert(
849 "__effective_cwd".to_string(),
850 Value::String(effective_cwd.clone()),
851 );
852 obj.insert(
853 "__session_id".to_string(),
854 Value::String(session_id.to_string()),
855 );
856 if let Some(project_id) = project_id.clone() {
857 obj.insert("__project_id".to_string(), Value::String(project_id));
858 }
859 }
860 tracing::info!(
861 "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
862 session_id,
863 tool,
864 workspace_root,
865 effective_cwd,
866 project_id.clone().unwrap_or_default()
867 );
868 }
869 let mut invoke_part =
870 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
871 if let Some(call_id) = tool_call_id.clone() {
872 invoke_part.id = Some(call_id);
873 }
874 let invoke_part_id = invoke_part.id.clone();
875 self.event_bus.publish(EngineEvent::new(
876 "message.part.updated",
877 json!({"part": invoke_part}),
878 ));
879 let args_for_side_events = args.clone();
880 let mutation_checkpoint = prepare_mutation_checkpoint(&tool, &args_for_side_events);
881 let progress_sink: SharedToolProgressSink = std::sync::Arc::new(EngineToolProgressSink {
882 event_bus: self.event_bus.clone(),
883 session_id: session_id.to_string(),
884 message_id: message_id.to_string(),
885 tool_call_id: invoke_part_id.clone(),
886 source_tool: tool.clone(),
887 });
888 publish_tool_effect(
889 invoke_part_id.as_deref(),
890 ToolEffectLedgerPhase::Invocation,
891 ToolEffectLedgerStatus::Started,
892 &args_for_side_events,
893 None,
894 None,
895 None,
896 );
897 let publish_mutation_checkpoint =
898 |tool_call_id: Option<&str>, outcome: MutationCheckpointOutcome| {
899 if let Some(baseline) = mutation_checkpoint.as_ref() {
900 self.event_bus.publish(mutation_checkpoint_event(
901 finalize_mutation_checkpoint_record(
902 session_id,
903 message_id,
904 tool_call_id,
905 baseline,
906 outcome,
907 ),
908 ));
909 }
910 };
911 if tool == "spawn_agent" {
912 let hook = self.spawn_agent_hook.read().await.clone();
913 if let Some(hook) = hook {
914 let spawned = hook
915 .spawn_agent(SpawnAgentToolContext {
916 session_id: session_id.to_string(),
917 message_id: message_id.to_string(),
918 tool_call_id: invoke_part_id.clone(),
919 args: args_for_side_events.clone(),
920 })
921 .await?;
922 let output = self.plugins.transform_tool_output(spawned.output).await;
923 let output = truncate_text(&output, 16_000);
924 emit_tool_side_events(
925 self.storage.clone(),
926 &self.event_bus,
927 ToolSideEventContext {
928 session_id,
929 message_id,
930 tool: &tool,
931 args: &args_for_side_events,
932 metadata: &spawned.metadata,
933 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
934 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
935 },
936 )
937 .await;
938 let mut result_part = WireMessagePart::tool_result(
939 session_id,
940 message_id,
941 tool.clone(),
942 Some(args_for_side_events.clone()),
943 json!(output.clone()),
944 );
945 result_part.id = invoke_part_id.clone();
946 self.event_bus.publish(EngineEvent::new(
947 "message.part.updated",
948 json!({"part": result_part}),
949 ));
950 publish_tool_effect(
951 invoke_part_id.as_deref(),
952 ToolEffectLedgerPhase::Outcome,
953 ToolEffectLedgerStatus::Succeeded,
954 &args_for_side_events,
955 Some(&spawned.metadata),
956 Some(&output),
957 None,
958 );
959 publish_mutation_checkpoint(
960 invoke_part_id.as_deref(),
961 MutationCheckpointOutcome::Succeeded,
962 );
963 return Ok(Some(truncate_text(
964 &format!("Tool `{tool}` result:\n{output}"),
965 16_000,
966 )));
967 }
968 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
969 let mut failed_part = WireMessagePart::tool_result(
970 session_id,
971 message_id,
972 tool.clone(),
973 Some(args_for_side_events.clone()),
974 json!(null),
975 );
976 failed_part.id = invoke_part_id.clone();
977 failed_part.state = Some("failed".to_string());
978 failed_part.error = Some(output.to_string());
979 self.event_bus.publish(EngineEvent::new(
980 "message.part.updated",
981 json!({"part": failed_part}),
982 ));
983 publish_tool_effect(
984 invoke_part_id.as_deref(),
985 ToolEffectLedgerPhase::Outcome,
986 ToolEffectLedgerStatus::Failed,
987 &args_for_side_events,
988 None,
989 None,
990 Some(output),
991 );
992 publish_mutation_checkpoint(
993 invoke_part_id.as_deref(),
994 MutationCheckpointOutcome::Failed,
995 );
996 return Ok(Some(output.to_string()));
997 }
998 if tool == "batch" {
1005 let allowed_tools = self
1006 .session_allowed_tools
1007 .read()
1008 .await
1009 .get(session_id)
1010 .cloned()
1011 .unwrap_or_default();
1012
1013 let ctx_workspace_root = args
1015 .get("__workspace_root")
1016 .and_then(|v| v.as_str())
1017 .map(ToString::to_string);
1018 let ctx_effective_cwd = args
1019 .get("__effective_cwd")
1020 .and_then(|v| v.as_str())
1021 .map(ToString::to_string);
1022 let ctx_session_id = args
1023 .get("__session_id")
1024 .and_then(|v| v.as_str())
1025 .map(ToString::to_string);
1026 let ctx_project_id = args
1027 .get("__project_id")
1028 .and_then(|v| v.as_str())
1029 .map(ToString::to_string);
1030
1031 let raw_calls = args
1033 .get("tool_calls")
1034 .and_then(|v| v.as_array())
1035 .cloned()
1036 .unwrap_or_default();
1037
1038 let mut governed_calls: Vec<Value> = Vec::new();
1039 for mut call in raw_calls {
1040 let (sub_tool, mut sub_args) = {
1041 let obj = match call.as_object() {
1042 Some(o) => o,
1043 None => {
1044 governed_calls.push(call);
1045 continue;
1046 }
1047 };
1048 let tool_raw = non_empty_string_at(obj, "tool")
1049 .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
1050 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
1051 .or_else(|| non_empty_string_at(obj, "name"));
1052 let sub_tool = match tool_raw {
1053 Some(t) => normalize_tool_name(t),
1054 None => {
1055 governed_calls.push(call);
1056 continue;
1057 }
1058 };
1059 let sub_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
1060 (sub_tool, sub_args)
1061 };
1062
1063 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &sub_tool) {
1065 if let Some(obj) = call.as_object_mut() {
1067 obj.insert(
1068 "_blocked".to_string(),
1069 Value::String(format!(
1070 "batch sub-call skipped: tool `{sub_tool}` is not in the allowed list for this run"
1071 )),
1072 );
1073 }
1074 governed_calls.push(call);
1075 continue;
1076 }
1077
1078 if let Some(violation) = self
1080 .session_write_policy_violation(session_id, &sub_tool, &sub_args)
1081 .await
1082 {
1083 if let Some(obj) = call.as_object_mut() {
1084 obj.insert(
1085 "_blocked".to_string(),
1086 Value::String(format!("batch sub-call skipped: {violation}")),
1087 );
1088 }
1089 governed_calls.push(call);
1090 continue;
1091 }
1092
1093 if let Some(violation) = self
1095 .workspace_sandbox_violation(session_id, &sub_tool, &sub_args)
1096 .await
1097 {
1098 if let Some(obj) = call.as_object_mut() {
1099 obj.insert(
1100 "_blocked".to_string(),
1101 Value::String(format!("batch sub-call skipped: {violation}")),
1102 );
1103 }
1104 governed_calls.push(call);
1105 continue;
1106 }
1107
1108 if let Some(sub_obj) = sub_args.as_object_mut() {
1110 if let Some(ref v) = ctx_workspace_root {
1111 sub_obj
1112 .entry("__workspace_root")
1113 .or_insert_with(|| Value::String(v.clone()));
1114 }
1115 if let Some(ref v) = ctx_effective_cwd {
1116 sub_obj
1117 .entry("__effective_cwd")
1118 .or_insert_with(|| Value::String(v.clone()));
1119 }
1120 if let Some(ref v) = ctx_session_id {
1121 sub_obj
1122 .entry("__session_id")
1123 .or_insert_with(|| Value::String(v.clone()));
1124 }
1125 if let Some(ref v) = ctx_project_id {
1126 sub_obj
1127 .entry("__project_id")
1128 .or_insert_with(|| Value::String(v.clone()));
1129 }
1130 }
1131
1132 if let Some(obj) = call.as_object_mut() {
1134 obj.insert("args".to_string(), sub_args);
1135 }
1136 governed_calls.push(call);
1137 }
1138
1139 if let Some(obj) = args.as_object_mut() {
1141 obj.insert("tool_calls".to_string(), Value::Array(governed_calls));
1142 }
1143 }
1144 let result = match self
1145 .execute_tool_with_timeout(&tool, args, cancel.clone(), Some(progress_sink))
1146 .await
1147 {
1148 Ok(result) => result,
1149 Err(err) => {
1150 let err_text = err.to_string();
1151 if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
1152 let timeout_ms = tool_exec_timeout_ms();
1153 let timeout_output = format!(
1154 "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
1155 );
1156 let mut failed_part = WireMessagePart::tool_result(
1157 session_id,
1158 message_id,
1159 tool.clone(),
1160 Some(args_for_side_events.clone()),
1161 json!(null),
1162 );
1163 failed_part.id = invoke_part_id.clone();
1164 failed_part.state = Some("failed".to_string());
1165 failed_part.error = Some(timeout_output.clone());
1166 self.event_bus.publish(EngineEvent::new(
1167 "message.part.updated",
1168 json!({"part": failed_part}),
1169 ));
1170 publish_tool_effect(
1171 invoke_part_id.as_deref(),
1172 ToolEffectLedgerPhase::Outcome,
1173 ToolEffectLedgerStatus::Failed,
1174 &args_for_side_events,
1175 None,
1176 None,
1177 Some(&timeout_output),
1178 );
1179 publish_mutation_checkpoint(
1180 invoke_part_id.as_deref(),
1181 MutationCheckpointOutcome::Failed,
1182 );
1183 return Ok(Some(timeout_output));
1184 }
1185 if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1186 self.event_bus.publish(EngineEvent::new(
1187 "mcp.auth.required",
1188 json!({
1189 "sessionID": session_id,
1190 "messageID": message_id,
1191 "tool": tool.clone(),
1192 "server": auth.server,
1193 "authorizationUrl": auth.authorization_url,
1194 "message": auth.message,
1195 "challengeId": auth.challenge_id
1196 }),
1197 ));
1198 let auth_output = format!(
1199 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1200 tool, auth.message, auth.authorization_url
1201 );
1202 let mut result_part = WireMessagePart::tool_result(
1203 session_id,
1204 message_id,
1205 tool.clone(),
1206 Some(args_for_side_events.clone()),
1207 json!(auth_output.clone()),
1208 );
1209 result_part.id = invoke_part_id.clone();
1210 self.event_bus.publish(EngineEvent::new(
1211 "message.part.updated",
1212 json!({"part": result_part}),
1213 ));
1214 publish_tool_effect(
1215 invoke_part_id.as_deref(),
1216 ToolEffectLedgerPhase::Outcome,
1217 ToolEffectLedgerStatus::Blocked,
1218 &args_for_side_events,
1219 None,
1220 Some(&auth_output),
1221 Some(&auth.message),
1222 );
1223 publish_mutation_checkpoint(
1224 invoke_part_id.as_deref(),
1225 MutationCheckpointOutcome::Blocked,
1226 );
1227 return Ok(Some(truncate_text(
1228 &format!("Tool `{tool}` result:\n{auth_output}"),
1229 16_000,
1230 )));
1231 }
1232 let mut failed_part = WireMessagePart::tool_result(
1233 session_id,
1234 message_id,
1235 tool.clone(),
1236 Some(args_for_side_events.clone()),
1237 json!(null),
1238 );
1239 failed_part.id = invoke_part_id.clone();
1240 failed_part.state = Some("failed".to_string());
1241 failed_part.error = Some(err_text.clone());
1242 self.event_bus.publish(EngineEvent::new(
1243 "message.part.updated",
1244 json!({"part": failed_part}),
1245 ));
1246 publish_tool_effect(
1247 invoke_part_id.as_deref(),
1248 ToolEffectLedgerPhase::Outcome,
1249 ToolEffectLedgerStatus::Failed,
1250 &args_for_side_events,
1251 None,
1252 None,
1253 Some(&err_text),
1254 );
1255 publish_mutation_checkpoint(
1256 invoke_part_id.as_deref(),
1257 MutationCheckpointOutcome::Failed,
1258 );
1259 return Err(err);
1260 }
1261 };
1262 if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1263 let event_name = if auth.pending && auth.blocked {
1264 "mcp.auth.pending"
1265 } else {
1266 "mcp.auth.required"
1267 };
1268 self.event_bus.publish(EngineEvent::new(
1269 event_name,
1270 json!({
1271 "sessionID": session_id,
1272 "messageID": message_id,
1273 "tool": tool.clone(),
1274 "server": auth.server,
1275 "authorizationUrl": auth.authorization_url,
1276 "message": auth.message,
1277 "challengeId": auth.challenge_id,
1278 "pending": auth.pending,
1279 "blocked": auth.blocked,
1280 "retryAfterMs": auth.retry_after_ms
1281 }),
1282 ));
1283 }
1284 emit_tool_side_events(
1285 self.storage.clone(),
1286 &self.event_bus,
1287 ToolSideEventContext {
1288 session_id,
1289 message_id,
1290 tool: &tool,
1291 args: &args_for_side_events,
1292 metadata: &result.metadata,
1293 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1294 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1295 },
1296 )
1297 .await;
1298 let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1299 if auth.pending && auth.blocked {
1300 let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1301 format!(
1302 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1303 tool, auth.message, auth.authorization_url, retry_after_secs
1304 )
1305 } else {
1306 format!(
1307 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1308 tool, auth.message, auth.authorization_url
1309 )
1310 }
1311 } else {
1312 self.plugins.transform_tool_output(result.output).await
1313 };
1314 let output = truncate_text(&output, 16_000);
1315 let mut result_part = WireMessagePart::tool_result(
1316 session_id,
1317 message_id,
1318 tool.clone(),
1319 Some(args_for_side_events.clone()),
1320 json!(output.clone()),
1321 );
1322 result_part.id = invoke_part_id.clone();
1323 self.event_bus.publish(EngineEvent::new(
1324 "message.part.updated",
1325 json!({"part": result_part}),
1326 ));
1327 publish_tool_effect(
1328 invoke_part_id.as_deref(),
1329 ToolEffectLedgerPhase::Outcome,
1330 ToolEffectLedgerStatus::Succeeded,
1331 &args_for_side_events,
1332 Some(&result.metadata),
1333 Some(&output),
1334 None,
1335 );
1336 publish_mutation_checkpoint(
1337 invoke_part_id.as_deref(),
1338 MutationCheckpointOutcome::Succeeded,
1339 );
1340 Ok(Some(truncate_text(
1341 &format!("Tool `{tool}` result:\n{output}"),
1342 16_000,
1343 )))
1344 }
1345}
1346
// Unit tests for this module. The `tests` module has no inline body here, so
// its contents come from a separate source file, and the `cfg(test)` gate
// means it is only compiled when building for tests.
1347#[cfg(test)]
1348mod tests;