Skip to main content

tandem_core/
engine_loop.rs

1use chrono::Utc;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
5use std::hash::{Hash, Hasher};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12    ContextMode, EngineEvent, HostRuntimeContext, Message, MessagePart, MessagePartInput,
13    MessageRole, ModelSpec, SendMessageRequest, SharedToolProgressSink, ToolMode, ToolSchema,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19mod loop_guards;
20mod loop_tuning;
21mod prewrite_gate;
22mod prewrite_mode;
23mod prompt_context;
24mod prompt_execution;
25mod prompt_helpers;
26mod prompt_runtime;
27mod tool_execution;
28mod tool_output;
29mod tool_parsing;
30mod types;
31mod write_targets;
32
33use loop_guards::{
34    duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
35};
36use loop_tuning::{
37    max_tool_iterations, permission_wait_timeout_ms, prompt_context_hook_timeout_ms,
38    provider_stream_connect_timeout_ms, provider_stream_decode_retry_attempts,
39    provider_stream_idle_timeout_ms, strict_write_retry_max_attempts, tool_exec_timeout_ms,
40};
41use prewrite_gate::{evaluate_prewrite_gate, PrewriteProgress};
42use prewrite_mode::*;
43use prompt_context::{
44    format_context_mode, mcp_catalog_in_system_prompt_enabled, semantic_tool_retrieval_enabled,
45    semantic_tool_retrieval_k, tandem_runtime_system_prompt,
46};
47use prompt_helpers::*;
48use prompt_runtime::*;
49use tool_output::*;
50use tool_parsing::*;
51use types::{EngineToolProgressSink, StreamedToolCall, WritePathRecoveryMode};
52
53pub use prewrite_mode::prewrite_repair_retry_max_attempts;
54pub use types::{
55    KnowledgebaseGroundingPolicy, PromptContextHook, PromptContextHookContext, SpawnAgentHook,
56    SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext, ToolPolicyDecision,
57    ToolPolicyHook,
58};
59
/// Write mode carried by a [`SessionWritePolicy`].
///
/// NOTE(review): how each mode is enforced is decided by the session write-policy check used
/// during tool execution (not visible in this chunk); the per-variant notes below are inferred
/// from the variant names only — confirm against that implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionWritePolicyMode {
    /// Presumably limits writes to run artifacts — TODO confirm.
    ArtifactOnly,
    /// Presumably limits writes to `SessionWritePolicy::allowed_paths` — TODO confirm.
    ExplicitTargets,
    /// Presumably permits general repository edits — TODO confirm.
    RepoEdit,
}
66
/// Per-session write policy, registered with `EngineLoop::set_session_write_policy`.
///
/// When stored through that setter, `allowed_paths` entries are trimmed, blank entries are
/// dropped and duplicates are removed (first occurrence wins); `mode` and `reason` are stored
/// unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionWritePolicy {
    /// Write mode that applies to the session.
    pub mode: SessionWritePolicyMode,
    /// Path entries associated with the policy (relative vs. absolute form is not established
    /// here — TODO confirm).
    pub allowed_paths: Vec<String>,
    /// Reason text stored alongside the policy.
    pub reason: String,
}
73
74use crate::tool_router::{
75    classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
76    should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
77};
78use crate::{
79    any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
80    tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
81    PermissionAction, PermissionManager, PluginRegistry, Storage,
82};
83use crate::{
84    build_tool_effect_ledger_record, finalize_mutation_checkpoint_record,
85    mutation_checkpoint_event, prepare_mutation_checkpoint, tool_effect_ledger_event,
86    MutationCheckpointOutcome, ToolEffectLedgerPhase, ToolEffectLedgerStatus,
87};
88use tokio::sync::RwLock;
89
/// Prompt/tool execution engine: injected registries and services plus per-session runtime
/// state.
///
/// Every per-session table and hook slot is wrapped in `Arc<RwLock<..>>`, so clones of an
/// `EngineLoop` share that state. Whether the other handles share state on clone depends on
/// their own `Clone` impls (not visible here).
#[derive(Clone)]
pub struct EngineLoop {
    storage: std::sync::Arc<Storage>,
    /// Destination for the `EngineEvent`s this engine publishes.
    event_bus: EventBus,
    /// Model providers used for completions.
    providers: ProviderRegistry,
    /// Source of per-tool permission overrides that take precedence over `permissions`.
    plugins: PluginRegistry,
    agents: AgentRegistry,
    /// Evaluates tool permissions and issues interactive permission requests.
    permissions: PermissionManager,
    tools: ToolRegistry,
    cancellations: CancellationRegistry,
    host_runtime_context: HostRuntimeContext,
    /// Session id -> workspace-override expiry in Unix epoch milliseconds.
    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
    /// Session id -> normalized tool names allowed for that session; during tool execution an
    /// empty list imposes no restriction.
    session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Session id -> write policy.
    session_write_policies: std::sync::Arc<RwLock<HashMap<String, SessionWritePolicy>>>,
    /// Session id -> knowledge-base grounding policy; `set_session_kb_grounding_policy` only
    /// stores required policies that have at least one tool pattern.
    session_kb_grounding_policies:
        std::sync::Arc<RwLock<HashMap<String, KnowledgebaseGroundingPolicy>>>,
    /// Session id -> auto-approve flag for `Ask` permission prompts;
    /// `set_session_auto_approve_permissions` only stores `true`.
    session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
    /// Optional spawn-agent hook, installed via `set_spawn_agent_hook`.
    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
    /// Optional hook consulted before a tool runs; a denial blocks the call.
    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
    /// Optional prompt-context hook, installed via `set_prompt_context_hook`.
    prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
}
111
112impl EngineLoop {
113    #[allow(clippy::too_many_arguments)]
114    pub fn new(
115        storage: std::sync::Arc<Storage>,
116        event_bus: EventBus,
117        providers: ProviderRegistry,
118        plugins: PluginRegistry,
119        agents: AgentRegistry,
120        permissions: PermissionManager,
121        tools: ToolRegistry,
122        cancellations: CancellationRegistry,
123        host_runtime_context: HostRuntimeContext,
124    ) -> Self {
125        Self {
126            storage,
127            event_bus,
128            providers,
129            plugins,
130            agents,
131            permissions,
132            tools,
133            cancellations,
134            host_runtime_context,
135            workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
136            session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
137            session_write_policies: std::sync::Arc::new(RwLock::new(HashMap::new())),
138            session_kb_grounding_policies: std::sync::Arc::new(RwLock::new(HashMap::new())),
139            session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
140            spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
141            tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
142            prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
143        }
144    }
145
146    pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
147        *self.spawn_agent_hook.write().await = Some(hook);
148    }
149
150    pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
151        *self.tool_policy_hook.write().await = Some(hook);
152    }
153
154    pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
155        *self.prompt_context_hook.write().await = Some(hook);
156    }
157
158    pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
159        let normalized = allowed_tools
160            .into_iter()
161            .map(|tool| normalize_tool_name(&tool))
162            .filter(|tool| !tool.trim().is_empty())
163            .collect::<Vec<_>>();
164        self.session_allowed_tools
165            .write()
166            .await
167            .insert(session_id.to_string(), normalized);
168    }
169
170    pub async fn clear_session_allowed_tools(&self, session_id: &str) {
171        self.session_allowed_tools.write().await.remove(session_id);
172    }
173
174    pub async fn get_session_allowed_tools(&self, session_id: &str) -> Vec<String> {
175        self.session_allowed_tools
176            .read()
177            .await
178            .get(session_id)
179            .cloned()
180            .unwrap_or_default()
181    }
182
183    pub async fn set_session_write_policy(&self, session_id: &str, policy: SessionWritePolicy) {
184        let mut seen = HashSet::new();
185        let allowed_paths = policy
186            .allowed_paths
187            .into_iter()
188            .map(|path| path.trim().to_string())
189            .filter(|path| !path.is_empty())
190            .filter(|path| seen.insert(path.clone()))
191            .collect::<Vec<_>>();
192        self.session_write_policies.write().await.insert(
193            session_id.to_string(),
194            SessionWritePolicy {
195                mode: policy.mode,
196                allowed_paths,
197                reason: policy.reason,
198            },
199        );
200    }
201
202    pub async fn clear_session_write_policy(&self, session_id: &str) {
203        self.session_write_policies.write().await.remove(session_id);
204    }
205
206    pub async fn get_session_write_policy(&self, session_id: &str) -> Option<SessionWritePolicy> {
207        self.session_write_policies
208            .read()
209            .await
210            .get(session_id)
211            .cloned()
212    }
213
214    pub async fn set_session_kb_grounding_policy(
215        &self,
216        session_id: &str,
217        policy: KnowledgebaseGroundingPolicy,
218    ) {
219        let mut seen_servers = HashSet::new();
220        let server_names = policy
221            .server_names
222            .into_iter()
223            .map(|server| server.trim().to_ascii_lowercase())
224            .filter(|server| !server.is_empty())
225            .filter(|server| seen_servers.insert(server.clone()))
226            .collect::<Vec<_>>();
227        let mut seen_patterns = HashSet::new();
228        let tool_patterns = policy
229            .tool_patterns
230            .into_iter()
231            .map(|tool| normalize_tool_name(&tool))
232            .filter(|tool| !tool.trim().is_empty())
233            .filter(|tool| seen_patterns.insert(tool.clone()))
234            .collect::<Vec<_>>();
235        if !policy.required || tool_patterns.is_empty() {
236            self.clear_session_kb_grounding_policy(session_id).await;
237            return;
238        }
239        self.session_kb_grounding_policies.write().await.insert(
240            session_id.to_string(),
241            KnowledgebaseGroundingPolicy {
242                required: true,
243                strict: policy.strict,
244                server_names,
245                tool_patterns,
246            },
247        );
248    }
249
250    pub async fn clear_session_kb_grounding_policy(&self, session_id: &str) {
251        self.session_kb_grounding_policies
252            .write()
253            .await
254            .remove(session_id);
255    }
256
257    pub async fn get_session_kb_grounding_policy(
258        &self,
259        session_id: &str,
260    ) -> Option<KnowledgebaseGroundingPolicy> {
261        self.session_kb_grounding_policies
262            .read()
263            .await
264            .get(session_id)
265            .cloned()
266    }
267
268    pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
269        if enabled {
270            self.session_auto_approve_permissions
271                .write()
272                .await
273                .insert(session_id.to_string(), true);
274        } else {
275            self.session_auto_approve_permissions
276                .write()
277                .await
278                .remove(session_id);
279        }
280    }
281
282    pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
283        self.session_auto_approve_permissions
284            .write()
285            .await
286            .remove(session_id);
287    }
288
289    pub async fn grant_workspace_override_for_session(
290        &self,
291        session_id: &str,
292        ttl_seconds: u64,
293    ) -> u64 {
294        // Cap the override TTL to prevent indefinite sandbox bypass.
295        const MAX_WORKSPACE_OVERRIDE_TTL_SECONDS: u64 = 600; // 10 minutes
296        let capped_ttl = ttl_seconds.min(MAX_WORKSPACE_OVERRIDE_TTL_SECONDS);
297        if capped_ttl < ttl_seconds {
298            tracing::warn!(
299                session_id = %session_id,
300                requested_ttl_s = %ttl_seconds,
301                capped_ttl_s = %capped_ttl,
302                "workspace override TTL capped to maximum allowed value"
303            );
304        }
305        let expires_at = chrono::Utc::now()
306            .timestamp_millis()
307            .max(0)
308            .saturating_add((capped_ttl as i64).saturating_mul(1000))
309            as u64;
310        self.workspace_overrides
311            .write()
312            .await
313            .insert(session_id.to_string(), expires_at);
314        self.event_bus.publish(EngineEvent::new(
315            "workspace.override.activated",
316            json!({
317                "sessionID": session_id,
318                "requestedTtlSeconds": ttl_seconds,
319                "cappedTtlSeconds": capped_ttl,
320                "expiresAt": expires_at,
321            }),
322        ));
323        expires_at
324    }
325
326    pub async fn run_prompt_async(
327        &self,
328        session_id: String,
329        req: SendMessageRequest,
330    ) -> anyhow::Result<()> {
331        self.run_prompt_async_with_context(session_id, req, None)
332            .await
333    }
334
335    pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
336        self.providers.default_complete(&prompt).await
337    }
338
339    pub async fn run_oneshot_for_provider(
340        &self,
341        prompt: String,
342        provider_id: Option<&str>,
343    ) -> anyhow::Result<String> {
344        self.providers
345            .complete_for_provider(provider_id, &prompt, None)
346            .await
347    }
348
349    #[allow(clippy::too_many_arguments)]
350    async fn execute_tool_with_permission(
351        &self,
352        session_id: &str,
353        message_id: &str,
354        tool: String,
355        args: Value,
356        initial_tool_call_id: Option<String>,
357        equipped_skills: Option<&[String]>,
358        latest_user_text: &str,
359        write_required: bool,
360        latest_assistant_context: Option<&str>,
361        cancel: CancellationToken,
362    ) -> anyhow::Result<Option<String>> {
363        let tool = normalize_tool_name(&tool);
364        let raw_args = args.clone();
365        let publish_tool_effect = |tool_call_id: Option<&str>,
366                                   phase: ToolEffectLedgerPhase,
367                                   status: ToolEffectLedgerStatus,
368                                   args: &Value,
369                                   metadata: Option<&Value>,
370                                   output: Option<&str>,
371                                   error: Option<&str>| {
372            self.event_bus
373                .publish(tool_effect_ledger_event(build_tool_effect_ledger_record(
374                    session_id,
375                    message_id,
376                    tool_call_id,
377                    &tool,
378                    phase,
379                    status,
380                    args,
381                    metadata,
382                    output,
383                    error,
384                )));
385        };
386        let normalized = normalize_tool_args_with_mode(
387            &tool,
388            args,
389            latest_user_text,
390            latest_assistant_context.unwrap_or_default(),
391            if write_required {
392                WritePathRecoveryMode::OutputTargetOnly
393            } else {
394                WritePathRecoveryMode::Heuristic
395            },
396        );
397        let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
398        let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
399        self.event_bus.publish(EngineEvent::new(
400            "tool.args.normalized",
401            json!({
402                "sessionID": session_id,
403                "messageID": message_id,
404                "tool": tool,
405                "argsSource": normalized.args_source,
406                "argsIntegrity": normalized.args_integrity,
407                "rawArgsState": normalized.raw_args_state.as_str(),
408                "rawArgsPreview": raw_args_preview,
409                "normalizedArgsPreview": normalized_args_preview,
410                "query": normalized.query,
411                "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
412                "requestID": Value::Null
413            }),
414        ));
415        if normalized.args_integrity == "recovered" {
416            self.event_bus.publish(EngineEvent::new(
417                "tool.args.recovered",
418                json!({
419                    "sessionID": session_id,
420                    "messageID": message_id,
421                    "tool": tool,
422                    "argsSource": normalized.args_source,
423                    "rawArgsPreview": raw_args_preview,
424                    "normalizedArgsPreview": normalized_args_preview,
425                    "query": normalized.query,
426                    "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
427                    "requestID": Value::Null
428                }),
429            ));
430        }
431        if normalized.missing_terminal {
432            let missing_reason = normalized
433                .missing_terminal_reason
434                .clone()
435                .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
436            let latest_user_preview = truncate_text(latest_user_text, 500);
437            let latest_assistant_preview =
438                truncate_text(latest_assistant_context.unwrap_or_default(), 500);
439            self.event_bus.publish(EngineEvent::new(
440                "tool.args.missing_terminal",
441                json!({
442                    "sessionID": session_id,
443                    "messageID": message_id,
444                    "tool": tool,
445                    "argsSource": normalized.args_source,
446                    "argsIntegrity": normalized.args_integrity,
447                    "rawArgsState": normalized.raw_args_state.as_str(),
448                    "requestID": Value::Null,
449                    "error": missing_reason,
450                    "rawArgsPreview": raw_args_preview,
451                    "normalizedArgsPreview": normalized_args_preview,
452                    "latestUserPreview": latest_user_preview,
453                    "latestAssistantPreview": latest_assistant_preview,
454                }),
455            ));
456            if tool == "write" {
457                tracing::warn!(
458                    session_id = %session_id,
459                    message_id = %message_id,
460                    tool = %tool,
461                    reason = %missing_reason,
462                    args_source = %normalized.args_source,
463                    args_integrity = %normalized.args_integrity,
464                    raw_args_state = %normalized.raw_args_state.as_str(),
465                    raw_args = %raw_args_preview,
466                    normalized_args = %normalized_args_preview,
467                    latest_user = %latest_user_preview,
468                    latest_assistant = %latest_assistant_preview,
469                    "write tool arguments missing terminal field"
470                );
471            }
472            let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
473            let mut failed_part = WireMessagePart::tool_result(
474                session_id,
475                message_id,
476                tool.clone(),
477                Some(best_effort_args),
478                json!(null),
479            );
480            failed_part.state = Some("failed".to_string());
481            let surfaced_reason =
482                provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
483                    .unwrap_or_else(|| missing_reason.clone());
484            failed_part.error = Some(surfaced_reason.clone());
485            self.event_bus.publish(EngineEvent::new(
486                "message.part.updated",
487                json!({"part": failed_part}),
488            ));
489            publish_tool_effect(
490                None,
491                ToolEffectLedgerPhase::Outcome,
492                ToolEffectLedgerStatus::Blocked,
493                &normalized.args,
494                None,
495                None,
496                Some(&surfaced_reason),
497            );
498            return Ok(Some(surfaced_reason));
499        }
500
501        let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
502            Ok(args) => args,
503            Err(message) => {
504                publish_tool_effect(
505                    None,
506                    ToolEffectLedgerPhase::Outcome,
507                    ToolEffectLedgerStatus::Blocked,
508                    &raw_args,
509                    None,
510                    None,
511                    Some(&message),
512                );
513                return Ok(Some(message));
514            }
515        };
516        if let Some(allowed_tools) = self
517            .session_allowed_tools
518            .read()
519            .await
520            .get(session_id)
521            .cloned()
522        {
523            if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
524                let reason = format!("Tool `{tool}` is not allowed for this run.");
525                publish_tool_effect(
526                    None,
527                    ToolEffectLedgerPhase::Outcome,
528                    ToolEffectLedgerStatus::Blocked,
529                    &args,
530                    None,
531                    None,
532                    Some(&reason),
533                );
534                return Ok(Some(reason));
535            }
536        }
537        if let Some(hook) = self.tool_policy_hook.read().await.clone() {
538            let decision = hook
539                .evaluate_tool(ToolPolicyContext {
540                    session_id: session_id.to_string(),
541                    message_id: message_id.to_string(),
542                    tool: tool.clone(),
543                    args: args.clone(),
544                })
545                .await?;
546            if !decision.allowed {
547                let reason = decision
548                    .reason
549                    .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
550                let mut blocked_part = WireMessagePart::tool_result(
551                    session_id,
552                    message_id,
553                    tool.clone(),
554                    Some(args.clone()),
555                    json!(null),
556                );
557                blocked_part.state = Some("failed".to_string());
558                blocked_part.error = Some(reason.clone());
559                self.event_bus.publish(EngineEvent::new(
560                    "message.part.updated",
561                    json!({"part": blocked_part}),
562                ));
563                publish_tool_effect(
564                    None,
565                    ToolEffectLedgerPhase::Outcome,
566                    ToolEffectLedgerStatus::Blocked,
567                    &args,
568                    None,
569                    None,
570                    Some(&reason),
571                );
572                return Ok(Some(reason));
573            }
574        }
575        let mut tool_call_id: Option<String> = initial_tool_call_id;
576        if let Some(violation) = self
577            .session_write_policy_violation(session_id, &tool, &args)
578            .await
579        {
580            let mut blocked_part = WireMessagePart::tool_result(
581                session_id,
582                message_id,
583                tool.clone(),
584                Some(args.clone()),
585                json!(null),
586            );
587            blocked_part.state = Some("failed".to_string());
588            blocked_part.error = Some(violation.clone());
589            self.event_bus.publish(EngineEvent::new(
590                "message.part.updated",
591                json!({"part": blocked_part}),
592            ));
593            self.event_bus.publish(EngineEvent::new(
594                "tool.call.rejected_write_policy",
595                json!({
596                    "sessionID": session_id,
597                    "messageID": message_id,
598                    "tool": tool,
599                    "error": violation.clone(),
600                }),
601            ));
602            publish_tool_effect(
603                tool_call_id.as_deref(),
604                ToolEffectLedgerPhase::Outcome,
605                ToolEffectLedgerStatus::Blocked,
606                &args,
607                None,
608                None,
609                Some(&violation),
610            );
611            return Ok(Some(violation));
612        }
613        if let Some(violation) = self
614            .workspace_sandbox_violation(session_id, &tool, &args)
615            .await
616        {
617            let mut blocked_part = WireMessagePart::tool_result(
618                session_id,
619                message_id,
620                tool.clone(),
621                Some(args.clone()),
622                json!(null),
623            );
624            blocked_part.state = Some("failed".to_string());
625            blocked_part.error = Some(violation.clone());
626            self.event_bus.publish(EngineEvent::new(
627                "message.part.updated",
628                json!({"part": blocked_part}),
629            ));
630            publish_tool_effect(
631                tool_call_id.as_deref(),
632                ToolEffectLedgerPhase::Outcome,
633                ToolEffectLedgerStatus::Blocked,
634                &args,
635                None,
636                None,
637                Some(&violation),
638            );
639            return Ok(Some(violation));
640        }
641        let rule = self
642            .plugins
643            .permission_override(&tool)
644            .await
645            .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
646        if matches!(rule, PermissionAction::Deny) {
647            let reason = format!("Permission denied for tool `{tool}` by policy.");
648            publish_tool_effect(
649                tool_call_id.as_deref(),
650                ToolEffectLedgerPhase::Outcome,
651                ToolEffectLedgerStatus::Blocked,
652                &args,
653                None,
654                None,
655                Some(&reason),
656            );
657            return Ok(Some(reason));
658        }
659
660        let mut effective_args = args.clone();
661        if matches!(rule, PermissionAction::Ask) {
662            let auto_approve_permissions = self
663                .session_auto_approve_permissions
664                .read()
665                .await
666                .get(session_id)
667                .copied()
668                .unwrap_or(false);
669            if auto_approve_permissions {
670                // Governance audit: if args were recovered via heuristics and the tool is
671                // mutating, log a WARN so recovered writes are never silent in automation
672                // mode. Does not block — operators must opt out via TANDEM_AUTO_APPROVE_RECOVERED_ARGS=false
673                // if they want a hard block (reserved for strict automation policy).
674                if normalized.args_integrity == "recovered" && is_workspace_write_tool(&tool) {
675                    tracing::warn!(
676                        session_id = %session_id,
677                        message_id = %message_id,
678                        tool = %tool,
679                        args_source = %normalized.args_source,
680                        "auto-approve granted for mutating tool with recovered args; verify intent"
681                    );
682                    self.event_bus.publish(EngineEvent::new(
683                        "tool.args.recovered_write_auto_approved",
684                        json!({
685                            "sessionID": session_id,
686                            "messageID": message_id,
687                            "tool": tool,
688                            "argsSource": normalized.args_source,
689                            "argsIntegrity": normalized.args_integrity,
690                        }),
691                    ));
692                }
693                self.event_bus.publish(EngineEvent::new(
694                    "permission.auto_approved",
695                    json!({
696                        "sessionID": session_id,
697                        "messageID": message_id,
698                        "tool": tool,
699                    }),
700                ));
701                effective_args = args;
702            } else {
703                let pending = self
704                    .permissions
705                    .ask_for_session_with_context(
706                        Some(session_id),
707                        &tool,
708                        args.clone(),
709                        Some(crate::PermissionArgsContext {
710                            args_source: normalized.args_source.clone(),
711                            args_integrity: normalized.args_integrity.clone(),
712                            query: normalized.query.clone(),
713                        }),
714                    )
715                    .await;
716                let mut pending_part = WireMessagePart::tool_invocation(
717                    session_id,
718                    message_id,
719                    tool.clone(),
720                    args.clone(),
721                );
722                pending_part.id = Some(pending.id.clone());
723                tool_call_id = Some(pending.id.clone());
724                pending_part.state = Some("pending".to_string());
725                self.event_bus.publish(EngineEvent::new(
726                    "message.part.updated",
727                    json!({"part": pending_part}),
728                ));
729                let reply = self
730                    .permissions
731                    .wait_for_reply_with_timeout(
732                        &pending.id,
733                        cancel.clone(),
734                        Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
735                    )
736                    .await;
737                let (reply, timed_out) = reply;
738                if cancel.is_cancelled() {
739                    return Ok(None);
740                }
741                if timed_out {
742                    let timeout_ms = permission_wait_timeout_ms();
743                    self.event_bus.publish(EngineEvent::new(
744                        "permission.wait.timeout",
745                        json!({
746                            "sessionID": session_id,
747                            "messageID": message_id,
748                            "tool": tool,
749                            "requestID": pending.id,
750                            "timeoutMs": timeout_ms,
751                        }),
752                    ));
753                    let mut timeout_part = WireMessagePart::tool_result(
754                        session_id,
755                        message_id,
756                        tool.clone(),
757                        Some(args.clone()),
758                        json!(null),
759                    );
760                    timeout_part.id = Some(pending.id);
761                    timeout_part.state = Some("failed".to_string());
762                    timeout_part.error = Some(format!(
763                        "Permission request timed out after {} ms",
764                        timeout_ms
765                    ));
766                    self.event_bus.publish(EngineEvent::new(
767                        "message.part.updated",
768                        json!({"part": timeout_part}),
769                    ));
770                    let timeout_reason = format!(
771                        "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
772                    );
773                    publish_tool_effect(
774                        tool_call_id.as_deref(),
775                        ToolEffectLedgerPhase::Outcome,
776                        ToolEffectLedgerStatus::Blocked,
777                        &args,
778                        None,
779                        None,
780                        Some(&timeout_reason),
781                    );
782                    return Ok(Some(format!(
783                        "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
784                    )));
785                }
786                let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
787                if !approved {
788                    let mut denied_part = WireMessagePart::tool_result(
789                        session_id,
790                        message_id,
791                        tool.clone(),
792                        Some(args.clone()),
793                        json!(null),
794                    );
795                    denied_part.id = Some(pending.id);
796                    denied_part.state = Some("denied".to_string());
797                    denied_part.error = Some("Permission denied by user".to_string());
798                    self.event_bus.publish(EngineEvent::new(
799                        "message.part.updated",
800                        json!({"part": denied_part}),
801                    ));
802                    let denied_reason = format!("Permission denied for tool `{tool}` by user.");
803                    publish_tool_effect(
804                        tool_call_id.as_deref(),
805                        ToolEffectLedgerPhase::Outcome,
806                        ToolEffectLedgerStatus::Blocked,
807                        &args,
808                        None,
809                        None,
810                        Some(&denied_reason),
811                    );
812                    return Ok(Some(format!(
813                        "Permission denied for tool `{tool}` by user."
814                    )));
815                }
816                effective_args = args;
817            }
818        }
819
820        let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
821        let session = self.storage.get_session(session_id).await;
822        if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
823            obj.insert(
824                "__session_id".to_string(),
825                Value::String(session_id.to_string()),
826            );
827            if let Some(project_id) = session.project_id.clone() {
828                obj.insert(
829                    "__project_id".to_string(),
830                    Value::String(project_id.clone()),
831                );
832                if project_id.starts_with("channel-public::") {
833                    obj.insert(
834                        "__memory_max_visible_scope".to_string(),
835                        Value::String("project".to_string()),
836                    );
837                }
838            }
839        }
840        let tool_context = self.resolve_tool_execution_context(session_id).await;
841        if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
842            args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
843            if let Some(obj) = args.as_object_mut() {
844                obj.insert(
845                    "__workspace_root".to_string(),
846                    Value::String(workspace_root.clone()),
847                );
848                obj.insert(
849                    "__effective_cwd".to_string(),
850                    Value::String(effective_cwd.clone()),
851                );
852                obj.insert(
853                    "__session_id".to_string(),
854                    Value::String(session_id.to_string()),
855                );
856                if let Some(project_id) = project_id.clone() {
857                    obj.insert("__project_id".to_string(), Value::String(project_id));
858                }
859            }
860            tracing::info!(
861                "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
862                session_id,
863                tool,
864                workspace_root,
865                effective_cwd,
866                project_id.clone().unwrap_or_default()
867            );
868        }
869        let mut invoke_part =
870            WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
871        if let Some(call_id) = tool_call_id.clone() {
872            invoke_part.id = Some(call_id);
873        }
874        let invoke_part_id = invoke_part.id.clone();
875        self.event_bus.publish(EngineEvent::new(
876            "message.part.updated",
877            json!({"part": invoke_part}),
878        ));
879        let args_for_side_events = args.clone();
880        let mutation_checkpoint = prepare_mutation_checkpoint(&tool, &args_for_side_events);
881        let progress_sink: SharedToolProgressSink = std::sync::Arc::new(EngineToolProgressSink {
882            event_bus: self.event_bus.clone(),
883            session_id: session_id.to_string(),
884            message_id: message_id.to_string(),
885            tool_call_id: invoke_part_id.clone(),
886            source_tool: tool.clone(),
887        });
888        publish_tool_effect(
889            invoke_part_id.as_deref(),
890            ToolEffectLedgerPhase::Invocation,
891            ToolEffectLedgerStatus::Started,
892            &args_for_side_events,
893            None,
894            None,
895            None,
896        );
897        let publish_mutation_checkpoint =
898            |tool_call_id: Option<&str>, outcome: MutationCheckpointOutcome| {
899                if let Some(baseline) = mutation_checkpoint.as_ref() {
900                    self.event_bus.publish(mutation_checkpoint_event(
901                        finalize_mutation_checkpoint_record(
902                            session_id,
903                            message_id,
904                            tool_call_id,
905                            baseline,
906                            outcome,
907                        ),
908                    ));
909                }
910            };
911        if tool == "spawn_agent" {
912            let hook = self.spawn_agent_hook.read().await.clone();
913            if let Some(hook) = hook {
914                let spawned = hook
915                    .spawn_agent(SpawnAgentToolContext {
916                        session_id: session_id.to_string(),
917                        message_id: message_id.to_string(),
918                        tool_call_id: invoke_part_id.clone(),
919                        args: args_for_side_events.clone(),
920                    })
921                    .await?;
922                let output = self.plugins.transform_tool_output(spawned.output).await;
923                let output = truncate_text(&output, 16_000);
924                emit_tool_side_events(
925                    self.storage.clone(),
926                    &self.event_bus,
927                    ToolSideEventContext {
928                        session_id,
929                        message_id,
930                        tool: &tool,
931                        args: &args_for_side_events,
932                        metadata: &spawned.metadata,
933                        workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
934                        effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
935                    },
936                )
937                .await;
938                let mut result_part = WireMessagePart::tool_result(
939                    session_id,
940                    message_id,
941                    tool.clone(),
942                    Some(args_for_side_events.clone()),
943                    json!(output.clone()),
944                );
945                result_part.id = invoke_part_id.clone();
946                self.event_bus.publish(EngineEvent::new(
947                    "message.part.updated",
948                    json!({"part": result_part}),
949                ));
950                publish_tool_effect(
951                    invoke_part_id.as_deref(),
952                    ToolEffectLedgerPhase::Outcome,
953                    ToolEffectLedgerStatus::Succeeded,
954                    &args_for_side_events,
955                    Some(&spawned.metadata),
956                    Some(&output),
957                    None,
958                );
959                publish_mutation_checkpoint(
960                    invoke_part_id.as_deref(),
961                    MutationCheckpointOutcome::Succeeded,
962                );
963                return Ok(Some(truncate_text(
964                    &format!("Tool `{tool}` result:\n{output}"),
965                    16_000,
966                )));
967            }
968            let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
969            let mut failed_part = WireMessagePart::tool_result(
970                session_id,
971                message_id,
972                tool.clone(),
973                Some(args_for_side_events.clone()),
974                json!(null),
975            );
976            failed_part.id = invoke_part_id.clone();
977            failed_part.state = Some("failed".to_string());
978            failed_part.error = Some(output.to_string());
979            self.event_bus.publish(EngineEvent::new(
980                "message.part.updated",
981                json!({"part": failed_part}),
982            ));
983            publish_tool_effect(
984                invoke_part_id.as_deref(),
985                ToolEffectLedgerPhase::Outcome,
986                ToolEffectLedgerStatus::Failed,
987                &args_for_side_events,
988                None,
989                None,
990                Some(output),
991            );
992            publish_mutation_checkpoint(
993                invoke_part_id.as_deref(),
994                MutationCheckpointOutcome::Failed,
995            );
996            return Ok(Some(output.to_string()));
997        }
998        // Batch governance: validate sub-calls against engine policy and inject execution context
999        // before delegating to BatchTool. This ensures sub-calls cannot bypass permissions,
1000        // sandbox checks, or allowed-tool lists, and that they receive the correct workspace
1001        // context (__workspace_root, __effective_cwd, __session_id, __project_id).
1002        //
1003        // By this point `args` already has those keys injected (see context injection above).
1004        if tool == "batch" {
1005            let allowed_tools = self
1006                .session_allowed_tools
1007                .read()
1008                .await
1009                .get(session_id)
1010                .cloned()
1011                .unwrap_or_default();
1012
1013            // Extract parent execution context from already-injected batch args.
1014            let ctx_workspace_root = args
1015                .get("__workspace_root")
1016                .and_then(|v| v.as_str())
1017                .map(ToString::to_string);
1018            let ctx_effective_cwd = args
1019                .get("__effective_cwd")
1020                .and_then(|v| v.as_str())
1021                .map(ToString::to_string);
1022            let ctx_session_id = args
1023                .get("__session_id")
1024                .and_then(|v| v.as_str())
1025                .map(ToString::to_string);
1026            let ctx_project_id = args
1027                .get("__project_id")
1028                .and_then(|v| v.as_str())
1029                .map(ToString::to_string);
1030
1031            // Process each sub-call: check governance, inject context.
1032            let raw_calls = args
1033                .get("tool_calls")
1034                .and_then(|v| v.as_array())
1035                .cloned()
1036                .unwrap_or_default();
1037
1038            let mut governed_calls: Vec<Value> = Vec::new();
1039            for mut call in raw_calls {
1040                let (sub_tool, mut sub_args) = {
1041                    let obj = match call.as_object() {
1042                        Some(o) => o,
1043                        None => {
1044                            governed_calls.push(call);
1045                            continue;
1046                        }
1047                    };
1048                    let tool_raw = non_empty_string_at(obj, "tool")
1049                        .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
1050                        .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
1051                        .or_else(|| non_empty_string_at(obj, "name"));
1052                    let sub_tool = match tool_raw {
1053                        Some(t) => normalize_tool_name(t),
1054                        None => {
1055                            governed_calls.push(call);
1056                            continue;
1057                        }
1058                    };
1059                    let sub_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
1060                    (sub_tool, sub_args)
1061                };
1062
1063                // 1. Allowed-tools check.
1064                if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &sub_tool) {
1065                    // Strip this sub-call: replace it with an explanatory result.
1066                    if let Some(obj) = call.as_object_mut() {
1067                        obj.insert(
1068                            "_blocked".to_string(),
1069                            Value::String(format!(
1070                                "batch sub-call skipped: tool `{sub_tool}` is not in the allowed list for this run"
1071                            )),
1072                        );
1073                    }
1074                    governed_calls.push(call);
1075                    continue;
1076                }
1077
1078                // 2. Session write policy check.
1079                if let Some(violation) = self
1080                    .session_write_policy_violation(session_id, &sub_tool, &sub_args)
1081                    .await
1082                {
1083                    if let Some(obj) = call.as_object_mut() {
1084                        obj.insert(
1085                            "_blocked".to_string(),
1086                            Value::String(format!("batch sub-call skipped: {violation}")),
1087                        );
1088                    }
1089                    governed_calls.push(call);
1090                    continue;
1091                }
1092
1093                // 3. Workspace sandbox check.
1094                if let Some(violation) = self
1095                    .workspace_sandbox_violation(session_id, &sub_tool, &sub_args)
1096                    .await
1097                {
1098                    if let Some(obj) = call.as_object_mut() {
1099                        obj.insert(
1100                            "_blocked".to_string(),
1101                            Value::String(format!("batch sub-call skipped: {violation}")),
1102                        );
1103                    }
1104                    governed_calls.push(call);
1105                    continue;
1106                }
1107
1108                // 4. Inject parent execution context into sub-call args.
1109                if let Some(sub_obj) = sub_args.as_object_mut() {
1110                    if let Some(ref v) = ctx_workspace_root {
1111                        sub_obj
1112                            .entry("__workspace_root")
1113                            .or_insert_with(|| Value::String(v.clone()));
1114                    }
1115                    if let Some(ref v) = ctx_effective_cwd {
1116                        sub_obj
1117                            .entry("__effective_cwd")
1118                            .or_insert_with(|| Value::String(v.clone()));
1119                    }
1120                    if let Some(ref v) = ctx_session_id {
1121                        sub_obj
1122                            .entry("__session_id")
1123                            .or_insert_with(|| Value::String(v.clone()));
1124                    }
1125                    if let Some(ref v) = ctx_project_id {
1126                        sub_obj
1127                            .entry("__project_id")
1128                            .or_insert_with(|| Value::String(v.clone()));
1129                    }
1130                }
1131
1132                // Write enriched args back into the call object.
1133                if let Some(obj) = call.as_object_mut() {
1134                    obj.insert("args".to_string(), sub_args);
1135                }
1136                governed_calls.push(call);
1137            }
1138
1139            // Rebuild batch args with the governed sub-calls.
1140            if let Some(obj) = args.as_object_mut() {
1141                obj.insert("tool_calls".to_string(), Value::Array(governed_calls));
1142            }
1143        }
1144        let result = match self
1145            .execute_tool_with_timeout(&tool, args, cancel.clone(), Some(progress_sink))
1146            .await
1147        {
1148            Ok(result) => result,
1149            Err(err) => {
1150                let err_text = err.to_string();
1151                if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
1152                    let timeout_ms = tool_exec_timeout_ms();
1153                    let timeout_output = format!(
1154                        "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
1155                    );
1156                    let mut failed_part = WireMessagePart::tool_result(
1157                        session_id,
1158                        message_id,
1159                        tool.clone(),
1160                        Some(args_for_side_events.clone()),
1161                        json!(null),
1162                    );
1163                    failed_part.id = invoke_part_id.clone();
1164                    failed_part.state = Some("failed".to_string());
1165                    failed_part.error = Some(timeout_output.clone());
1166                    self.event_bus.publish(EngineEvent::new(
1167                        "message.part.updated",
1168                        json!({"part": failed_part}),
1169                    ));
1170                    publish_tool_effect(
1171                        invoke_part_id.as_deref(),
1172                        ToolEffectLedgerPhase::Outcome,
1173                        ToolEffectLedgerStatus::Failed,
1174                        &args_for_side_events,
1175                        None,
1176                        None,
1177                        Some(&timeout_output),
1178                    );
1179                    publish_mutation_checkpoint(
1180                        invoke_part_id.as_deref(),
1181                        MutationCheckpointOutcome::Failed,
1182                    );
1183                    return Ok(Some(timeout_output));
1184                }
1185                if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1186                    self.event_bus.publish(EngineEvent::new(
1187                        "mcp.auth.required",
1188                        json!({
1189                            "sessionID": session_id,
1190                            "messageID": message_id,
1191                            "tool": tool.clone(),
1192                            "server": auth.server,
1193                            "authorizationUrl": auth.authorization_url,
1194                            "message": auth.message,
1195                            "challengeId": auth.challenge_id
1196                        }),
1197                    ));
1198                    let auth_output = format!(
1199                        "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1200                        tool, auth.message, auth.authorization_url
1201                    );
1202                    let mut result_part = WireMessagePart::tool_result(
1203                        session_id,
1204                        message_id,
1205                        tool.clone(),
1206                        Some(args_for_side_events.clone()),
1207                        json!(auth_output.clone()),
1208                    );
1209                    result_part.id = invoke_part_id.clone();
1210                    self.event_bus.publish(EngineEvent::new(
1211                        "message.part.updated",
1212                        json!({"part": result_part}),
1213                    ));
1214                    publish_tool_effect(
1215                        invoke_part_id.as_deref(),
1216                        ToolEffectLedgerPhase::Outcome,
1217                        ToolEffectLedgerStatus::Blocked,
1218                        &args_for_side_events,
1219                        None,
1220                        Some(&auth_output),
1221                        Some(&auth.message),
1222                    );
1223                    publish_mutation_checkpoint(
1224                        invoke_part_id.as_deref(),
1225                        MutationCheckpointOutcome::Blocked,
1226                    );
1227                    return Ok(Some(truncate_text(
1228                        &format!("Tool `{tool}` result:\n{auth_output}"),
1229                        16_000,
1230                    )));
1231                }
1232                let mut failed_part = WireMessagePart::tool_result(
1233                    session_id,
1234                    message_id,
1235                    tool.clone(),
1236                    Some(args_for_side_events.clone()),
1237                    json!(null),
1238                );
1239                failed_part.id = invoke_part_id.clone();
1240                failed_part.state = Some("failed".to_string());
1241                failed_part.error = Some(err_text.clone());
1242                self.event_bus.publish(EngineEvent::new(
1243                    "message.part.updated",
1244                    json!({"part": failed_part}),
1245                ));
1246                publish_tool_effect(
1247                    invoke_part_id.as_deref(),
1248                    ToolEffectLedgerPhase::Outcome,
1249                    ToolEffectLedgerStatus::Failed,
1250                    &args_for_side_events,
1251                    None,
1252                    None,
1253                    Some(&err_text),
1254                );
1255                publish_mutation_checkpoint(
1256                    invoke_part_id.as_deref(),
1257                    MutationCheckpointOutcome::Failed,
1258                );
1259                return Err(err);
1260            }
1261        };
1262        if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1263            let event_name = if auth.pending && auth.blocked {
1264                "mcp.auth.pending"
1265            } else {
1266                "mcp.auth.required"
1267            };
1268            self.event_bus.publish(EngineEvent::new(
1269                event_name,
1270                json!({
1271                    "sessionID": session_id,
1272                    "messageID": message_id,
1273                    "tool": tool.clone(),
1274                    "server": auth.server,
1275                    "authorizationUrl": auth.authorization_url,
1276                    "message": auth.message,
1277                    "challengeId": auth.challenge_id,
1278                    "pending": auth.pending,
1279                    "blocked": auth.blocked,
1280                    "retryAfterMs": auth.retry_after_ms
1281                }),
1282            ));
1283        }
1284        emit_tool_side_events(
1285            self.storage.clone(),
1286            &self.event_bus,
1287            ToolSideEventContext {
1288                session_id,
1289                message_id,
1290                tool: &tool,
1291                args: &args_for_side_events,
1292                metadata: &result.metadata,
1293                workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1294                effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1295            },
1296        )
1297        .await;
1298        let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1299            if auth.pending && auth.blocked {
1300                let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1301                format!(
1302                    "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1303                    tool, auth.message, auth.authorization_url, retry_after_secs
1304                )
1305            } else {
1306                format!(
1307                    "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1308                    tool, auth.message, auth.authorization_url
1309                )
1310            }
1311        } else {
1312            self.plugins.transform_tool_output(result.output).await
1313        };
1314        let output = truncate_text(&output, 16_000);
1315        let mut result_part = WireMessagePart::tool_result(
1316            session_id,
1317            message_id,
1318            tool.clone(),
1319            Some(args_for_side_events.clone()),
1320            json!(output.clone()),
1321        );
1322        result_part.id = invoke_part_id.clone();
1323        self.event_bus.publish(EngineEvent::new(
1324            "message.part.updated",
1325            json!({"part": result_part}),
1326        ));
1327        publish_tool_effect(
1328            invoke_part_id.as_deref(),
1329            ToolEffectLedgerPhase::Outcome,
1330            ToolEffectLedgerStatus::Succeeded,
1331            &args_for_side_events,
1332            Some(&result.metadata),
1333            Some(&output),
1334            None,
1335        );
1336        publish_mutation_checkpoint(
1337            invoke_part_id.as_deref(),
1338            MutationCheckpointOutcome::Succeeded,
1339        );
1340        Ok(Some(truncate_text(
1341            &format!("Tool `{tool}` result:\n{output}"),
1342            16_000,
1343        )))
1344    }
1345}
1346
1347#[cfg(test)]
1348mod tests;