Skip to main content

tandem_core/
engine_loop.rs

1use chrono::Utc;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
5use std::hash::{Hash, Hasher};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12    ContextMode, EngineEvent, HostRuntimeContext, Message, MessagePart, MessagePartInput,
13    MessageRole, ModelSpec, SendMessageRequest, SharedToolProgressSink, ToolMode, ToolSchema,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19mod loop_guards;
20mod loop_tuning;
21mod prewrite_gate;
22mod prewrite_mode;
23mod prompt_context;
24mod prompt_execution;
25mod prompt_helpers;
26mod prompt_runtime;
27mod tool_execution;
28mod tool_output;
29mod tool_parsing;
30mod types;
31
32use loop_guards::{
33    duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
34};
35use loop_tuning::{
36    max_tool_iterations, permission_wait_timeout_ms, prompt_context_hook_timeout_ms,
37    provider_stream_connect_timeout_ms, provider_stream_idle_timeout_ms,
38    strict_write_retry_max_attempts, tool_exec_timeout_ms,
39};
40use prewrite_gate::{evaluate_prewrite_gate, PrewriteProgress};
41use prewrite_mode::*;
42use prompt_context::{
43    format_context_mode, mcp_catalog_in_system_prompt_enabled, semantic_tool_retrieval_enabled,
44    semantic_tool_retrieval_k, tandem_runtime_system_prompt,
45};
46use prompt_helpers::*;
47use prompt_runtime::*;
48use tool_output::*;
49use tool_parsing::*;
50use types::{EngineToolProgressSink, StreamedToolCall, WritePathRecoveryMode};
51
52pub use prewrite_mode::prewrite_repair_retry_max_attempts;
53pub use types::{
54    KnowledgebaseGroundingPolicy, PromptContextHook, PromptContextHookContext, SpawnAgentHook,
55    SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext, ToolPolicyDecision,
56    ToolPolicyHook,
57};
58
59use crate::tool_router::{
60    classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
61    should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
62};
63use crate::{
64    any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
65    tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
66    PermissionAction, PermissionManager, PluginRegistry, Storage,
67};
68use crate::{
69    build_tool_effect_ledger_record, finalize_mutation_checkpoint_record,
70    mutation_checkpoint_event, prepare_mutation_checkpoint, tool_effect_ledger_event,
71    MutationCheckpointOutcome, ToolEffectLedgerPhase, ToolEffectLedgerStatus,
72};
73use tokio::sync::RwLock;
74
75#[derive(Clone)]
76pub struct EngineLoop {
77    storage: std::sync::Arc<Storage>,
78    event_bus: EventBus,
79    providers: ProviderRegistry,
80    plugins: PluginRegistry,
81    agents: AgentRegistry,
82    permissions: PermissionManager,
83    tools: ToolRegistry,
84    cancellations: CancellationRegistry,
85    host_runtime_context: HostRuntimeContext,
86    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
87    session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
88    session_kb_grounding_policies:
89        std::sync::Arc<RwLock<HashMap<String, KnowledgebaseGroundingPolicy>>>,
90    session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
91    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
92    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
93    prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
94}
95
96impl EngineLoop {
97    #[allow(clippy::too_many_arguments)]
98    pub fn new(
99        storage: std::sync::Arc<Storage>,
100        event_bus: EventBus,
101        providers: ProviderRegistry,
102        plugins: PluginRegistry,
103        agents: AgentRegistry,
104        permissions: PermissionManager,
105        tools: ToolRegistry,
106        cancellations: CancellationRegistry,
107        host_runtime_context: HostRuntimeContext,
108    ) -> Self {
109        Self {
110            storage,
111            event_bus,
112            providers,
113            plugins,
114            agents,
115            permissions,
116            tools,
117            cancellations,
118            host_runtime_context,
119            workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
120            session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
121            session_kb_grounding_policies: std::sync::Arc::new(RwLock::new(HashMap::new())),
122            session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
123            spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
124            tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
125            prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
126        }
127    }
128
129    pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
130        *self.spawn_agent_hook.write().await = Some(hook);
131    }
132
133    pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
134        *self.tool_policy_hook.write().await = Some(hook);
135    }
136
137    pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
138        *self.prompt_context_hook.write().await = Some(hook);
139    }
140
141    pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
142        let normalized = allowed_tools
143            .into_iter()
144            .map(|tool| normalize_tool_name(&tool))
145            .filter(|tool| !tool.trim().is_empty())
146            .collect::<Vec<_>>();
147        self.session_allowed_tools
148            .write()
149            .await
150            .insert(session_id.to_string(), normalized);
151    }
152
153    pub async fn clear_session_allowed_tools(&self, session_id: &str) {
154        self.session_allowed_tools.write().await.remove(session_id);
155    }
156
157    pub async fn get_session_allowed_tools(&self, session_id: &str) -> Vec<String> {
158        self.session_allowed_tools
159            .read()
160            .await
161            .get(session_id)
162            .cloned()
163            .unwrap_or_default()
164    }
165
166    pub async fn set_session_kb_grounding_policy(
167        &self,
168        session_id: &str,
169        policy: KnowledgebaseGroundingPolicy,
170    ) {
171        let mut seen_servers = HashSet::new();
172        let server_names = policy
173            .server_names
174            .into_iter()
175            .map(|server| server.trim().to_ascii_lowercase())
176            .filter(|server| !server.is_empty())
177            .filter(|server| seen_servers.insert(server.clone()))
178            .collect::<Vec<_>>();
179        let mut seen_patterns = HashSet::new();
180        let tool_patterns = policy
181            .tool_patterns
182            .into_iter()
183            .map(|tool| normalize_tool_name(&tool))
184            .filter(|tool| !tool.trim().is_empty())
185            .filter(|tool| seen_patterns.insert(tool.clone()))
186            .collect::<Vec<_>>();
187        if !policy.required || tool_patterns.is_empty() {
188            self.clear_session_kb_grounding_policy(session_id).await;
189            return;
190        }
191        self.session_kb_grounding_policies.write().await.insert(
192            session_id.to_string(),
193            KnowledgebaseGroundingPolicy {
194                required: true,
195                strict: policy.strict,
196                server_names,
197                tool_patterns,
198            },
199        );
200    }
201
202    pub async fn clear_session_kb_grounding_policy(&self, session_id: &str) {
203        self.session_kb_grounding_policies
204            .write()
205            .await
206            .remove(session_id);
207    }
208
209    pub async fn get_session_kb_grounding_policy(
210        &self,
211        session_id: &str,
212    ) -> Option<KnowledgebaseGroundingPolicy> {
213        self.session_kb_grounding_policies
214            .read()
215            .await
216            .get(session_id)
217            .cloned()
218    }
219
220    pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
221        if enabled {
222            self.session_auto_approve_permissions
223                .write()
224                .await
225                .insert(session_id.to_string(), true);
226        } else {
227            self.session_auto_approve_permissions
228                .write()
229                .await
230                .remove(session_id);
231        }
232    }
233
234    pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
235        self.session_auto_approve_permissions
236            .write()
237            .await
238            .remove(session_id);
239    }
240
241    pub async fn grant_workspace_override_for_session(
242        &self,
243        session_id: &str,
244        ttl_seconds: u64,
245    ) -> u64 {
246        // Cap the override TTL to prevent indefinite sandbox bypass.
247        const MAX_WORKSPACE_OVERRIDE_TTL_SECONDS: u64 = 600; // 10 minutes
248        let capped_ttl = ttl_seconds.min(MAX_WORKSPACE_OVERRIDE_TTL_SECONDS);
249        if capped_ttl < ttl_seconds {
250            tracing::warn!(
251                session_id = %session_id,
252                requested_ttl_s = %ttl_seconds,
253                capped_ttl_s = %capped_ttl,
254                "workspace override TTL capped to maximum allowed value"
255            );
256        }
257        let expires_at = chrono::Utc::now()
258            .timestamp_millis()
259            .max(0)
260            .saturating_add((capped_ttl as i64).saturating_mul(1000))
261            as u64;
262        self.workspace_overrides
263            .write()
264            .await
265            .insert(session_id.to_string(), expires_at);
266        self.event_bus.publish(EngineEvent::new(
267            "workspace.override.activated",
268            json!({
269                "sessionID": session_id,
270                "requestedTtlSeconds": ttl_seconds,
271                "cappedTtlSeconds": capped_ttl,
272                "expiresAt": expires_at,
273            }),
274        ));
275        expires_at
276    }
277
278    pub async fn run_prompt_async(
279        &self,
280        session_id: String,
281        req: SendMessageRequest,
282    ) -> anyhow::Result<()> {
283        self.run_prompt_async_with_context(session_id, req, None)
284            .await
285    }
286
287    pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
288        self.providers.default_complete(&prompt).await
289    }
290
291    pub async fn run_oneshot_for_provider(
292        &self,
293        prompt: String,
294        provider_id: Option<&str>,
295    ) -> anyhow::Result<String> {
296        self.providers
297            .complete_for_provider(provider_id, &prompt, None)
298            .await
299    }
300
301    #[allow(clippy::too_many_arguments)]
302    async fn execute_tool_with_permission(
303        &self,
304        session_id: &str,
305        message_id: &str,
306        tool: String,
307        args: Value,
308        initial_tool_call_id: Option<String>,
309        equipped_skills: Option<&[String]>,
310        latest_user_text: &str,
311        write_required: bool,
312        latest_assistant_context: Option<&str>,
313        cancel: CancellationToken,
314    ) -> anyhow::Result<Option<String>> {
315        let tool = normalize_tool_name(&tool);
316        let raw_args = args.clone();
317        let publish_tool_effect = |tool_call_id: Option<&str>,
318                                   phase: ToolEffectLedgerPhase,
319                                   status: ToolEffectLedgerStatus,
320                                   args: &Value,
321                                   metadata: Option<&Value>,
322                                   output: Option<&str>,
323                                   error: Option<&str>| {
324            self.event_bus
325                .publish(tool_effect_ledger_event(build_tool_effect_ledger_record(
326                    session_id,
327                    message_id,
328                    tool_call_id,
329                    &tool,
330                    phase,
331                    status,
332                    args,
333                    metadata,
334                    output,
335                    error,
336                )));
337        };
338        let normalized = normalize_tool_args_with_mode(
339            &tool,
340            args,
341            latest_user_text,
342            latest_assistant_context.unwrap_or_default(),
343            if write_required {
344                WritePathRecoveryMode::OutputTargetOnly
345            } else {
346                WritePathRecoveryMode::Heuristic
347            },
348        );
349        let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
350        let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
351        self.event_bus.publish(EngineEvent::new(
352            "tool.args.normalized",
353            json!({
354                "sessionID": session_id,
355                "messageID": message_id,
356                "tool": tool,
357                "argsSource": normalized.args_source,
358                "argsIntegrity": normalized.args_integrity,
359                "rawArgsState": normalized.raw_args_state.as_str(),
360                "rawArgsPreview": raw_args_preview,
361                "normalizedArgsPreview": normalized_args_preview,
362                "query": normalized.query,
363                "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
364                "requestID": Value::Null
365            }),
366        ));
367        if normalized.args_integrity == "recovered" {
368            self.event_bus.publish(EngineEvent::new(
369                "tool.args.recovered",
370                json!({
371                    "sessionID": session_id,
372                    "messageID": message_id,
373                    "tool": tool,
374                    "argsSource": normalized.args_source,
375                    "rawArgsPreview": raw_args_preview,
376                    "normalizedArgsPreview": normalized_args_preview,
377                    "query": normalized.query,
378                    "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
379                    "requestID": Value::Null
380                }),
381            ));
382        }
383        if normalized.missing_terminal {
384            let missing_reason = normalized
385                .missing_terminal_reason
386                .clone()
387                .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
388            let latest_user_preview = truncate_text(latest_user_text, 500);
389            let latest_assistant_preview =
390                truncate_text(latest_assistant_context.unwrap_or_default(), 500);
391            self.event_bus.publish(EngineEvent::new(
392                "tool.args.missing_terminal",
393                json!({
394                    "sessionID": session_id,
395                    "messageID": message_id,
396                    "tool": tool,
397                    "argsSource": normalized.args_source,
398                    "argsIntegrity": normalized.args_integrity,
399                    "rawArgsState": normalized.raw_args_state.as_str(),
400                    "requestID": Value::Null,
401                    "error": missing_reason,
402                    "rawArgsPreview": raw_args_preview,
403                    "normalizedArgsPreview": normalized_args_preview,
404                    "latestUserPreview": latest_user_preview,
405                    "latestAssistantPreview": latest_assistant_preview,
406                }),
407            ));
408            if tool == "write" {
409                tracing::warn!(
410                    session_id = %session_id,
411                    message_id = %message_id,
412                    tool = %tool,
413                    reason = %missing_reason,
414                    args_source = %normalized.args_source,
415                    args_integrity = %normalized.args_integrity,
416                    raw_args_state = %normalized.raw_args_state.as_str(),
417                    raw_args = %raw_args_preview,
418                    normalized_args = %normalized_args_preview,
419                    latest_user = %latest_user_preview,
420                    latest_assistant = %latest_assistant_preview,
421                    "write tool arguments missing terminal field"
422                );
423            }
424            let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
425            let mut failed_part = WireMessagePart::tool_result(
426                session_id,
427                message_id,
428                tool.clone(),
429                Some(best_effort_args),
430                json!(null),
431            );
432            failed_part.state = Some("failed".to_string());
433            let surfaced_reason =
434                provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
435                    .unwrap_or_else(|| missing_reason.clone());
436            failed_part.error = Some(surfaced_reason.clone());
437            self.event_bus.publish(EngineEvent::new(
438                "message.part.updated",
439                json!({"part": failed_part}),
440            ));
441            publish_tool_effect(
442                None,
443                ToolEffectLedgerPhase::Outcome,
444                ToolEffectLedgerStatus::Blocked,
445                &normalized.args,
446                None,
447                None,
448                Some(&surfaced_reason),
449            );
450            return Ok(Some(surfaced_reason));
451        }
452
453        let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
454            Ok(args) => args,
455            Err(message) => {
456                publish_tool_effect(
457                    None,
458                    ToolEffectLedgerPhase::Outcome,
459                    ToolEffectLedgerStatus::Blocked,
460                    &raw_args,
461                    None,
462                    None,
463                    Some(&message),
464                );
465                return Ok(Some(message));
466            }
467        };
468        if let Some(allowed_tools) = self
469            .session_allowed_tools
470            .read()
471            .await
472            .get(session_id)
473            .cloned()
474        {
475            if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
476                let reason = format!("Tool `{tool}` is not allowed for this run.");
477                publish_tool_effect(
478                    None,
479                    ToolEffectLedgerPhase::Outcome,
480                    ToolEffectLedgerStatus::Blocked,
481                    &args,
482                    None,
483                    None,
484                    Some(&reason),
485                );
486                return Ok(Some(reason));
487            }
488        }
489        if let Some(hook) = self.tool_policy_hook.read().await.clone() {
490            let decision = hook
491                .evaluate_tool(ToolPolicyContext {
492                    session_id: session_id.to_string(),
493                    message_id: message_id.to_string(),
494                    tool: tool.clone(),
495                    args: args.clone(),
496                })
497                .await?;
498            if !decision.allowed {
499                let reason = decision
500                    .reason
501                    .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
502                let mut blocked_part = WireMessagePart::tool_result(
503                    session_id,
504                    message_id,
505                    tool.clone(),
506                    Some(args.clone()),
507                    json!(null),
508                );
509                blocked_part.state = Some("failed".to_string());
510                blocked_part.error = Some(reason.clone());
511                self.event_bus.publish(EngineEvent::new(
512                    "message.part.updated",
513                    json!({"part": blocked_part}),
514                ));
515                publish_tool_effect(
516                    None,
517                    ToolEffectLedgerPhase::Outcome,
518                    ToolEffectLedgerStatus::Blocked,
519                    &args,
520                    None,
521                    None,
522                    Some(&reason),
523                );
524                return Ok(Some(reason));
525            }
526        }
527        let mut tool_call_id: Option<String> = initial_tool_call_id;
528        if let Some(violation) = self
529            .workspace_sandbox_violation(session_id, &tool, &args)
530            .await
531        {
532            let mut blocked_part = WireMessagePart::tool_result(
533                session_id,
534                message_id,
535                tool.clone(),
536                Some(args.clone()),
537                json!(null),
538            );
539            blocked_part.state = Some("failed".to_string());
540            blocked_part.error = Some(violation.clone());
541            self.event_bus.publish(EngineEvent::new(
542                "message.part.updated",
543                json!({"part": blocked_part}),
544            ));
545            publish_tool_effect(
546                tool_call_id.as_deref(),
547                ToolEffectLedgerPhase::Outcome,
548                ToolEffectLedgerStatus::Blocked,
549                &args,
550                None,
551                None,
552                Some(&violation),
553            );
554            return Ok(Some(violation));
555        }
556        let rule = self
557            .plugins
558            .permission_override(&tool)
559            .await
560            .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
561        if matches!(rule, PermissionAction::Deny) {
562            let reason = format!("Permission denied for tool `{tool}` by policy.");
563            publish_tool_effect(
564                tool_call_id.as_deref(),
565                ToolEffectLedgerPhase::Outcome,
566                ToolEffectLedgerStatus::Blocked,
567                &args,
568                None,
569                None,
570                Some(&reason),
571            );
572            return Ok(Some(reason));
573        }
574
575        let mut effective_args = args.clone();
576        if matches!(rule, PermissionAction::Ask) {
577            let auto_approve_permissions = self
578                .session_auto_approve_permissions
579                .read()
580                .await
581                .get(session_id)
582                .copied()
583                .unwrap_or(false);
584            if auto_approve_permissions {
585                // Governance audit: if args were recovered via heuristics and the tool is
586                // mutating, log a WARN so recovered writes are never silent in automation
587                // mode. Does not block — operators must opt out via TANDEM_AUTO_APPROVE_RECOVERED_ARGS=false
588                // if they want a hard block (reserved for strict automation policy).
589                if normalized.args_integrity == "recovered" && is_workspace_write_tool(&tool) {
590                    tracing::warn!(
591                        session_id = %session_id,
592                        message_id = %message_id,
593                        tool = %tool,
594                        args_source = %normalized.args_source,
595                        "auto-approve granted for mutating tool with recovered args; verify intent"
596                    );
597                    self.event_bus.publish(EngineEvent::new(
598                        "tool.args.recovered_write_auto_approved",
599                        json!({
600                            "sessionID": session_id,
601                            "messageID": message_id,
602                            "tool": tool,
603                            "argsSource": normalized.args_source,
604                            "argsIntegrity": normalized.args_integrity,
605                        }),
606                    ));
607                }
608                self.event_bus.publish(EngineEvent::new(
609                    "permission.auto_approved",
610                    json!({
611                        "sessionID": session_id,
612                        "messageID": message_id,
613                        "tool": tool,
614                    }),
615                ));
616                effective_args = args;
617            } else {
618                let pending = self
619                    .permissions
620                    .ask_for_session_with_context(
621                        Some(session_id),
622                        &tool,
623                        args.clone(),
624                        Some(crate::PermissionArgsContext {
625                            args_source: normalized.args_source.clone(),
626                            args_integrity: normalized.args_integrity.clone(),
627                            query: normalized.query.clone(),
628                        }),
629                    )
630                    .await;
631                let mut pending_part = WireMessagePart::tool_invocation(
632                    session_id,
633                    message_id,
634                    tool.clone(),
635                    args.clone(),
636                );
637                pending_part.id = Some(pending.id.clone());
638                tool_call_id = Some(pending.id.clone());
639                pending_part.state = Some("pending".to_string());
640                self.event_bus.publish(EngineEvent::new(
641                    "message.part.updated",
642                    json!({"part": pending_part}),
643                ));
644                let reply = self
645                    .permissions
646                    .wait_for_reply_with_timeout(
647                        &pending.id,
648                        cancel.clone(),
649                        Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
650                    )
651                    .await;
652                let (reply, timed_out) = reply;
653                if cancel.is_cancelled() {
654                    return Ok(None);
655                }
656                if timed_out {
657                    let timeout_ms = permission_wait_timeout_ms();
658                    self.event_bus.publish(EngineEvent::new(
659                        "permission.wait.timeout",
660                        json!({
661                            "sessionID": session_id,
662                            "messageID": message_id,
663                            "tool": tool,
664                            "requestID": pending.id,
665                            "timeoutMs": timeout_ms,
666                        }),
667                    ));
668                    let mut timeout_part = WireMessagePart::tool_result(
669                        session_id,
670                        message_id,
671                        tool.clone(),
672                        Some(args.clone()),
673                        json!(null),
674                    );
675                    timeout_part.id = Some(pending.id);
676                    timeout_part.state = Some("failed".to_string());
677                    timeout_part.error = Some(format!(
678                        "Permission request timed out after {} ms",
679                        timeout_ms
680                    ));
681                    self.event_bus.publish(EngineEvent::new(
682                        "message.part.updated",
683                        json!({"part": timeout_part}),
684                    ));
685                    let timeout_reason = format!(
686                        "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
687                    );
688                    publish_tool_effect(
689                        tool_call_id.as_deref(),
690                        ToolEffectLedgerPhase::Outcome,
691                        ToolEffectLedgerStatus::Blocked,
692                        &args,
693                        None,
694                        None,
695                        Some(&timeout_reason),
696                    );
697                    return Ok(Some(format!(
698                        "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
699                    )));
700                }
701                let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
702                if !approved {
703                    let mut denied_part = WireMessagePart::tool_result(
704                        session_id,
705                        message_id,
706                        tool.clone(),
707                        Some(args.clone()),
708                        json!(null),
709                    );
710                    denied_part.id = Some(pending.id);
711                    denied_part.state = Some("denied".to_string());
712                    denied_part.error = Some("Permission denied by user".to_string());
713                    self.event_bus.publish(EngineEvent::new(
714                        "message.part.updated",
715                        json!({"part": denied_part}),
716                    ));
717                    let denied_reason = format!("Permission denied for tool `{tool}` by user.");
718                    publish_tool_effect(
719                        tool_call_id.as_deref(),
720                        ToolEffectLedgerPhase::Outcome,
721                        ToolEffectLedgerStatus::Blocked,
722                        &args,
723                        None,
724                        None,
725                        Some(&denied_reason),
726                    );
727                    return Ok(Some(format!(
728                        "Permission denied for tool `{tool}` by user."
729                    )));
730                }
731                effective_args = args;
732            }
733        }
734
735        let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
736        let session = self.storage.get_session(session_id).await;
737        if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
738            obj.insert(
739                "__session_id".to_string(),
740                Value::String(session_id.to_string()),
741            );
742            if let Some(project_id) = session.project_id.clone() {
743                obj.insert(
744                    "__project_id".to_string(),
745                    Value::String(project_id.clone()),
746                );
747                if project_id.starts_with("channel-public::") {
748                    obj.insert(
749                        "__memory_max_visible_scope".to_string(),
750                        Value::String("project".to_string()),
751                    );
752                }
753            }
754        }
755        let tool_context = self.resolve_tool_execution_context(session_id).await;
756        if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
757            args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
758            if let Some(obj) = args.as_object_mut() {
759                obj.insert(
760                    "__workspace_root".to_string(),
761                    Value::String(workspace_root.clone()),
762                );
763                obj.insert(
764                    "__effective_cwd".to_string(),
765                    Value::String(effective_cwd.clone()),
766                );
767                obj.insert(
768                    "__session_id".to_string(),
769                    Value::String(session_id.to_string()),
770                );
771                if let Some(project_id) = project_id.clone() {
772                    obj.insert("__project_id".to_string(), Value::String(project_id));
773                }
774            }
775            tracing::info!(
776                "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
777                session_id,
778                tool,
779                workspace_root,
780                effective_cwd,
781                project_id.clone().unwrap_or_default()
782            );
783        }
784        let mut invoke_part =
785            WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
786        if let Some(call_id) = tool_call_id.clone() {
787            invoke_part.id = Some(call_id);
788        }
789        let invoke_part_id = invoke_part.id.clone();
790        self.event_bus.publish(EngineEvent::new(
791            "message.part.updated",
792            json!({"part": invoke_part}),
793        ));
794        let args_for_side_events = args.clone();
795        let mutation_checkpoint = prepare_mutation_checkpoint(&tool, &args_for_side_events);
796        let progress_sink: SharedToolProgressSink = std::sync::Arc::new(EngineToolProgressSink {
797            event_bus: self.event_bus.clone(),
798            session_id: session_id.to_string(),
799            message_id: message_id.to_string(),
800            tool_call_id: invoke_part_id.clone(),
801            source_tool: tool.clone(),
802        });
803        publish_tool_effect(
804            invoke_part_id.as_deref(),
805            ToolEffectLedgerPhase::Invocation,
806            ToolEffectLedgerStatus::Started,
807            &args_for_side_events,
808            None,
809            None,
810            None,
811        );
812        let publish_mutation_checkpoint =
813            |tool_call_id: Option<&str>, outcome: MutationCheckpointOutcome| {
814                if let Some(baseline) = mutation_checkpoint.as_ref() {
815                    self.event_bus.publish(mutation_checkpoint_event(
816                        finalize_mutation_checkpoint_record(
817                            session_id,
818                            message_id,
819                            tool_call_id,
820                            baseline,
821                            outcome,
822                        ),
823                    ));
824                }
825            };
826        if tool == "spawn_agent" {
827            let hook = self.spawn_agent_hook.read().await.clone();
828            if let Some(hook) = hook {
829                let spawned = hook
830                    .spawn_agent(SpawnAgentToolContext {
831                        session_id: session_id.to_string(),
832                        message_id: message_id.to_string(),
833                        tool_call_id: invoke_part_id.clone(),
834                        args: args_for_side_events.clone(),
835                    })
836                    .await?;
837                let output = self.plugins.transform_tool_output(spawned.output).await;
838                let output = truncate_text(&output, 16_000);
839                emit_tool_side_events(
840                    self.storage.clone(),
841                    &self.event_bus,
842                    ToolSideEventContext {
843                        session_id,
844                        message_id,
845                        tool: &tool,
846                        args: &args_for_side_events,
847                        metadata: &spawned.metadata,
848                        workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
849                        effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
850                    },
851                )
852                .await;
853                let mut result_part = WireMessagePart::tool_result(
854                    session_id,
855                    message_id,
856                    tool.clone(),
857                    Some(args_for_side_events.clone()),
858                    json!(output.clone()),
859                );
860                result_part.id = invoke_part_id.clone();
861                self.event_bus.publish(EngineEvent::new(
862                    "message.part.updated",
863                    json!({"part": result_part}),
864                ));
865                publish_tool_effect(
866                    invoke_part_id.as_deref(),
867                    ToolEffectLedgerPhase::Outcome,
868                    ToolEffectLedgerStatus::Succeeded,
869                    &args_for_side_events,
870                    Some(&spawned.metadata),
871                    Some(&output),
872                    None,
873                );
874                publish_mutation_checkpoint(
875                    invoke_part_id.as_deref(),
876                    MutationCheckpointOutcome::Succeeded,
877                );
878                return Ok(Some(truncate_text(
879                    &format!("Tool `{tool}` result:\n{output}"),
880                    16_000,
881                )));
882            }
883            let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
884            let mut failed_part = WireMessagePart::tool_result(
885                session_id,
886                message_id,
887                tool.clone(),
888                Some(args_for_side_events.clone()),
889                json!(null),
890            );
891            failed_part.id = invoke_part_id.clone();
892            failed_part.state = Some("failed".to_string());
893            failed_part.error = Some(output.to_string());
894            self.event_bus.publish(EngineEvent::new(
895                "message.part.updated",
896                json!({"part": failed_part}),
897            ));
898            publish_tool_effect(
899                invoke_part_id.as_deref(),
900                ToolEffectLedgerPhase::Outcome,
901                ToolEffectLedgerStatus::Failed,
902                &args_for_side_events,
903                None,
904                None,
905                Some(output),
906            );
907            publish_mutation_checkpoint(
908                invoke_part_id.as_deref(),
909                MutationCheckpointOutcome::Failed,
910            );
911            return Ok(Some(output.to_string()));
912        }
913        // Batch governance: validate sub-calls against engine policy and inject execution context
914        // before delegating to BatchTool. This ensures sub-calls cannot bypass permissions,
915        // sandbox checks, or allowed-tool lists, and that they receive the correct workspace
916        // context (__workspace_root, __effective_cwd, __session_id, __project_id).
917        //
918        // By this point `args` already has those keys injected (see context injection above).
919        if tool == "batch" {
920            let allowed_tools = self
921                .session_allowed_tools
922                .read()
923                .await
924                .get(session_id)
925                .cloned()
926                .unwrap_or_default();
927
928            // Extract parent execution context from already-injected batch args.
929            let ctx_workspace_root = args
930                .get("__workspace_root")
931                .and_then(|v| v.as_str())
932                .map(ToString::to_string);
933            let ctx_effective_cwd = args
934                .get("__effective_cwd")
935                .and_then(|v| v.as_str())
936                .map(ToString::to_string);
937            let ctx_session_id = args
938                .get("__session_id")
939                .and_then(|v| v.as_str())
940                .map(ToString::to_string);
941            let ctx_project_id = args
942                .get("__project_id")
943                .and_then(|v| v.as_str())
944                .map(ToString::to_string);
945
946            // Process each sub-call: check governance, inject context.
947            let raw_calls = args
948                .get("tool_calls")
949                .and_then(|v| v.as_array())
950                .cloned()
951                .unwrap_or_default();
952
953            let mut governed_calls: Vec<Value> = Vec::new();
954            for mut call in raw_calls {
955                let (sub_tool, mut sub_args) = {
956                    let obj = match call.as_object() {
957                        Some(o) => o,
958                        None => {
959                            governed_calls.push(call);
960                            continue;
961                        }
962                    };
963                    let tool_raw = non_empty_string_at(obj, "tool")
964                        .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
965                        .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
966                        .or_else(|| non_empty_string_at(obj, "name"));
967                    let sub_tool = match tool_raw {
968                        Some(t) => normalize_tool_name(t),
969                        None => {
970                            governed_calls.push(call);
971                            continue;
972                        }
973                    };
974                    let sub_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
975                    (sub_tool, sub_args)
976                };
977
978                // 1. Allowed-tools check.
979                if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &sub_tool) {
980                    // Strip this sub-call: replace it with an explanatory result.
981                    if let Some(obj) = call.as_object_mut() {
982                        obj.insert(
983                            "_blocked".to_string(),
984                            Value::String(format!(
985                                "batch sub-call skipped: tool `{sub_tool}` is not in the allowed list for this run"
986                            )),
987                        );
988                    }
989                    governed_calls.push(call);
990                    continue;
991                }
992
993                // 2. Workspace sandbox check.
994                if let Some(violation) = self
995                    .workspace_sandbox_violation(session_id, &sub_tool, &sub_args)
996                    .await
997                {
998                    if let Some(obj) = call.as_object_mut() {
999                        obj.insert(
1000                            "_blocked".to_string(),
1001                            Value::String(format!("batch sub-call skipped: {violation}")),
1002                        );
1003                    }
1004                    governed_calls.push(call);
1005                    continue;
1006                }
1007
1008                // 3. Inject parent execution context into sub-call args.
1009                if let Some(sub_obj) = sub_args.as_object_mut() {
1010                    if let Some(ref v) = ctx_workspace_root {
1011                        sub_obj
1012                            .entry("__workspace_root")
1013                            .or_insert_with(|| Value::String(v.clone()));
1014                    }
1015                    if let Some(ref v) = ctx_effective_cwd {
1016                        sub_obj
1017                            .entry("__effective_cwd")
1018                            .or_insert_with(|| Value::String(v.clone()));
1019                    }
1020                    if let Some(ref v) = ctx_session_id {
1021                        sub_obj
1022                            .entry("__session_id")
1023                            .or_insert_with(|| Value::String(v.clone()));
1024                    }
1025                    if let Some(ref v) = ctx_project_id {
1026                        sub_obj
1027                            .entry("__project_id")
1028                            .or_insert_with(|| Value::String(v.clone()));
1029                    }
1030                }
1031
1032                // Write enriched args back into the call object.
1033                if let Some(obj) = call.as_object_mut() {
1034                    obj.insert("args".to_string(), sub_args);
1035                }
1036                governed_calls.push(call);
1037            }
1038
1039            // Rebuild batch args with the governed sub-calls.
1040            if let Some(obj) = args.as_object_mut() {
1041                obj.insert("tool_calls".to_string(), Value::Array(governed_calls));
1042            }
1043        }
1044        let result = match self
1045            .execute_tool_with_timeout(&tool, args, cancel.clone(), Some(progress_sink))
1046            .await
1047        {
1048            Ok(result) => result,
1049            Err(err) => {
1050                let err_text = err.to_string();
1051                if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
1052                    let timeout_ms = tool_exec_timeout_ms();
1053                    let timeout_output = format!(
1054                        "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
1055                    );
1056                    let mut failed_part = WireMessagePart::tool_result(
1057                        session_id,
1058                        message_id,
1059                        tool.clone(),
1060                        Some(args_for_side_events.clone()),
1061                        json!(null),
1062                    );
1063                    failed_part.id = invoke_part_id.clone();
1064                    failed_part.state = Some("failed".to_string());
1065                    failed_part.error = Some(timeout_output.clone());
1066                    self.event_bus.publish(EngineEvent::new(
1067                        "message.part.updated",
1068                        json!({"part": failed_part}),
1069                    ));
1070                    publish_tool_effect(
1071                        invoke_part_id.as_deref(),
1072                        ToolEffectLedgerPhase::Outcome,
1073                        ToolEffectLedgerStatus::Failed,
1074                        &args_for_side_events,
1075                        None,
1076                        None,
1077                        Some(&timeout_output),
1078                    );
1079                    publish_mutation_checkpoint(
1080                        invoke_part_id.as_deref(),
1081                        MutationCheckpointOutcome::Failed,
1082                    );
1083                    return Ok(Some(timeout_output));
1084                }
1085                if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1086                    self.event_bus.publish(EngineEvent::new(
1087                        "mcp.auth.required",
1088                        json!({
1089                            "sessionID": session_id,
1090                            "messageID": message_id,
1091                            "tool": tool.clone(),
1092                            "server": auth.server,
1093                            "authorizationUrl": auth.authorization_url,
1094                            "message": auth.message,
1095                            "challengeId": auth.challenge_id
1096                        }),
1097                    ));
1098                    let auth_output = format!(
1099                        "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1100                        tool, auth.message, auth.authorization_url
1101                    );
1102                    let mut result_part = WireMessagePart::tool_result(
1103                        session_id,
1104                        message_id,
1105                        tool.clone(),
1106                        Some(args_for_side_events.clone()),
1107                        json!(auth_output.clone()),
1108                    );
1109                    result_part.id = invoke_part_id.clone();
1110                    self.event_bus.publish(EngineEvent::new(
1111                        "message.part.updated",
1112                        json!({"part": result_part}),
1113                    ));
1114                    publish_tool_effect(
1115                        invoke_part_id.as_deref(),
1116                        ToolEffectLedgerPhase::Outcome,
1117                        ToolEffectLedgerStatus::Blocked,
1118                        &args_for_side_events,
1119                        None,
1120                        Some(&auth_output),
1121                        Some(&auth.message),
1122                    );
1123                    publish_mutation_checkpoint(
1124                        invoke_part_id.as_deref(),
1125                        MutationCheckpointOutcome::Blocked,
1126                    );
1127                    return Ok(Some(truncate_text(
1128                        &format!("Tool `{tool}` result:\n{auth_output}"),
1129                        16_000,
1130                    )));
1131                }
1132                let mut failed_part = WireMessagePart::tool_result(
1133                    session_id,
1134                    message_id,
1135                    tool.clone(),
1136                    Some(args_for_side_events.clone()),
1137                    json!(null),
1138                );
1139                failed_part.id = invoke_part_id.clone();
1140                failed_part.state = Some("failed".to_string());
1141                failed_part.error = Some(err_text.clone());
1142                self.event_bus.publish(EngineEvent::new(
1143                    "message.part.updated",
1144                    json!({"part": failed_part}),
1145                ));
1146                publish_tool_effect(
1147                    invoke_part_id.as_deref(),
1148                    ToolEffectLedgerPhase::Outcome,
1149                    ToolEffectLedgerStatus::Failed,
1150                    &args_for_side_events,
1151                    None,
1152                    None,
1153                    Some(&err_text),
1154                );
1155                publish_mutation_checkpoint(
1156                    invoke_part_id.as_deref(),
1157                    MutationCheckpointOutcome::Failed,
1158                );
1159                return Err(err);
1160            }
1161        };
1162        if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1163            let event_name = if auth.pending && auth.blocked {
1164                "mcp.auth.pending"
1165            } else {
1166                "mcp.auth.required"
1167            };
1168            self.event_bus.publish(EngineEvent::new(
1169                event_name,
1170                json!({
1171                    "sessionID": session_id,
1172                    "messageID": message_id,
1173                    "tool": tool.clone(),
1174                    "server": auth.server,
1175                    "authorizationUrl": auth.authorization_url,
1176                    "message": auth.message,
1177                    "challengeId": auth.challenge_id,
1178                    "pending": auth.pending,
1179                    "blocked": auth.blocked,
1180                    "retryAfterMs": auth.retry_after_ms
1181                }),
1182            ));
1183        }
1184        emit_tool_side_events(
1185            self.storage.clone(),
1186            &self.event_bus,
1187            ToolSideEventContext {
1188                session_id,
1189                message_id,
1190                tool: &tool,
1191                args: &args_for_side_events,
1192                metadata: &result.metadata,
1193                workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1194                effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1195            },
1196        )
1197        .await;
1198        let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1199            if auth.pending && auth.blocked {
1200                let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1201                format!(
1202                    "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1203                    tool, auth.message, auth.authorization_url, retry_after_secs
1204                )
1205            } else {
1206                format!(
1207                    "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1208                    tool, auth.message, auth.authorization_url
1209                )
1210            }
1211        } else {
1212            self.plugins.transform_tool_output(result.output).await
1213        };
1214        let output = truncate_text(&output, 16_000);
1215        let mut result_part = WireMessagePart::tool_result(
1216            session_id,
1217            message_id,
1218            tool.clone(),
1219            Some(args_for_side_events.clone()),
1220            json!(output.clone()),
1221        );
1222        result_part.id = invoke_part_id.clone();
1223        self.event_bus.publish(EngineEvent::new(
1224            "message.part.updated",
1225            json!({"part": result_part}),
1226        ));
1227        publish_tool_effect(
1228            invoke_part_id.as_deref(),
1229            ToolEffectLedgerPhase::Outcome,
1230            ToolEffectLedgerStatus::Succeeded,
1231            &args_for_side_events,
1232            Some(&result.metadata),
1233            Some(&output),
1234            None,
1235        );
1236        publish_mutation_checkpoint(
1237            invoke_part_id.as_deref(),
1238            MutationCheckpointOutcome::Succeeded,
1239        );
1240        Ok(Some(truncate_text(
1241            &format!("Tool `{tool}` result:\n{output}"),
1242            16_000,
1243        )))
1244    }
1245}
1246
1247#[cfg(test)]
1248mod tests;