// tandem_core/engine_loop.rs

1use chrono::Utc;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
5use std::hash::{Hash, Hasher};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12    ContextMode, EngineEvent, HostRuntimeContext, Message, MessagePart, MessagePartInput,
13    MessageRole, ModelSpec, SendMessageRequest, SharedToolProgressSink, ToolMode, ToolSchema,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19mod loop_guards;
20mod loop_tuning;
21mod prewrite_gate;
22mod prewrite_mode;
23mod prompt_context;
24mod prompt_execution;
25mod prompt_helpers;
26mod prompt_runtime;
27mod tool_execution;
28mod tool_output;
29mod tool_parsing;
30mod types;
31
32use loop_guards::{
33    duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
34};
35use loop_tuning::{
36    max_tool_iterations, permission_wait_timeout_ms, prompt_context_hook_timeout_ms,
37    provider_stream_connect_timeout_ms, provider_stream_idle_timeout_ms,
38    strict_write_retry_max_attempts, tool_exec_timeout_ms,
39};
40use prewrite_gate::{evaluate_prewrite_gate, PrewriteProgress};
41use prewrite_mode::*;
42use prompt_context::{
43    format_context_mode, mcp_catalog_in_system_prompt_enabled, semantic_tool_retrieval_enabled,
44    semantic_tool_retrieval_k, tandem_runtime_system_prompt,
45};
46use prompt_helpers::*;
47use prompt_runtime::*;
48use tool_output::*;
49use tool_parsing::*;
50use types::{EngineToolProgressSink, StreamedToolCall, WritePathRecoveryMode};
51
52pub use prewrite_mode::prewrite_repair_retry_max_attempts;
53pub use types::{
54    PromptContextHook, PromptContextHookContext, SpawnAgentHook, SpawnAgentToolContext,
55    SpawnAgentToolResult, ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
56};
57
58use crate::tool_router::{
59    classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
60    should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
61};
62use crate::{
63    any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
64    tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
65    PermissionAction, PermissionManager, PluginRegistry, Storage,
66};
67use crate::{
68    build_tool_effect_ledger_record, finalize_mutation_checkpoint_record,
69    mutation_checkpoint_event, prepare_mutation_checkpoint, tool_effect_ledger_event,
70    MutationCheckpointOutcome, ToolEffectLedgerPhase, ToolEffectLedgerStatus,
71};
72use tokio::sync::RwLock;
73
74#[derive(Clone)]
75pub struct EngineLoop {
76    storage: std::sync::Arc<Storage>,
77    event_bus: EventBus,
78    providers: ProviderRegistry,
79    plugins: PluginRegistry,
80    agents: AgentRegistry,
81    permissions: PermissionManager,
82    tools: ToolRegistry,
83    cancellations: CancellationRegistry,
84    host_runtime_context: HostRuntimeContext,
85    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
86    session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
87    session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
88    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
89    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
90    prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
91}
92
93impl EngineLoop {
94    #[allow(clippy::too_many_arguments)]
95    pub fn new(
96        storage: std::sync::Arc<Storage>,
97        event_bus: EventBus,
98        providers: ProviderRegistry,
99        plugins: PluginRegistry,
100        agents: AgentRegistry,
101        permissions: PermissionManager,
102        tools: ToolRegistry,
103        cancellations: CancellationRegistry,
104        host_runtime_context: HostRuntimeContext,
105    ) -> Self {
106        Self {
107            storage,
108            event_bus,
109            providers,
110            plugins,
111            agents,
112            permissions,
113            tools,
114            cancellations,
115            host_runtime_context,
116            workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
117            session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
118            session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
119            spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
120            tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
121            prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
122        }
123    }
124
125    pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
126        *self.spawn_agent_hook.write().await = Some(hook);
127    }
128
129    pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
130        *self.tool_policy_hook.write().await = Some(hook);
131    }
132
133    pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
134        *self.prompt_context_hook.write().await = Some(hook);
135    }
136
137    pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
138        let normalized = allowed_tools
139            .into_iter()
140            .map(|tool| normalize_tool_name(&tool))
141            .filter(|tool| !tool.trim().is_empty())
142            .collect::<Vec<_>>();
143        self.session_allowed_tools
144            .write()
145            .await
146            .insert(session_id.to_string(), normalized);
147    }
148
149    pub async fn clear_session_allowed_tools(&self, session_id: &str) {
150        self.session_allowed_tools.write().await.remove(session_id);
151    }
152
153    pub async fn get_session_allowed_tools(&self, session_id: &str) -> Vec<String> {
154        self.session_allowed_tools
155            .read()
156            .await
157            .get(session_id)
158            .cloned()
159            .unwrap_or_default()
160    }
161
162    pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
163        if enabled {
164            self.session_auto_approve_permissions
165                .write()
166                .await
167                .insert(session_id.to_string(), true);
168        } else {
169            self.session_auto_approve_permissions
170                .write()
171                .await
172                .remove(session_id);
173        }
174    }
175
176    pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
177        self.session_auto_approve_permissions
178            .write()
179            .await
180            .remove(session_id);
181    }
182
183    pub async fn grant_workspace_override_for_session(
184        &self,
185        session_id: &str,
186        ttl_seconds: u64,
187    ) -> u64 {
188        // Cap the override TTL to prevent indefinite sandbox bypass.
189        const MAX_WORKSPACE_OVERRIDE_TTL_SECONDS: u64 = 600; // 10 minutes
190        let capped_ttl = ttl_seconds.min(MAX_WORKSPACE_OVERRIDE_TTL_SECONDS);
191        if capped_ttl < ttl_seconds {
192            tracing::warn!(
193                session_id = %session_id,
194                requested_ttl_s = %ttl_seconds,
195                capped_ttl_s = %capped_ttl,
196                "workspace override TTL capped to maximum allowed value"
197            );
198        }
199        let expires_at = chrono::Utc::now()
200            .timestamp_millis()
201            .max(0)
202            .saturating_add((capped_ttl as i64).saturating_mul(1000))
203            as u64;
204        self.workspace_overrides
205            .write()
206            .await
207            .insert(session_id.to_string(), expires_at);
208        self.event_bus.publish(EngineEvent::new(
209            "workspace.override.activated",
210            json!({
211                "sessionID": session_id,
212                "requestedTtlSeconds": ttl_seconds,
213                "cappedTtlSeconds": capped_ttl,
214                "expiresAt": expires_at,
215            }),
216        ));
217        expires_at
218    }
219
220    pub async fn run_prompt_async(
221        &self,
222        session_id: String,
223        req: SendMessageRequest,
224    ) -> anyhow::Result<()> {
225        self.run_prompt_async_with_context(session_id, req, None)
226            .await
227    }
228
229    pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
230        self.providers.default_complete(&prompt).await
231    }
232
233    pub async fn run_oneshot_for_provider(
234        &self,
235        prompt: String,
236        provider_id: Option<&str>,
237    ) -> anyhow::Result<String> {
238        self.providers
239            .complete_for_provider(provider_id, &prompt, None)
240            .await
241    }
242
243    #[allow(clippy::too_many_arguments)]
244    async fn execute_tool_with_permission(
245        &self,
246        session_id: &str,
247        message_id: &str,
248        tool: String,
249        args: Value,
250        initial_tool_call_id: Option<String>,
251        equipped_skills: Option<&[String]>,
252        latest_user_text: &str,
253        write_required: bool,
254        latest_assistant_context: Option<&str>,
255        cancel: CancellationToken,
256    ) -> anyhow::Result<Option<String>> {
257        let tool = normalize_tool_name(&tool);
258        let raw_args = args.clone();
259        let publish_tool_effect = |tool_call_id: Option<&str>,
260                                   phase: ToolEffectLedgerPhase,
261                                   status: ToolEffectLedgerStatus,
262                                   args: &Value,
263                                   metadata: Option<&Value>,
264                                   output: Option<&str>,
265                                   error: Option<&str>| {
266            self.event_bus
267                .publish(tool_effect_ledger_event(build_tool_effect_ledger_record(
268                    session_id,
269                    message_id,
270                    tool_call_id,
271                    &tool,
272                    phase,
273                    status,
274                    args,
275                    metadata,
276                    output,
277                    error,
278                )));
279        };
280        let normalized = normalize_tool_args_with_mode(
281            &tool,
282            args,
283            latest_user_text,
284            latest_assistant_context.unwrap_or_default(),
285            if write_required {
286                WritePathRecoveryMode::OutputTargetOnly
287            } else {
288                WritePathRecoveryMode::Heuristic
289            },
290        );
291        let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
292        let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
293        self.event_bus.publish(EngineEvent::new(
294            "tool.args.normalized",
295            json!({
296                "sessionID": session_id,
297                "messageID": message_id,
298                "tool": tool,
299                "argsSource": normalized.args_source,
300                "argsIntegrity": normalized.args_integrity,
301                "rawArgsState": normalized.raw_args_state.as_str(),
302                "rawArgsPreview": raw_args_preview,
303                "normalizedArgsPreview": normalized_args_preview,
304                "query": normalized.query,
305                "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
306                "requestID": Value::Null
307            }),
308        ));
309        if normalized.args_integrity == "recovered" {
310            self.event_bus.publish(EngineEvent::new(
311                "tool.args.recovered",
312                json!({
313                    "sessionID": session_id,
314                    "messageID": message_id,
315                    "tool": tool,
316                    "argsSource": normalized.args_source,
317                    "rawArgsPreview": raw_args_preview,
318                    "normalizedArgsPreview": normalized_args_preview,
319                    "query": normalized.query,
320                    "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
321                    "requestID": Value::Null
322                }),
323            ));
324        }
325        if normalized.missing_terminal {
326            let missing_reason = normalized
327                .missing_terminal_reason
328                .clone()
329                .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
330            let latest_user_preview = truncate_text(latest_user_text, 500);
331            let latest_assistant_preview =
332                truncate_text(latest_assistant_context.unwrap_or_default(), 500);
333            self.event_bus.publish(EngineEvent::new(
334                "tool.args.missing_terminal",
335                json!({
336                    "sessionID": session_id,
337                    "messageID": message_id,
338                    "tool": tool,
339                    "argsSource": normalized.args_source,
340                    "argsIntegrity": normalized.args_integrity,
341                    "rawArgsState": normalized.raw_args_state.as_str(),
342                    "requestID": Value::Null,
343                    "error": missing_reason,
344                    "rawArgsPreview": raw_args_preview,
345                    "normalizedArgsPreview": normalized_args_preview,
346                    "latestUserPreview": latest_user_preview,
347                    "latestAssistantPreview": latest_assistant_preview,
348                }),
349            ));
350            if tool == "write" {
351                tracing::warn!(
352                    session_id = %session_id,
353                    message_id = %message_id,
354                    tool = %tool,
355                    reason = %missing_reason,
356                    args_source = %normalized.args_source,
357                    args_integrity = %normalized.args_integrity,
358                    raw_args_state = %normalized.raw_args_state.as_str(),
359                    raw_args = %raw_args_preview,
360                    normalized_args = %normalized_args_preview,
361                    latest_user = %latest_user_preview,
362                    latest_assistant = %latest_assistant_preview,
363                    "write tool arguments missing terminal field"
364                );
365            }
366            let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
367            let mut failed_part = WireMessagePart::tool_result(
368                session_id,
369                message_id,
370                tool.clone(),
371                Some(best_effort_args),
372                json!(null),
373            );
374            failed_part.state = Some("failed".to_string());
375            let surfaced_reason =
376                provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
377                    .unwrap_or_else(|| missing_reason.clone());
378            failed_part.error = Some(surfaced_reason.clone());
379            self.event_bus.publish(EngineEvent::new(
380                "message.part.updated",
381                json!({"part": failed_part}),
382            ));
383            publish_tool_effect(
384                None,
385                ToolEffectLedgerPhase::Outcome,
386                ToolEffectLedgerStatus::Blocked,
387                &normalized.args,
388                None,
389                None,
390                Some(&surfaced_reason),
391            );
392            return Ok(Some(surfaced_reason));
393        }
394
395        let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
396            Ok(args) => args,
397            Err(message) => {
398                publish_tool_effect(
399                    None,
400                    ToolEffectLedgerPhase::Outcome,
401                    ToolEffectLedgerStatus::Blocked,
402                    &raw_args,
403                    None,
404                    None,
405                    Some(&message),
406                );
407                return Ok(Some(message));
408            }
409        };
410        if let Some(allowed_tools) = self
411            .session_allowed_tools
412            .read()
413            .await
414            .get(session_id)
415            .cloned()
416        {
417            if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
418                let reason = format!("Tool `{tool}` is not allowed for this run.");
419                publish_tool_effect(
420                    None,
421                    ToolEffectLedgerPhase::Outcome,
422                    ToolEffectLedgerStatus::Blocked,
423                    &args,
424                    None,
425                    None,
426                    Some(&reason),
427                );
428                return Ok(Some(reason));
429            }
430        }
431        if let Some(hook) = self.tool_policy_hook.read().await.clone() {
432            let decision = hook
433                .evaluate_tool(ToolPolicyContext {
434                    session_id: session_id.to_string(),
435                    message_id: message_id.to_string(),
436                    tool: tool.clone(),
437                    args: args.clone(),
438                })
439                .await?;
440            if !decision.allowed {
441                let reason = decision
442                    .reason
443                    .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
444                let mut blocked_part = WireMessagePart::tool_result(
445                    session_id,
446                    message_id,
447                    tool.clone(),
448                    Some(args.clone()),
449                    json!(null),
450                );
451                blocked_part.state = Some("failed".to_string());
452                blocked_part.error = Some(reason.clone());
453                self.event_bus.publish(EngineEvent::new(
454                    "message.part.updated",
455                    json!({"part": blocked_part}),
456                ));
457                publish_tool_effect(
458                    None,
459                    ToolEffectLedgerPhase::Outcome,
460                    ToolEffectLedgerStatus::Blocked,
461                    &args,
462                    None,
463                    None,
464                    Some(&reason),
465                );
466                return Ok(Some(reason));
467            }
468        }
469        let mut tool_call_id: Option<String> = initial_tool_call_id;
470        if let Some(violation) = self
471            .workspace_sandbox_violation(session_id, &tool, &args)
472            .await
473        {
474            let mut blocked_part = WireMessagePart::tool_result(
475                session_id,
476                message_id,
477                tool.clone(),
478                Some(args.clone()),
479                json!(null),
480            );
481            blocked_part.state = Some("failed".to_string());
482            blocked_part.error = Some(violation.clone());
483            self.event_bus.publish(EngineEvent::new(
484                "message.part.updated",
485                json!({"part": blocked_part}),
486            ));
487            publish_tool_effect(
488                tool_call_id.as_deref(),
489                ToolEffectLedgerPhase::Outcome,
490                ToolEffectLedgerStatus::Blocked,
491                &args,
492                None,
493                None,
494                Some(&violation),
495            );
496            return Ok(Some(violation));
497        }
498        let rule = self
499            .plugins
500            .permission_override(&tool)
501            .await
502            .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
503        if matches!(rule, PermissionAction::Deny) {
504            let reason = format!("Permission denied for tool `{tool}` by policy.");
505            publish_tool_effect(
506                tool_call_id.as_deref(),
507                ToolEffectLedgerPhase::Outcome,
508                ToolEffectLedgerStatus::Blocked,
509                &args,
510                None,
511                None,
512                Some(&reason),
513            );
514            return Ok(Some(reason));
515        }
516
517        let mut effective_args = args.clone();
518        if matches!(rule, PermissionAction::Ask) {
519            let auto_approve_permissions = self
520                .session_auto_approve_permissions
521                .read()
522                .await
523                .get(session_id)
524                .copied()
525                .unwrap_or(false);
526            if auto_approve_permissions {
527                // Governance audit: if args were recovered via heuristics and the tool is
528                // mutating, log a WARN so recovered writes are never silent in automation
529                // mode. Does not block — operators must opt out via TANDEM_AUTO_APPROVE_RECOVERED_ARGS=false
530                // if they want a hard block (reserved for strict automation policy).
531                if normalized.args_integrity == "recovered" && is_workspace_write_tool(&tool) {
532                    tracing::warn!(
533                        session_id = %session_id,
534                        message_id = %message_id,
535                        tool = %tool,
536                        args_source = %normalized.args_source,
537                        "auto-approve granted for mutating tool with recovered args; verify intent"
538                    );
539                    self.event_bus.publish(EngineEvent::new(
540                        "tool.args.recovered_write_auto_approved",
541                        json!({
542                            "sessionID": session_id,
543                            "messageID": message_id,
544                            "tool": tool,
545                            "argsSource": normalized.args_source,
546                            "argsIntegrity": normalized.args_integrity,
547                        }),
548                    ));
549                }
550                self.event_bus.publish(EngineEvent::new(
551                    "permission.auto_approved",
552                    json!({
553                        "sessionID": session_id,
554                        "messageID": message_id,
555                        "tool": tool,
556                    }),
557                ));
558                effective_args = args;
559            } else {
560                let pending = self
561                    .permissions
562                    .ask_for_session_with_context(
563                        Some(session_id),
564                        &tool,
565                        args.clone(),
566                        Some(crate::PermissionArgsContext {
567                            args_source: normalized.args_source.clone(),
568                            args_integrity: normalized.args_integrity.clone(),
569                            query: normalized.query.clone(),
570                        }),
571                    )
572                    .await;
573                let mut pending_part = WireMessagePart::tool_invocation(
574                    session_id,
575                    message_id,
576                    tool.clone(),
577                    args.clone(),
578                );
579                pending_part.id = Some(pending.id.clone());
580                tool_call_id = Some(pending.id.clone());
581                pending_part.state = Some("pending".to_string());
582                self.event_bus.publish(EngineEvent::new(
583                    "message.part.updated",
584                    json!({"part": pending_part}),
585                ));
586                let reply = self
587                    .permissions
588                    .wait_for_reply_with_timeout(
589                        &pending.id,
590                        cancel.clone(),
591                        Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
592                    )
593                    .await;
594                let (reply, timed_out) = reply;
595                if cancel.is_cancelled() {
596                    return Ok(None);
597                }
598                if timed_out {
599                    let timeout_ms = permission_wait_timeout_ms();
600                    self.event_bus.publish(EngineEvent::new(
601                        "permission.wait.timeout",
602                        json!({
603                            "sessionID": session_id,
604                            "messageID": message_id,
605                            "tool": tool,
606                            "requestID": pending.id,
607                            "timeoutMs": timeout_ms,
608                        }),
609                    ));
610                    let mut timeout_part = WireMessagePart::tool_result(
611                        session_id,
612                        message_id,
613                        tool.clone(),
614                        Some(args.clone()),
615                        json!(null),
616                    );
617                    timeout_part.id = Some(pending.id);
618                    timeout_part.state = Some("failed".to_string());
619                    timeout_part.error = Some(format!(
620                        "Permission request timed out after {} ms",
621                        timeout_ms
622                    ));
623                    self.event_bus.publish(EngineEvent::new(
624                        "message.part.updated",
625                        json!({"part": timeout_part}),
626                    ));
627                    let timeout_reason = format!(
628                        "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
629                    );
630                    publish_tool_effect(
631                        tool_call_id.as_deref(),
632                        ToolEffectLedgerPhase::Outcome,
633                        ToolEffectLedgerStatus::Blocked,
634                        &args,
635                        None,
636                        None,
637                        Some(&timeout_reason),
638                    );
639                    return Ok(Some(format!(
640                        "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
641                    )));
642                }
643                let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
644                if !approved {
645                    let mut denied_part = WireMessagePart::tool_result(
646                        session_id,
647                        message_id,
648                        tool.clone(),
649                        Some(args.clone()),
650                        json!(null),
651                    );
652                    denied_part.id = Some(pending.id);
653                    denied_part.state = Some("denied".to_string());
654                    denied_part.error = Some("Permission denied by user".to_string());
655                    self.event_bus.publish(EngineEvent::new(
656                        "message.part.updated",
657                        json!({"part": denied_part}),
658                    ));
659                    let denied_reason = format!("Permission denied for tool `{tool}` by user.");
660                    publish_tool_effect(
661                        tool_call_id.as_deref(),
662                        ToolEffectLedgerPhase::Outcome,
663                        ToolEffectLedgerStatus::Blocked,
664                        &args,
665                        None,
666                        None,
667                        Some(&denied_reason),
668                    );
669                    return Ok(Some(format!(
670                        "Permission denied for tool `{tool}` by user."
671                    )));
672                }
673                effective_args = args;
674            }
675        }
676
677        let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
678        let session = self.storage.get_session(session_id).await;
679        if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
680            obj.insert(
681                "__session_id".to_string(),
682                Value::String(session_id.to_string()),
683            );
684            if let Some(project_id) = session.project_id.clone() {
685                obj.insert(
686                    "__project_id".to_string(),
687                    Value::String(project_id.clone()),
688                );
689                if project_id.starts_with("channel-public::") {
690                    obj.insert(
691                        "__memory_max_visible_scope".to_string(),
692                        Value::String("project".to_string()),
693                    );
694                }
695            }
696        }
697        let tool_context = self.resolve_tool_execution_context(session_id).await;
698        if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
699            args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
700            if let Some(obj) = args.as_object_mut() {
701                obj.insert(
702                    "__workspace_root".to_string(),
703                    Value::String(workspace_root.clone()),
704                );
705                obj.insert(
706                    "__effective_cwd".to_string(),
707                    Value::String(effective_cwd.clone()),
708                );
709                obj.insert(
710                    "__session_id".to_string(),
711                    Value::String(session_id.to_string()),
712                );
713                if let Some(project_id) = project_id.clone() {
714                    obj.insert("__project_id".to_string(), Value::String(project_id));
715                }
716            }
717            tracing::info!(
718                "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
719                session_id,
720                tool,
721                workspace_root,
722                effective_cwd,
723                project_id.clone().unwrap_or_default()
724            );
725        }
726        let mut invoke_part =
727            WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
728        if let Some(call_id) = tool_call_id.clone() {
729            invoke_part.id = Some(call_id);
730        }
731        let invoke_part_id = invoke_part.id.clone();
732        self.event_bus.publish(EngineEvent::new(
733            "message.part.updated",
734            json!({"part": invoke_part}),
735        ));
736        let args_for_side_events = args.clone();
737        let mutation_checkpoint = prepare_mutation_checkpoint(&tool, &args_for_side_events);
738        let progress_sink: SharedToolProgressSink = std::sync::Arc::new(EngineToolProgressSink {
739            event_bus: self.event_bus.clone(),
740            session_id: session_id.to_string(),
741            message_id: message_id.to_string(),
742            tool_call_id: invoke_part_id.clone(),
743            source_tool: tool.clone(),
744        });
745        publish_tool_effect(
746            invoke_part_id.as_deref(),
747            ToolEffectLedgerPhase::Invocation,
748            ToolEffectLedgerStatus::Started,
749            &args_for_side_events,
750            None,
751            None,
752            None,
753        );
754        let publish_mutation_checkpoint =
755            |tool_call_id: Option<&str>, outcome: MutationCheckpointOutcome| {
756                if let Some(baseline) = mutation_checkpoint.as_ref() {
757                    self.event_bus.publish(mutation_checkpoint_event(
758                        finalize_mutation_checkpoint_record(
759                            session_id,
760                            message_id,
761                            tool_call_id,
762                            baseline,
763                            outcome,
764                        ),
765                    ));
766                }
767            };
768        if tool == "spawn_agent" {
769            let hook = self.spawn_agent_hook.read().await.clone();
770            if let Some(hook) = hook {
771                let spawned = hook
772                    .spawn_agent(SpawnAgentToolContext {
773                        session_id: session_id.to_string(),
774                        message_id: message_id.to_string(),
775                        tool_call_id: invoke_part_id.clone(),
776                        args: args_for_side_events.clone(),
777                    })
778                    .await?;
779                let output = self.plugins.transform_tool_output(spawned.output).await;
780                let output = truncate_text(&output, 16_000);
781                emit_tool_side_events(
782                    self.storage.clone(),
783                    &self.event_bus,
784                    ToolSideEventContext {
785                        session_id,
786                        message_id,
787                        tool: &tool,
788                        args: &args_for_side_events,
789                        metadata: &spawned.metadata,
790                        workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
791                        effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
792                    },
793                )
794                .await;
795                let mut result_part = WireMessagePart::tool_result(
796                    session_id,
797                    message_id,
798                    tool.clone(),
799                    Some(args_for_side_events.clone()),
800                    json!(output.clone()),
801                );
802                result_part.id = invoke_part_id.clone();
803                self.event_bus.publish(EngineEvent::new(
804                    "message.part.updated",
805                    json!({"part": result_part}),
806                ));
807                publish_tool_effect(
808                    invoke_part_id.as_deref(),
809                    ToolEffectLedgerPhase::Outcome,
810                    ToolEffectLedgerStatus::Succeeded,
811                    &args_for_side_events,
812                    Some(&spawned.metadata),
813                    Some(&output),
814                    None,
815                );
816                publish_mutation_checkpoint(
817                    invoke_part_id.as_deref(),
818                    MutationCheckpointOutcome::Succeeded,
819                );
820                return Ok(Some(truncate_text(
821                    &format!("Tool `{tool}` result:\n{output}"),
822                    16_000,
823                )));
824            }
825            let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
826            let mut failed_part = WireMessagePart::tool_result(
827                session_id,
828                message_id,
829                tool.clone(),
830                Some(args_for_side_events.clone()),
831                json!(null),
832            );
833            failed_part.id = invoke_part_id.clone();
834            failed_part.state = Some("failed".to_string());
835            failed_part.error = Some(output.to_string());
836            self.event_bus.publish(EngineEvent::new(
837                "message.part.updated",
838                json!({"part": failed_part}),
839            ));
840            publish_tool_effect(
841                invoke_part_id.as_deref(),
842                ToolEffectLedgerPhase::Outcome,
843                ToolEffectLedgerStatus::Failed,
844                &args_for_side_events,
845                None,
846                None,
847                Some(output),
848            );
849            publish_mutation_checkpoint(
850                invoke_part_id.as_deref(),
851                MutationCheckpointOutcome::Failed,
852            );
853            return Ok(Some(output.to_string()));
854        }
855        // Batch governance: validate sub-calls against engine policy and inject execution context
856        // before delegating to BatchTool. This ensures sub-calls cannot bypass permissions,
857        // sandbox checks, or allowed-tool lists, and that they receive the correct workspace
858        // context (__workspace_root, __effective_cwd, __session_id, __project_id).
859        //
860        // By this point `args` already has those keys injected (see context injection above).
861        if tool == "batch" {
862            let allowed_tools = self
863                .session_allowed_tools
864                .read()
865                .await
866                .get(session_id)
867                .cloned()
868                .unwrap_or_default();
869
870            // Extract parent execution context from already-injected batch args.
871            let ctx_workspace_root = args
872                .get("__workspace_root")
873                .and_then(|v| v.as_str())
874                .map(ToString::to_string);
875            let ctx_effective_cwd = args
876                .get("__effective_cwd")
877                .and_then(|v| v.as_str())
878                .map(ToString::to_string);
879            let ctx_session_id = args
880                .get("__session_id")
881                .and_then(|v| v.as_str())
882                .map(ToString::to_string);
883            let ctx_project_id = args
884                .get("__project_id")
885                .and_then(|v| v.as_str())
886                .map(ToString::to_string);
887
888            // Process each sub-call: check governance, inject context.
889            let raw_calls = args
890                .get("tool_calls")
891                .and_then(|v| v.as_array())
892                .cloned()
893                .unwrap_or_default();
894
895            let mut governed_calls: Vec<Value> = Vec::new();
896            for mut call in raw_calls {
897                let (sub_tool, mut sub_args) = {
898                    let obj = match call.as_object() {
899                        Some(o) => o,
900                        None => {
901                            governed_calls.push(call);
902                            continue;
903                        }
904                    };
905                    let tool_raw = non_empty_string_at(obj, "tool")
906                        .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
907                        .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
908                        .or_else(|| non_empty_string_at(obj, "name"));
909                    let sub_tool = match tool_raw {
910                        Some(t) => normalize_tool_name(t),
911                        None => {
912                            governed_calls.push(call);
913                            continue;
914                        }
915                    };
916                    let sub_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
917                    (sub_tool, sub_args)
918                };
919
920                // 1. Allowed-tools check.
921                if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &sub_tool) {
922                    // Strip this sub-call: replace it with an explanatory result.
923                    if let Some(obj) = call.as_object_mut() {
924                        obj.insert(
925                            "_blocked".to_string(),
926                            Value::String(format!(
927                                "batch sub-call skipped: tool `{sub_tool}` is not in the allowed list for this run"
928                            )),
929                        );
930                    }
931                    governed_calls.push(call);
932                    continue;
933                }
934
935                // 2. Workspace sandbox check.
936                if let Some(violation) = self
937                    .workspace_sandbox_violation(session_id, &sub_tool, &sub_args)
938                    .await
939                {
940                    if let Some(obj) = call.as_object_mut() {
941                        obj.insert(
942                            "_blocked".to_string(),
943                            Value::String(format!("batch sub-call skipped: {violation}")),
944                        );
945                    }
946                    governed_calls.push(call);
947                    continue;
948                }
949
950                // 3. Inject parent execution context into sub-call args.
951                if let Some(sub_obj) = sub_args.as_object_mut() {
952                    if let Some(ref v) = ctx_workspace_root {
953                        sub_obj
954                            .entry("__workspace_root")
955                            .or_insert_with(|| Value::String(v.clone()));
956                    }
957                    if let Some(ref v) = ctx_effective_cwd {
958                        sub_obj
959                            .entry("__effective_cwd")
960                            .or_insert_with(|| Value::String(v.clone()));
961                    }
962                    if let Some(ref v) = ctx_session_id {
963                        sub_obj
964                            .entry("__session_id")
965                            .or_insert_with(|| Value::String(v.clone()));
966                    }
967                    if let Some(ref v) = ctx_project_id {
968                        sub_obj
969                            .entry("__project_id")
970                            .or_insert_with(|| Value::String(v.clone()));
971                    }
972                }
973
974                // Write enriched args back into the call object.
975                if let Some(obj) = call.as_object_mut() {
976                    obj.insert("args".to_string(), sub_args);
977                }
978                governed_calls.push(call);
979            }
980
981            // Rebuild batch args with the governed sub-calls.
982            if let Some(obj) = args.as_object_mut() {
983                obj.insert("tool_calls".to_string(), Value::Array(governed_calls));
984            }
985        }
986        let result = match self
987            .execute_tool_with_timeout(&tool, args, cancel.clone(), Some(progress_sink))
988            .await
989        {
990            Ok(result) => result,
991            Err(err) => {
992                let err_text = err.to_string();
993                if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
994                    let timeout_ms = tool_exec_timeout_ms();
995                    let timeout_output = format!(
996                        "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
997                    );
998                    let mut failed_part = WireMessagePart::tool_result(
999                        session_id,
1000                        message_id,
1001                        tool.clone(),
1002                        Some(args_for_side_events.clone()),
1003                        json!(null),
1004                    );
1005                    failed_part.id = invoke_part_id.clone();
1006                    failed_part.state = Some("failed".to_string());
1007                    failed_part.error = Some(timeout_output.clone());
1008                    self.event_bus.publish(EngineEvent::new(
1009                        "message.part.updated",
1010                        json!({"part": failed_part}),
1011                    ));
1012                    publish_tool_effect(
1013                        invoke_part_id.as_deref(),
1014                        ToolEffectLedgerPhase::Outcome,
1015                        ToolEffectLedgerStatus::Failed,
1016                        &args_for_side_events,
1017                        None,
1018                        None,
1019                        Some(&timeout_output),
1020                    );
1021                    publish_mutation_checkpoint(
1022                        invoke_part_id.as_deref(),
1023                        MutationCheckpointOutcome::Failed,
1024                    );
1025                    return Ok(Some(timeout_output));
1026                }
1027                if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1028                    self.event_bus.publish(EngineEvent::new(
1029                        "mcp.auth.required",
1030                        json!({
1031                            "sessionID": session_id,
1032                            "messageID": message_id,
1033                            "tool": tool.clone(),
1034                            "server": auth.server,
1035                            "authorizationUrl": auth.authorization_url,
1036                            "message": auth.message,
1037                            "challengeId": auth.challenge_id
1038                        }),
1039                    ));
1040                    let auth_output = format!(
1041                        "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1042                        tool, auth.message, auth.authorization_url
1043                    );
1044                    let mut result_part = WireMessagePart::tool_result(
1045                        session_id,
1046                        message_id,
1047                        tool.clone(),
1048                        Some(args_for_side_events.clone()),
1049                        json!(auth_output.clone()),
1050                    );
1051                    result_part.id = invoke_part_id.clone();
1052                    self.event_bus.publish(EngineEvent::new(
1053                        "message.part.updated",
1054                        json!({"part": result_part}),
1055                    ));
1056                    publish_tool_effect(
1057                        invoke_part_id.as_deref(),
1058                        ToolEffectLedgerPhase::Outcome,
1059                        ToolEffectLedgerStatus::Blocked,
1060                        &args_for_side_events,
1061                        None,
1062                        Some(&auth_output),
1063                        Some(&auth.message),
1064                    );
1065                    publish_mutation_checkpoint(
1066                        invoke_part_id.as_deref(),
1067                        MutationCheckpointOutcome::Blocked,
1068                    );
1069                    return Ok(Some(truncate_text(
1070                        &format!("Tool `{tool}` result:\n{auth_output}"),
1071                        16_000,
1072                    )));
1073                }
1074                let mut failed_part = WireMessagePart::tool_result(
1075                    session_id,
1076                    message_id,
1077                    tool.clone(),
1078                    Some(args_for_side_events.clone()),
1079                    json!(null),
1080                );
1081                failed_part.id = invoke_part_id.clone();
1082                failed_part.state = Some("failed".to_string());
1083                failed_part.error = Some(err_text.clone());
1084                self.event_bus.publish(EngineEvent::new(
1085                    "message.part.updated",
1086                    json!({"part": failed_part}),
1087                ));
1088                publish_tool_effect(
1089                    invoke_part_id.as_deref(),
1090                    ToolEffectLedgerPhase::Outcome,
1091                    ToolEffectLedgerStatus::Failed,
1092                    &args_for_side_events,
1093                    None,
1094                    None,
1095                    Some(&err_text),
1096                );
1097                publish_mutation_checkpoint(
1098                    invoke_part_id.as_deref(),
1099                    MutationCheckpointOutcome::Failed,
1100                );
1101                return Err(err);
1102            }
1103        };
1104        if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1105            let event_name = if auth.pending && auth.blocked {
1106                "mcp.auth.pending"
1107            } else {
1108                "mcp.auth.required"
1109            };
1110            self.event_bus.publish(EngineEvent::new(
1111                event_name,
1112                json!({
1113                    "sessionID": session_id,
1114                    "messageID": message_id,
1115                    "tool": tool.clone(),
1116                    "server": auth.server,
1117                    "authorizationUrl": auth.authorization_url,
1118                    "message": auth.message,
1119                    "challengeId": auth.challenge_id,
1120                    "pending": auth.pending,
1121                    "blocked": auth.blocked,
1122                    "retryAfterMs": auth.retry_after_ms
1123                }),
1124            ));
1125        }
1126        emit_tool_side_events(
1127            self.storage.clone(),
1128            &self.event_bus,
1129            ToolSideEventContext {
1130                session_id,
1131                message_id,
1132                tool: &tool,
1133                args: &args_for_side_events,
1134                metadata: &result.metadata,
1135                workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1136                effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1137            },
1138        )
1139        .await;
1140        let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1141            if auth.pending && auth.blocked {
1142                let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1143                format!(
1144                    "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1145                    tool, auth.message, auth.authorization_url, retry_after_secs
1146                )
1147            } else {
1148                format!(
1149                    "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1150                    tool, auth.message, auth.authorization_url
1151                )
1152            }
1153        } else {
1154            self.plugins.transform_tool_output(result.output).await
1155        };
1156        let output = truncate_text(&output, 16_000);
1157        let mut result_part = WireMessagePart::tool_result(
1158            session_id,
1159            message_id,
1160            tool.clone(),
1161            Some(args_for_side_events.clone()),
1162            json!(output.clone()),
1163        );
1164        result_part.id = invoke_part_id.clone();
1165        self.event_bus.publish(EngineEvent::new(
1166            "message.part.updated",
1167            json!({"part": result_part}),
1168        ));
1169        publish_tool_effect(
1170            invoke_part_id.as_deref(),
1171            ToolEffectLedgerPhase::Outcome,
1172            ToolEffectLedgerStatus::Succeeded,
1173            &args_for_side_events,
1174            Some(&result.metadata),
1175            Some(&output),
1176            None,
1177        );
1178        publish_mutation_checkpoint(
1179            invoke_part_id.as_deref(),
1180            MutationCheckpointOutcome::Succeeded,
1181        );
1182        Ok(Some(truncate_text(
1183            &format!("Tool `{tool}` result:\n{output}"),
1184            16_000,
1185        )))
1186    }
1187}
1188
1189#[cfg(test)]
1190mod tests;