Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Retry backoff: base delay in milliseconds.
/// NOTE(review): the retry loop that consumes this is outside this view — confirm how it is applied.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Retry backoff: multiplicative factor (presumably applied to the delay on each successive retry — TODO confirm).
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28// ---------------------------------------------------------------------------
29// Replan types — failure recovery via model callback
30// ---------------------------------------------------------------------------
31
/// Callback trait for replanning failed proposals.
/// Implement this to let the runtime ask the model for an alternative plan
/// when a proposal aborts.
///
/// Implementors must be `Send + Sync` because the runtime stores the callback
/// behind an `Arc<dyn ReplanCallback>`.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Produce an alternative proposal for the failure described by `ctx`.
    ///
    /// Returns the replacement [`ActionProposal`] on success, or an error
    /// message when no alternative could be generated.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Context provided to the replan callback so the model can generate an alternative.
///
/// Derives `Serialize`/`Deserialize`, so the whole context can be handed to a
/// model or sent across a process boundary as structured data.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Original proposal ID.
    pub proposal_id: String,
    /// Which replan attempt this is (1-indexed; attempt 1 = first replan after initial failure).
    pub attempt: u32,
    /// Actions that failed and caused the abort.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Action IDs that succeeded before the abort (now rolled back).
    pub completed_action_ids: Vec<String>,
    /// State snapshot after rollback.
    pub state_snapshot: HashMap<String, Value>,
    /// How many replans remain.
    pub replans_remaining: u32,
    /// Original proposal source (model name, agent, etc.).
    pub original_source: String,
    /// Total actions in the original proposal.
    pub original_action_count: usize,
    /// The original goal/context from the proposal (for generating alternatives).
    pub original_context: HashMap<String, Value>,
}
62
/// Summary of a failed action, included in ReplanContext.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// ID of the action that failed.
    pub action_id: String,
    /// Tool the action targeted, if any (presumably `None` for non-tool actions — TODO confirm).
    pub tool: Option<String>,
    /// Error message recorded for the failure.
    pub error: String,
    /// Parameters the failed action carried.
    pub parameters: HashMap<String, Value>,
}
71
/// Configuration for the replan loop.
///
/// The `Default` implementation disables replanning (`max_replans = 0`),
/// uses no delay, and enables `verify_before_execute`.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts. 0 = disabled (default).
    pub max_replans: u32,
    /// Delay in milliseconds between replan attempts. Prevents burning through
    /// attempts instantly if the model returns garbage fast. 0 = no delay.
    pub delay_ms: u64,
    /// If true, replan proposals are scored via car-planner's verify() before
    /// execution. Proposals with errors are rejected without executing.
    /// Prevents the engine from running a worse plan than the one that failed.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87    fn default() -> Self {
88        Self {
89            max_replans: 0,
90            delay_ms: 0,
91            verify_before_execute: true,
92        }
93    }
94}
95
/// Trait for tool execution. Implement this to provide tools to the runtime.
///
/// In-process: implement directly with function calls.
/// Daemon mode: implement by sending JSON-RPC to the client.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Run the tool named `tool` with the JSON `params`.
    ///
    /// Returns the tool's JSON output on success, or an error message on failure.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;

    /// Variant that also carries the originating proposal `Action.id`.
    /// WS-based executors (`car-server-core::WsToolExecutor`) use it so
    /// the daemon-initiated `tools.execute` request to the client
    /// carries the same id the host's process-wide handler is keyed on
    /// — without this round-trip the host can't disambiguate concurrent
    /// callbacks for the same tool (Parslee-ai/car-releases#43
    /// follow-up). In-process executors that don't need it can keep the
    /// default forward to [`execute`].
    ///
    /// The default implementation ignores `_action_id` and delegates to
    /// [`ToolExecutor::execute`].
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        _action_id: &str,
    ) -> Result<Value, String> {
        self.execute(tool, params).await
    }
}
121
122/// Deterministic key for idempotency deduplication.
123fn idempotency_key(action: &Action) -> String {
124    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
125    let params = serde_json::to_string(&sorted).unwrap_or_default();
126    format!(
127        "{}:{}:{}",
128        serde_json::to_string(&action.action_type).unwrap_or_default(),
129        action.tool.as_deref().unwrap_or(""),
130        params
131    )
132}
133
134fn rejected_result(action_id: &str, error: String) -> ActionResult {
135    ActionResult {
136        action_id: action_id.to_string(),
137        status: ActionStatus::Rejected,
138        output: None,
139        error: Some(error),
140        state_changes: HashMap::new(),
141        duration_ms: None,
142        timestamp: chrono::Utc::now(),
143    }
144}
145
146/// Capture only the state keys relevant to an action (state_dependencies + expected_effects).
147/// Returns an empty map if the action declares no relevant keys (e.g., tool calls
148/// that don't interact with state).
149fn snapshot_relevant_keys(
150    state: &car_state::StateStore,
151    action: &Action,
152) -> HashMap<String, Value> {
153    let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
154    for dep in &action.state_dependencies {
155        keys.insert(dep.as_str());
156    }
157    for key in action.expected_effects.keys() {
158        keys.insert(key.as_str());
159    }
160    // For state_write actions, capture the key being written
161    if action.action_type == ActionType::StateWrite {
162        if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
163            keys.insert(key);
164        }
165    }
166
167    if keys.is_empty() {
168        // No declared state interaction — skip snapshot (empty map)
169        return HashMap::new();
170    }
171
172    keys.iter()
173        .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
174        .collect()
175}
176
177fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
178    ActionResult {
179        action_id: action_id.to_string(),
180        status: ActionStatus::Skipped,
181        output: None,
182        error: Some(reason.to_string()),
183        state_changes: HashMap::new(),
184        duration_ms: None,
185        timestamp: chrono::Utc::now(),
186    }
187}
188
/// Prefix on the `error` field of an `ActionResult` that distinguishes
/// "the user pulled the plug" from "earlier abort cascaded." The
/// `ActionStatus` itself is `Skipped` in both cases (introducing a
/// new variant ripples through every IR consumer + FFI binding); the
/// prefix lets callers like the A2A bridge tell the cases apart
/// without string-matching a magic literal.
///
/// `canceled_result` prepends this constant to the cancellation reason, so
/// callers can test for it with `str::starts_with(CANCELED_PREFIX)`.
pub const CANCELED_PREFIX: &str = "canceled: ";
196
197/// Result for an action that didn't run because the proposal was
198/// cancelled mid-flight. Distinct from `Skipped` so callers can tell
199/// "didn't run because of an earlier abort" from "didn't run because
200/// the user pulled the plug."
201fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
202    ActionResult {
203        action_id: action_id.to_string(),
204        status: ActionStatus::Skipped,
205        output: None,
206        error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
207        state_changes: HashMap::new(),
208        duration_ms: None,
209        timestamp: chrono::Utc::now(),
210    }
211}
212
213/// Format a tool result for feeding back to a model.
214pub fn format_tool_result(result: &ActionResult) -> String {
215    match result.status {
216        ActionStatus::Succeeded => match &result.output {
217            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
218            None => String::new(),
219        },
220        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
221        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
222        _ => format!(
223            "[{:?}] {}",
224            result.status,
225            result.error.as_deref().unwrap_or("")
226        ),
227    }
228}
229
/// Budget constraints for proposal execution.
///
/// Each limit is optional; presumably `None` means "no limit" for that
/// dimension — enforcement lives outside this view, TODO confirm.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Upper bound on the number of tool calls.
    pub max_tool_calls: Option<u32>,
    /// Upper bound on execution duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Upper bound on the number of actions.
    pub max_actions: Option<u32>,
}
237
/// Common Agent Runtime — deterministic execution layer.
///
/// Lock ordering discipline (never hold multiple simultaneously, never hold sync locks across .await):
/// 1. capabilities (RwLock, read-only during execution)
/// 2. tools (RwLock, read-only during execution)
/// 3. policies (RwLock, read-only during execution)
/// 4. session_policies (RwLock to find the per-session engine, then read inner Arc)
/// 5. cost_budget (RwLock, read-only during execution)
/// 6. log (TokioMutex, acquired/released per event)
/// 7. tool_executor (TokioMutex, clone Arc and drop before await)
/// 8. idempotency_cache (TokioMutex, acquired/released per check)
///
/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
pub struct Runtime {
    /// State store; can be shared with other runtimes via [`Runtime::with_shared`].
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name (the legacy tool map).
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Global policy engine; can be shared via [`Runtime::with_shared`].
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Per-session policy registries. Hosts that multiplex multiple
    /// concurrent agent sessions over a single `Runtime` (IDE-style
    /// frontends with per-project rules, multi-tenant servers) call
    /// [`Runtime::open_session`] to mint an id, then
    /// [`Runtime::register_policy_in_session`] to attach session-scoped
    /// rules. Validation under that session walks the global registry
    /// AND the session's — both must pass. Sessions can deny what
    /// global allows; sessions cannot allow what global denies.
    /// Closing a session drops its registry and any closures it holds.
    /// See `docs/proposals/per-session-policy-scoping.md`.
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log; can be shared via [`Runtime::with_shared`].
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limiter, configured via `set_rate_limit` or a schema's `rate_limit`.
    pub rate_limiter: Arc<RateLimiter>,
    /// Cross-proposal result cache, configured via `enable_tool_cache` or a schema's `cache_ttl_secs`.
    pub result_cache: Arc<ResultCache>,
    /// Executor that runs tool calls; `None` until one is attached.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Previously computed results by string key (presumably the idempotency key — TODO confirm).
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional budget limiting proposal execution; `None` until `set_cost_budget` is called.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability restrictions; `None` until `set_capabilities` is called.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Optional local inference engine, attached via `with_inference`.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine for skill learning (auto-distillation after execution).
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether to auto-distill skills after each proposal execution.
    auto_distill: bool,
    /// Optional trajectory store for persisting execution traces.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Optional replan callback for failure recovery.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Replan configuration.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Canonical tool registry. Always present on the struct; new code should
    /// prefer it over the legacy `tools` map.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
287
288impl Runtime {
289    pub fn new() -> Self {
290        Self {
291            state: Arc::new(StateStore::new()),
292            tools: Arc::new(TokioRwLock::new(HashMap::new())),
293            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
294            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
295            log: Arc::new(TokioMutex::new(EventLog::new())),
296            rate_limiter: Arc::new(RateLimiter::new()),
297            result_cache: Arc::new(ResultCache::new()),
298            tool_executor: TokioMutex::new(None),
299            idempotency_cache: TokioMutex::new(HashMap::new()),
300            cost_budget: TokioRwLock::new(None),
301            capabilities: TokioRwLock::new(None),
302            inference_engine: None,
303            memgine: None,
304            auto_distill: false,
305            trajectory_store: None,
306            replan_callback: TokioMutex::new(None),
307            replan_config: TokioRwLock::new(ReplanConfig::default()),
308            registry: Arc::new(crate::registry::ToolRegistry::new()),
309        }
310    }
311
312    /// Create a runtime with shared state, event log, and policies.
313    /// Each runtime gets its own tool set, executor, and idempotency cache.
314    pub fn with_shared(
315        state: Arc<StateStore>,
316        log: Arc<TokioMutex<EventLog>>,
317        policies: Arc<TokioRwLock<PolicyEngine>>,
318    ) -> Self {
319        Self {
320            state,
321            tools: Arc::new(TokioRwLock::new(HashMap::new())),
322            policies,
323            // Session-policy registries are per-runtime — sharing them
324            // across embedders that share global policies would defeat
325            // the isolation point. Hosts that genuinely want shared
326            // sessions should drive them through one shared Runtime.
327            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
328            log,
329            rate_limiter: Arc::new(RateLimiter::new()),
330            result_cache: Arc::new(ResultCache::new()),
331            tool_executor: TokioMutex::new(None),
332            idempotency_cache: TokioMutex::new(HashMap::new()),
333            cost_budget: TokioRwLock::new(None),
334            capabilities: TokioRwLock::new(None),
335            inference_engine: None,
336            memgine: None,
337            auto_distill: false,
338            trajectory_store: None,
339            replan_callback: TokioMutex::new(None),
340            replan_config: TokioRwLock::new(ReplanConfig::default()),
341            registry: Arc::new(crate::registry::ToolRegistry::new()),
342        }
343    }
344
345    // ─── Session policy lifecycle ───────────────────────────────────
346    //
347    // Per-session policy scoping. Policies registered against a
348    // session apply to proposals executed under that session id (via
349    // [`Self::execute_with_session`] / [`Self::execute_with_session_and_cancel`]).
350    // Policies registered globally always apply, on top of any
351    // session-scoped layer. See `docs/proposals/per-session-policy-scoping.md`.
352
353    /// Mint a new session id and pre-register an empty policy engine
354    /// under it. Hosts call this once per concurrent agent context
355    /// (an IDE project window, a multi-tenant client, etc.) and pair
356    /// it with [`Self::close_session`] when the context ends.
357    ///
358    /// Returns the opaque id to pass to subsequent
359    /// [`Self::register_policy_in_session`] / [`Self::execute_with_session`]
360    /// calls. Ids are UUIDs so collisions across concurrent calls
361    /// don't matter.
362    pub async fn open_session(&self) -> String {
363        let id = Uuid::new_v4().to_string();
364        let mut sessions = self.session_policies.write().await;
365        sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
366        id
367    }
368
369    /// Drop the session and every policy scoped to it. Returns true
370    /// if a session by that id existed; false if it didn't (already
371    /// closed, never opened, etc.). Idempotent in effect — closing a
372    /// missing session is a no-op the caller is free to ignore.
373    pub async fn close_session(&self, session_id: &str) -> bool {
374        let mut sessions = self.session_policies.write().await;
375        sessions.remove(session_id).is_some()
376    }
377
378    /// Register a policy under a specific session id. The policy
379    /// applies only when a proposal is executed under that session;
380    /// proposals executed without a session (the default) only see
381    /// global policies.
382    ///
383    /// Returns `Err(...)` if the session is unknown — callers either
384    /// forgot to call [`Self::open_session`] or are using a
385    /// stale/closed id.
386    pub async fn register_policy_in_session(
387        &self,
388        session_id: &str,
389        name: &str,
390        check: car_policy::PolicyCheck,
391        description: &str,
392    ) -> Result<(), String> {
393        let engine = {
394            let sessions = self.session_policies.read().await;
395            sessions
396                .get(session_id)
397                .cloned()
398                .ok_or_else(|| format!("unknown session id '{session_id}'"))?
399        };
400        let mut engine = engine.write().await;
401        engine.register(name, check, description);
402        Ok(())
403    }
404
405    /// True if a session with this id is currently open. Mostly for
406    /// tests and FFI surface validation — production code should
407    /// trust the id it just opened.
408    pub async fn session_exists(&self, session_id: &str) -> bool {
409        self.session_policies.read().await.contains_key(session_id)
410    }
411
412    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
413    /// as built-in tools with real implementations.
414    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
415        self.inference_engine = Some(engine);
416        // Register inference tool schemas (non-async init, use try_lock)
417        if let Ok(mut tools) = self.tools.try_write() {
418            for schema in car_inference::service::all_schemas() {
419                tools.insert(schema.name.clone(), schema);
420            }
421        }
422        self
423    }
424
425    /// Attach a memgine for automatic skill learning after execution.
426    /// When `auto_distill` is true, execution traces are automatically distilled
427    /// into skills and domains are evolved when underperforming.
428    pub fn with_learning(
429        mut self,
430        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
431        auto_distill: bool,
432    ) -> Self {
433        self.memgine = Some(memgine);
434        self.auto_distill = auto_distill;
435        self
436    }
437
438    /// Attach a memgine with auto-distillation enabled (recommended default).
439    pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
440        self.with_learning(memgine, true)
441    }
442
443    /// Attach a trajectory store for persisting execution traces.
444    pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
445        self.trajectory_store = Some(store);
446        self
447    }
448
449    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
450        // Use try_lock for non-async init context. Safe because we just created the mutex.
451        if let Ok(mut guard) = self.tool_executor.try_lock() {
452            *guard = Some(executor);
453        }
454        self
455    }
456
457    /// Set a tool executor for the next execute() call.
458    /// Used by NAPI bindings where executor varies per call.
459    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
460        *self.tool_executor.lock().await = Some(executor);
461    }
462
463    pub fn with_event_log(mut self, log: EventLog) -> Self {
464        self.log = Arc::new(TokioMutex::new(log));
465        self
466    }
467
468    /// Attach a replan callback for failure recovery (builder).
469    pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
470        if let Ok(mut guard) = self.replan_callback.try_lock() {
471            *guard = Some(callback);
472        }
473        if let Ok(mut guard) = self.replan_config.try_write() {
474            *guard = config;
475        }
476        self
477    }
478
479    /// Set a replan callback at runtime.
480    pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
481        *self.replan_callback.lock().await = Some(callback);
482    }
483
484    /// Set replan configuration at runtime.
485    pub async fn set_replan_config(&self, config: ReplanConfig) {
486        *self.replan_config.write().await = config;
487    }
488
489    /// Register a tool with just a name (backward compatible).
490    pub async fn register_tool(&self, name: &str) {
491        let schema = ToolSchema {
492            name: name.to_string(),
493            description: String::new(),
494            parameters: serde_json::Value::Object(Default::default()),
495            returns: None,
496            idempotent: false,
497            cache_ttl_secs: None,
498            rate_limit: None,
499        };
500        self.register_tool_schema(schema).await;
501    }
502
503    /// Register a tool with full schema.
504    pub async fn register_tool_schema(&self, schema: ToolSchema) {
505        // Auto-configure cache if schema specifies it
506        if let Some(ttl) = schema.cache_ttl_secs {
507            self.result_cache.enable_caching(&schema.name, ttl).await;
508        }
509        // Auto-configure rate limit if schema specifies it
510        if let Some(ref rl) = schema.rate_limit {
511            self.rate_limiter
512                .set_limit(
513                    &schema.name,
514                    RateLimit {
515                        max_calls: rl.max_calls,
516                        interval_secs: rl.interval_secs,
517                    },
518                )
519                .await;
520        }
521        self.tools.write().await.insert(schema.name.clone(), schema);
522    }
523
524    /// Register a tool via the canonical registry.
525    /// This is the preferred way to register tools — it updates both the
526    /// registry and the legacy tools HashMap for backward compatibility.
527    pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
528        let schema = entry.schema.clone();
529        self.registry.register(entry).await;
530        self.register_tool_schema(schema).await;
531    }
532
533    /// Register CAR's built-in agent utility stdlib.
534    ///
535    /// This is an opt-in convenience layer for common local-file and text tools.
536    /// Existing runtimes remain unchanged until this is called.
537    pub async fn register_agent_basics(&self) {
538        for entry in crate::agent_basics::entries() {
539            self.register_tool_entry(entry).await;
540        }
541    }
542
543    /// Get all registered tool schemas (for model prompt generation).
544    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
545        self.tools.read().await.values().cloned().collect()
546    }
547
548    /// Set a cost budget that limits proposal execution.
549    pub async fn set_cost_budget(&self, budget: CostBudget) {
550        *self.cost_budget.write().await = Some(budget);
551    }
552
553    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
554    pub async fn set_capabilities(&self, caps: CapabilitySet) {
555        *self.capabilities.write().await = Some(caps);
556    }
557
558    /// Set a per-tool rate limit (token bucket).
559    ///
560    /// `max_calls` tokens are available per `interval_secs` window.
561    /// When the bucket is empty, `dispatch()` applies backpressure by
562    /// waiting until a token refills.
563    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
564        self.rate_limiter
565            .set_limit(
566                tool,
567                RateLimit {
568                    max_calls,
569                    interval_secs,
570                },
571            )
572            .await;
573    }
574
575    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
576    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
577        self.result_cache.enable_caching(tool, ttl_secs).await;
578    }
579
580    /// Execute a proposal with automatic replanning on failure.
581    ///
582    /// If a `ReplanCallback` is registered and `max_replans > 0`, the runtime
583    /// will catch abort failures, roll back state, ask the model for an
584    /// alternative proposal via the callback, and re-execute. This transforms
585    /// "execute-and-hope" into "execute-and-recover."
586    ///
587    /// If no callback is registered or `max_replans == 0`, behaves identically
588    /// to a single `execute_inner()` call (zero overhead, fully backward compatible).
589    #[instrument(
590        name = "proposal.execute",
591        skip_all,
592        fields(
593            proposal_id = %proposal.id,
594            action_count = proposal.actions.len(),
595        )
596    )]
597    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
598        // Forward to the cancel-aware variant with a never-cancelled
599        // token. Existing callers see no behaviour change.
600        let token = tokio_util::sync::CancellationToken::new();
601        self.execute_with_cancel(proposal, &token).await
602    }
603
604    /// Execute a proposal scoped to a specific session id.
605    ///
606    /// Validation walks the global policy registry plus the session's
607    /// own registry — both must pass for an action to run. Session
608    /// policies can deny what global allows; they cannot allow what
609    /// global denies (validation is conjunctive).
610    ///
611    /// Returns the same [`ProposalResult`] shape as [`Self::execute`].
612    /// Errors with an action-level rejection if the session id is
613    /// unknown — callers should check via [`Self::session_exists`] or
614    /// trust an id they minted via [`Self::open_session`].
615    pub async fn execute_with_session(
616        &self,
617        proposal: &ActionProposal,
618        session_id: &str,
619    ) -> ProposalResult {
620        let token = tokio_util::sync::CancellationToken::new();
621        self.execute_with_session_and_cancel(proposal, session_id, &token)
622            .await
623    }
624
625    /// Combined session-scoped + cancellable execute. The session id
626    /// is passed verbatim to the per-action policy check; the cancel
627    /// token behaves identically to [`Self::execute_with_cancel`].
628    pub async fn execute_with_session_and_cancel(
629        &self,
630        proposal: &ActionProposal,
631        session_id: &str,
632        cancel: &tokio_util::sync::CancellationToken,
633    ) -> ProposalResult {
634        self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
635            .await
636    }
637
638    /// Execute a proposal with cooperative cancellation.
639    ///
640    /// The runtime checks `token.is_cancelled()` at each DAG level
641    /// boundary. When set, every action that hadn't yet started runs
642    /// is reported as `Skipped` with `error = "canceled: ..."` so
643    /// callers can distinguish "user pulled the plug" from "earlier
644    /// abort cascaded." Actions already in flight continue to
645    /// completion — tool calls dispatched to user-provided executors
646    /// can't be safely interrupted from the engine.
647    ///
648    /// The CAR A2A bridge uses this so `tasks/cancel` produces a
649    /// `ProposalResult` with clean partial state rather than relying
650    /// on `JoinHandle::abort` to interrupt mid-await (which leaves
651    /// no record of which actions actually ran).
652    ///
653    /// **FFI exposure:** this method is intentionally not surfaced
654    /// through the NAPI / PyO3 / `car-server-core` JSON-RPC bindings.
655    /// Those consumers (Node, Python, WebSocket) don't currently
656    /// expose long-running async-task surfaces that need
657    /// cancellation; the bridge is the lone consumer. When a binding
658    /// gains a long-running task surface, the path is clear: add a
659    /// per-binding token registry keyed by some caller-provided id,
660    /// expose `cancelExecution(id)` / `cancel_execution(id)` /
661    /// `proposal.cancel { id }`, and have the runtime call
662    /// `execute_with_cancel` with the matching token. Skipping that
663    /// today avoids speculative API surface that bloats bindings
664    /// without a consumer.
665    pub async fn execute_with_cancel(
666        &self,
667        proposal: &ActionProposal,
668        cancel: &tokio_util::sync::CancellationToken,
669    ) -> ProposalResult {
670        self.execute_with_optional_session(proposal, None, None, cancel)
671            .await
672    }
673
674    /// Execute a proposal with an attached [`RuntimeScope`]
675    /// (Parslee-ai/car#187 phase 3).
676    ///
677    /// Same contract as [`Self::execute`] plus a per-execution
678    /// identity surface — typically built by the car-a2a dispatcher
679    /// from the verified `Identity` and cooperative `a2a_caller`
680    /// metadata on the inbound `ActionProposal`. The scope is
681    /// recorded on the event log so downstream audit / log analysis
682    /// can see which caller / tenant issued each action.
683    ///
684    /// **What this enforces today**: scope is captured + logged.
685    /// Memgine queries and state-store ops still hit global
686    /// namespaces — those follow-ups are tracked under #187.
687    /// Tool / policy code that needs per-tenant behaviour right now
688    /// should keep reading `proposal.context["a2a_caller_verified"]`
689    /// directly (the phase 1 / 2 surface).
690    pub async fn execute_scoped(
691        &self,
692        proposal: &ActionProposal,
693        scope: &crate::scope::RuntimeScope,
694    ) -> ProposalResult {
695        let token = tokio_util::sync::CancellationToken::new();
696        self.execute_scoped_with_cancel(proposal, scope, &token)
697            .await
698    }
699
700    /// Combined scoped + cancellable execute. Mirrors the shape of
701    /// [`Self::execute_with_session_and_cancel`] for symmetry — both
702    /// add a side-channel (session id / scope) on top of the
703    /// cancellable form.
704    pub async fn execute_scoped_with_cancel(
705        &self,
706        proposal: &ActionProposal,
707        scope: &crate::scope::RuntimeScope,
708        cancel: &tokio_util::sync::CancellationToken,
709    ) -> ProposalResult {
710        self.execute_with_optional_session(proposal, None, Some(scope), cancel)
711            .await
712    }
713
714    /// Internal entry point that backs both
715    /// [`Self::execute_with_cancel`] (no session) and
716    /// [`Self::execute_with_session_and_cancel`]. Holds the replan
717    /// loop and threads `session_id` into the per-action validation
718    /// path so session-scoped policies stack on top of global ones.
719    async fn execute_with_optional_session(
720        &self,
721        proposal: &ActionProposal,
722        session_id: Option<&str>,
723        scope: Option<&crate::scope::RuntimeScope>,
724        cancel: &tokio_util::sync::CancellationToken,
725    ) -> ProposalResult {
726        let config = self.replan_config.read().await.clone();
727        let mut current_proposal = proposal.clone();
728        let mut attempt: u32 = 0;
729
730        // Phase 3 foundation (Parslee-ai/car#187): record the scope
731        // on the event log so audit / log analysis can correlate
732        // actions to the caller / tenant that triggered them. Only
733        // logged when at least one identity field is set — keeps
734        // the existing in-process call sites free of noise.
735        if let Some(s) = scope {
736            if !s.is_unscoped() {
737                let mut props: HashMap<String, Value> = HashMap::new();
738                if let Some(cid) = &s.caller_id {
739                    props.insert("caller_id".to_string(), Value::from(cid.as_str()));
740                }
741                if let Some(tid) = &s.tenant_id {
742                    props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
743                }
744                if !s.claims.is_empty() {
745                    if let Ok(claims_json) = serde_json::to_value(&s.claims) {
746                        props.insert("claims".to_string(), claims_json);
747                    }
748                }
749                let mut log = self.log.lock().await;
750                log.append(EventKind::SessionScope, None, Some(&proposal.id), props);
751            }
752        }
753
754        loop {
755            let (result, state_before_map) = self
756                .execute_inner_with_cancel(&current_proposal, Some(cancel), session_id, scope)
757                .await;
758
759            // Check if we aborted
760            let aborted = result
761                .results
762                .iter()
763                .any(|r| r.status == ActionStatus::Failed);
764            if !aborted || attempt >= config.max_replans {
765                if aborted && attempt > 0 {
766                    // Exhausted all replan attempts
767                    let mut log = self.log.lock().await;
768                    log.append(
769                        EventKind::ReplanExhausted,
770                        None,
771                        Some(&proposal.id),
772                        [("attempts".to_string(), Value::from(attempt))].into(),
773                    );
774                }
775
776                // Persist trajectory
777                let outcome = if !aborted {
778                    if attempt > 0 {
779                        car_memgine::TrajectoryOutcome::ReplanSuccess
780                    } else {
781                        car_memgine::TrajectoryOutcome::Success
782                    }
783                } else if attempt > 0 {
784                    car_memgine::TrajectoryOutcome::ReplanExhausted
785                } else {
786                    car_memgine::TrajectoryOutcome::Failed
787                };
788                if let Some(err) = self.persist_trajectory(
789                    proposal,
790                    &current_proposal,
791                    &result,
792                    outcome,
793                    attempt,
794                    &state_before_map,
795                ) {
796                    let mut log = self.log.lock().await;
797                    log.append(
798                        EventKind::ActionFailed,
799                        None,
800                        Some(&proposal.id),
801                        [(
802                            "trajectory_persist_error".to_string(),
803                            Value::from(err.as_str()),
804                        )]
805                        .into(),
806                    );
807                }
808
809                return result;
810            }
811
812            // Get replan callback (clone Arc, drop lock immediately)
813            let callback = {
814                let guard = self.replan_callback.lock().await;
815                guard.clone()
816            };
817            let Some(callback) = callback else {
818                // No callback registered — persist trajectory and return
819                if let Some(err) = self.persist_trajectory(
820                    proposal,
821                    &current_proposal,
822                    &result,
823                    car_memgine::TrajectoryOutcome::Failed,
824                    attempt,
825                    &state_before_map,
826                ) {
827                    let mut log = self.log.lock().await;
828                    log.append(
829                        EventKind::ActionFailed,
830                        None,
831                        Some(&proposal.id),
832                        [(
833                            "trajectory_persist_error".to_string(),
834                            Value::from(err.as_str()),
835                        )]
836                        .into(),
837                    );
838                }
839                return result;
840            };
841
842            // Build replan context
843            let failed_actions: Vec<FailedActionSummary> = result
844                .results
845                .iter()
846                .filter(|r| r.status == ActionStatus::Failed)
847                .map(|r| {
848                    let action = current_proposal
849                        .actions
850                        .iter()
851                        .find(|a| a.id == r.action_id);
852                    FailedActionSummary {
853                        action_id: r.action_id.clone(),
854                        tool: action.and_then(|a| a.tool.clone()),
855                        error: r.error.clone().unwrap_or_default(),
856                        parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
857                    }
858                })
859                .collect();
860
861            let completed_action_ids: Vec<String> = result
862                .results
863                .iter()
864                .filter(|r| r.status == ActionStatus::Succeeded)
865                .map(|r| r.action_id.clone())
866                .collect();
867
868            let ctx = ReplanContext {
869                proposal_id: proposal.id.clone(),
870                attempt: attempt + 1,
871                failed_actions,
872                completed_action_ids,
873                state_snapshot: self.state.snapshot(),
874                replans_remaining: config.max_replans.saturating_sub(attempt + 1),
875                original_source: proposal.source.clone(),
876                original_action_count: proposal.actions.len(),
877                original_context: proposal.context.clone(),
878            };
879
880            // Backoff delay between replan attempts
881            if config.delay_ms > 0 {
882                tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
883            }
884
885            // Log replan attempt
886            {
887                let mut log = self.log.lock().await;
888                log.append(
889                    EventKind::ReplanAttempted,
890                    None,
891                    Some(&proposal.id),
892                    [
893                        ("attempt".to_string(), Value::from(attempt + 1)),
894                        (
895                            "failed_count".to_string(),
896                            Value::from(ctx.failed_actions.len()),
897                        ),
898                    ]
899                    .into(),
900                );
901            }
902
903            // Call the model for a new plan
904            match callback.replan(&ctx).await {
905                Ok(new_proposal) => {
906                    // Quality gate: verify replan proposal before executing
907                    if config.verify_before_execute {
908                        let tools_guard = self.tools.read().await;
909                        let tool_names: std::collections::HashSet<String> =
910                            tools_guard.keys().cloned().collect();
911                        drop(tools_guard);
912
913                        let current_state = self.state.snapshot();
914                        let vr = car_verify::verify(
915                            &new_proposal,
916                            Some(&current_state),
917                            Some(&tool_names),
918                            100,
919                        );
920                        if !vr.valid {
921                            let error_msgs: Vec<String> = vr
922                                .issues
923                                .iter()
924                                .filter(|i| i.severity == "error")
925                                .map(|i| i.message.clone())
926                                .collect();
927                            let mut log = self.log.lock().await;
928                            log.append(
929                                EventKind::ReplanRejected,
930                                None,
931                                Some(&proposal.id),
932                                [
933                                    ("errors".to_string(), Value::from(error_msgs.join("; "))),
934                                    ("attempt".to_string(), Value::from(attempt + 1)),
935                                ]
936                                .into(),
937                            );
938                            // Don't execute a broken replan — count as failed attempt
939                            attempt += 1;
940                            continue;
941                        }
942                    }
943
944                    // Log accepted proposal
945                    {
946                        let mut log = self.log.lock().await;
947                        log.append(
948                            EventKind::ReplanProposalReceived,
949                            None,
950                            Some(&proposal.id),
951                            [
952                                ("attempt".to_string(), Value::from(attempt + 1)),
953                                (
954                                    "new_action_count".to_string(),
955                                    Value::from(new_proposal.actions.len()),
956                                ),
957                            ]
958                            .into(),
959                        );
960                    }
961                    current_proposal = new_proposal;
962                    attempt += 1;
963                }
964                Err(e) => {
965                    // Replan callback itself failed — log and return original failure
966                    let mut log = self.log.lock().await;
967                    log.append(
968                        EventKind::ReplanExhausted,
969                        None,
970                        Some(&proposal.id),
971                        [
972                            ("reason".to_string(), Value::from("callback_error")),
973                            ("error".to_string(), Value::from(e.as_str())),
974                            ("attempt".to_string(), Value::from(attempt + 1)),
975                        ]
976                        .into(),
977                    );
978                    if let Some(err) = self.persist_trajectory(
979                        proposal,
980                        &current_proposal,
981                        &result,
982                        car_memgine::TrajectoryOutcome::Failed,
983                        attempt,
984                        &state_before_map,
985                    ) {
986                        log.append(
987                            EventKind::ActionFailed,
988                            None,
989                            Some(&proposal.id),
990                            [(
991                                "trajectory_persist_error".to_string(),
992                                Value::from(err.as_str()),
993                            )]
994                            .into(),
995                        );
996                    }
997                    return result;
998                }
999            }
1000        }
1001    }
1002
1003    /// Persist a trajectory to the store if configured.
1004    fn persist_trajectory(
1005        &self,
1006        proposal: &ActionProposal,
1007        current_proposal: &ActionProposal,
1008        result: &ProposalResult,
1009        outcome: car_memgine::TrajectoryOutcome,
1010        attempt: u32,
1011        state_before_map: &HashMap<String, HashMap<String, Value>>,
1012    ) -> Option<String> {
1013        let store = self.trajectory_store.as_ref()?;
1014
1015        let trace_events: Vec<car_memgine::TraceEvent> = result
1016            .results
1017            .iter()
1018            .map(|r| {
1019                let kind = match r.status {
1020                    ActionStatus::Succeeded => "action_succeeded",
1021                    ActionStatus::Failed => "action_failed",
1022                    ActionStatus::Rejected => "action_rejected",
1023                    ActionStatus::Skipped => "action_skipped",
1024                    _ => "unknown",
1025                };
1026                let tool = current_proposal
1027                    .actions
1028                    .iter()
1029                    .find(|a| a.id == r.action_id)
1030                    .and_then(|a| a.tool.clone());
1031                let reward = match r.status {
1032                    ActionStatus::Succeeded => Some(1.0),
1033                    ActionStatus::Failed => Some(0.0),
1034                    ActionStatus::Rejected => Some(0.0),
1035                    ActionStatus::Skipped => None,
1036                    _ => None,
1037                };
1038                car_memgine::TraceEvent {
1039                    kind: kind.to_string(),
1040                    action_id: Some(r.action_id.clone()),
1041                    tool,
1042                    data: r
1043                        .error
1044                        .as_ref()
1045                        .map(|e| serde_json::json!({"error": e}))
1046                        .unwrap_or(serde_json::json!({})),
1047                    duration_ms: r.duration_ms,
1048                    state_before: state_before_map.get(&r.action_id).cloned(),
1049                    state_after: if !r.state_changes.is_empty() {
1050                        Some(r.state_changes.clone())
1051                    } else {
1052                        None
1053                    },
1054                    reward,
1055                }
1056            })
1057            .collect();
1058
1059        let trajectory = car_memgine::Trajectory {
1060            proposal_id: proposal.id.clone(),
1061            source: proposal.source.clone(),
1062            action_count: current_proposal.actions.len(),
1063            events: trace_events,
1064            outcome,
1065            timestamp: chrono::Utc::now(),
1066            duration_ms: result.cost.total_duration_ms,
1067            replan_attempts: attempt,
1068        };
1069
1070        match store.append(&trajectory) {
1071            Ok(()) => None,
1072            Err(e) => Some(e.to_string()),
1073        }
1074    }
1075
1076    /// Score N candidate proposals, execute the best valid one, fall back to
1077    /// next-best on failure. Combines car-planner scoring with engine execution.
1078    ///
1079    /// Returns the result from whichever proposal was executed (best or fallback).
1080    /// If all candidates fail verification, returns an error result for the first.
1081    pub async fn plan_and_execute(
1082        &self,
1083        candidates: &[ActionProposal],
1084        planner_config: Option<car_planner::PlannerConfig>,
1085        feedback: Option<&car_planner::ToolFeedback>,
1086    ) -> ProposalResult {
1087        if candidates.is_empty() {
1088            return ProposalResult {
1089                proposal_id: "empty".to_string(),
1090                results: vec![],
1091                cost: car_ir::CostSummary::default(),
1092            };
1093        }
1094
1095        // Score all candidates
1096        let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1097        let tools_guard = self.tools.read().await;
1098        let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1099        drop(tools_guard);
1100
1101        let pre_plan_snapshot = self.state.snapshot();
1102        let pre_plan_transitions = self.state.transition_count();
1103        let ranked = planner.rank_with_feedback(
1104            candidates,
1105            Some(&pre_plan_snapshot),
1106            Some(&tool_names),
1107            feedback,
1108        );
1109
1110        // Try each valid candidate in score order
1111        let mut first_failure: Option<ProposalResult> = None;
1112        for scored in &ranked {
1113            if !scored.valid {
1114                continue;
1115            }
1116
1117            // Restore clean state before each candidate (don't rely on execute's
1118            // internal rollback — it only fires on Abort, not Skip/Retry failures)
1119            self.state
1120                .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1121
1122            let proposal = &candidates[scored.index];
1123            let result = self.execute(proposal).await;
1124
1125            if result.all_succeeded() {
1126                return result;
1127            }
1128
1129            tracing::info!(
1130                proposal_id = %proposal.id,
1131                score = scored.score,
1132                "plan_and_execute: proposal failed, trying next candidate"
1133            );
1134
1135            if first_failure.is_none() {
1136                first_failure = Some(result);
1137            }
1138        }
1139
1140        // Return the first failure result (don't re-execute — avoids duplicate side effects)
1141        first_failure.unwrap_or_else(|| ProposalResult {
1142            proposal_id: candidates[0].id.clone(),
1143            results: vec![],
1144            cost: car_ir::CostSummary::default(),
1145        })
1146    }
1147
1148    /// Execute a single proposal through the runtime loop (no replanning).
1149    /// Returns (result, state_before_map) where state_before_map has per-action snapshots.
1150    ///
1151    /// `session_id`, when `Some`, scopes per-action policy validation
1152    /// to the named session in addition to global policies. Both
1153    /// layers must pass for an action to run; the session layer cannot
1154    /// loosen what global denies.
1155    async fn execute_inner_with_cancel(
1156        &self,
1157        proposal: &ActionProposal,
1158        cancel: Option<&tokio_util::sync::CancellationToken>,
1159        session_id: Option<&str>,
1160        scope: Option<&crate::scope::RuntimeScope>,
1161    ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1162        // Generate trace_id for this proposal execution
1163        let trace_id = Uuid::new_v4().to_string();
1164
1165        // Begin root span for proposal execution
1166        let root_span_id = {
1167            let mut log = self.log.lock().await;
1168            log.begin_span(
1169                "proposal.execute",
1170                &trace_id,
1171                None,
1172                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1173            )
1174        };
1175
1176        // Log proposal received
1177        {
1178            let mut log = self.log.lock().await;
1179            log.append(
1180                EventKind::ProposalReceived,
1181                None,
1182                Some(&proposal.id),
1183                [
1184                    ("source".to_string(), Value::from(proposal.source.as_str())),
1185                    (
1186                        "action_count".to_string(),
1187                        Value::from(proposal.actions.len()),
1188                    ),
1189                ]
1190                .into(),
1191            );
1192        }
1193
1194        // Capability check: max_actions budget for entire proposal
1195        {
1196            let caps = self.capabilities.read().await;
1197            if let Some(ref cap) = *caps {
1198                if !cap.actions_within_budget(proposal.actions.len() as u32) {
1199                    let mut action_results = Vec::new();
1200                    for action in &proposal.actions {
1201                        action_results.push(rejected_result(
1202                            &action.id,
1203                            format!(
1204                                "capability denied: proposal has {} actions, max allowed is {:?}",
1205                                proposal.actions.len(),
1206                                cap.max_actions
1207                            ),
1208                        ));
1209                    }
1210                    return (
1211                        ProposalResult {
1212                            proposal_id: proposal.id.clone(),
1213                            results: action_results,
1214                            cost: CostSummary::default(),
1215                        },
1216                        HashMap::new(),
1217                    );
1218                }
1219            }
1220        }
1221
1222        // Snapshot for rollback
1223        let snapshot = self.state.snapshot();
1224        let transition_count = self.state.transition_count();
1225
1226        let mut results: Vec<ActionResult> = Vec::new();
1227        // Per-action state snapshots captured before execution (for TraceEvent.state_before).
1228        let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1229        let mut aborted = false;
1230        let mut budget_exceeded = false;
1231        let mut total_retries: u32 = 0;
1232
1233        // Running cost counters for budget enforcement
1234        let mut running_tool_calls: u32 = 0;
1235        let mut running_actions: u32 = 0;
1236        let mut running_duration_ms: f64 = 0.0;
1237
1238        // Snapshot the budget once
1239        let budget = self.cost_budget.read().await.clone();
1240
1241        // Build DAG
1242        let levels = build_dag(&proposal.actions);
1243
1244        let mut canceled = false;
1245        for level in &levels {
1246            // Cooperative cancellation check at the level boundary.
1247            // Actions already in flight aren't interrupted (we can't
1248            // safely cancel a tool call dispatched to a user-provided
1249            // executor), but every action that hadn't started runs
1250            // is recorded as canceled with a clear reason.
1251            if !canceled {
1252                if let Some(token) = cancel {
1253                    if token.is_cancelled() {
1254                        canceled = true;
1255                    }
1256                }
1257            }
1258            if canceled {
1259                for &idx in level {
1260                    results.push(canceled_result(
1261                        &proposal.actions[idx].id,
1262                        "cancellation requested by caller",
1263                    ));
1264                }
1265                continue;
1266            }
1267            if aborted || budget_exceeded {
1268                let skip_reason = if budget_exceeded {
1269                    "cost budget exceeded"
1270                } else {
1271                    "skipped due to earlier abort"
1272                };
1273                for &idx in level {
1274                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1275                }
1276                continue;
1277            }
1278
1279            // Check if any action in this level has ABORT behavior
1280            let has_abort = level
1281                .iter()
1282                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1283
1284            if level.len() == 1 || has_abort {
1285                // Sequential execution
1286                for &idx in level {
1287                    if aborted || budget_exceeded {
1288                        let skip_reason = if budget_exceeded {
1289                            "cost budget exceeded"
1290                        } else {
1291                            "skipped due to abort"
1292                        };
1293                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1294                        continue;
1295                    }
1296
1297                    // Budget check before execution
1298                    if let Some(ref b) = budget {
1299                        if let Some(max) = b.max_actions {
1300                            if running_actions >= max {
1301                                budget_exceeded = true;
1302                                results.push(skipped_result(
1303                                    &proposal.actions[idx].id,
1304                                    "cost budget exceeded",
1305                                ));
1306                                continue;
1307                            }
1308                        }
1309                        if let Some(max) = b.max_tool_calls {
1310                            if proposal.actions[idx].action_type == ActionType::ToolCall
1311                                && running_tool_calls >= max
1312                            {
1313                                budget_exceeded = true;
1314                                results.push(skipped_result(
1315                                    &proposal.actions[idx].id,
1316                                    "cost budget exceeded",
1317                                ));
1318                                continue;
1319                            }
1320                        }
1321                        if let Some(max) = b.max_duration_ms {
1322                            if running_duration_ms >= max {
1323                                budget_exceeded = true;
1324                                results.push(skipped_result(
1325                                    &proposal.actions[idx].id,
1326                                    "cost budget exceeded",
1327                                ));
1328                                continue;
1329                            }
1330                        }
1331                    }
1332
1333                    state_before_map.insert(
1334                        proposal.actions[idx].id.clone(),
1335                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1336                    );
1337                    let (ar, action_retries) = self
1338                        .process_action(
1339                            &proposal.actions[idx],
1340                            &proposal.id,
1341                            &trace_id,
1342                            &root_span_id,
1343                            session_id,
1344                            scope,
1345                        )
1346                        .await;
1347                    total_retries += action_retries;
1348
1349                    // Update running counters
1350                    if ar.status == ActionStatus::Succeeded
1351                        && proposal.actions[idx].action_type == ActionType::ToolCall
1352                    {
1353                        running_tool_calls += 1;
1354                    }
1355                    if ar.status != ActionStatus::Skipped {
1356                        running_actions += 1;
1357                    }
1358                    if let Some(d) = ar.duration_ms {
1359                        running_duration_ms += d;
1360                    }
1361
1362                    if ar.status == ActionStatus::Failed
1363                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1364                    {
1365                        aborted = true;
1366                    }
1367                    results.push(ar);
1368                }
1369            } else {
1370                // Concurrent execution via futures::join_all
1371                // Snapshot only relevant keys per action (all see same pre-level state)
1372                for &idx in level {
1373                    state_before_map.insert(
1374                        proposal.actions[idx].id.clone(),
1375                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1376                    );
1377                }
1378                let futs: Vec<_> = level
1379                    .iter()
1380                    .map(|&idx| {
1381                        self.process_action(
1382                            &proposal.actions[idx],
1383                            &proposal.id,
1384                            &trace_id,
1385                            &root_span_id,
1386                            session_id,
1387                            scope,
1388                        )
1389                    })
1390                    .collect();
1391                let level_results = futures::future::join_all(futs).await;
1392
1393                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1394                    let idx = level[i];
1395                    total_retries += action_retries;
1396                    if ar.status == ActionStatus::Succeeded
1397                        && proposal.actions[idx].action_type == ActionType::ToolCall
1398                    {
1399                        running_tool_calls += 1;
1400                    }
1401                    if ar.status != ActionStatus::Skipped {
1402                        running_actions += 1;
1403                    }
1404                    if let Some(d) = ar.duration_ms {
1405                        running_duration_ms += d;
1406                    }
1407                    results.push(ar);
1408                }
1409            }
1410        }
1411
1412        // Handle rollback
1413        if aborted {
1414            self.state.restore(snapshot.clone(), transition_count);
1415
1416            let mut log = self.log.lock().await;
1417            log.append(
1418                EventKind::StateSnapshot,
1419                None,
1420                Some(&proposal.id),
1421                [(
1422                    "state".to_string(),
1423                    serde_json::to_value(&snapshot).unwrap_or_default(),
1424                )]
1425                .into(),
1426            );
1427            log.append(
1428                EventKind::StateRollback,
1429                None,
1430                Some(&proposal.id),
1431                [(
1432                    "rolled_back_to".to_string(),
1433                    Value::from("pre-proposal snapshot"),
1434                )]
1435                .into(),
1436            );
1437
1438            // Clear idempotency cache for rolled-back actions
1439            let mut cache = self.idempotency_cache.lock().await;
1440            for r in &results {
1441                if r.status == ActionStatus::Succeeded {
1442                    for action in &proposal.actions {
1443                        if action.id == r.action_id && action.idempotent {
1444                            cache.remove(&idempotency_key(action));
1445                        }
1446                    }
1447                }
1448            }
1449        }
1450
1451        // Sort results to match original action order
1452        let action_order: HashMap<String, usize> = proposal
1453            .actions
1454            .iter()
1455            .enumerate()
1456            .map(|(i, a)| (a.id.clone(), i))
1457            .collect();
1458        results.sort_by_key(|r| {
1459            action_order
1460                .get(&r.action_id)
1461                .copied()
1462                .unwrap_or(usize::MAX)
1463        });
1464
1465        // Compute cost summary from results
1466        let mut cost = CostSummary::default();
1467        for r in &results {
1468            let action = action_order
1469                .get(&r.action_id)
1470                .and_then(|&i| proposal.actions.get(i));
1471            match r.status {
1472                ActionStatus::Succeeded => {
1473                    cost.actions_executed += 1;
1474                    if let Some(a) = action {
1475                        if a.action_type == ActionType::ToolCall {
1476                            cost.tool_calls += 1;
1477                        }
1478                    }
1479                }
1480                ActionStatus::Failed | ActionStatus::Rejected => {
1481                    cost.actions_executed += 1;
1482                }
1483                ActionStatus::Skipped => {
1484                    cost.actions_skipped += 1;
1485                }
1486                _ => {}
1487            }
1488            if let Some(d) = r.duration_ms {
1489                cost.total_duration_ms += d;
1490            }
1491        }
1492
1493        // Set retries from inline counter
1494        cost.retries = total_retries;
1495
1496        // End root span — Ok if no abort, Error if aborted
1497        {
1498            let span_status = if aborted {
1499                SpanStatus::Error
1500            } else {
1501                SpanStatus::Ok
1502            };
1503            let mut log = self.log.lock().await;
1504            log.end_span(&root_span_id, span_status);
1505        }
1506
1507        let proposal_result = ProposalResult {
1508            proposal_id: proposal.id.clone(),
1509            results,
1510            cost,
1511        };
1512
1513        // Post-execution: auto-distill skills from this execution trace
1514        if self.auto_distill {
1515            if let Some(ref memgine) = self.memgine {
1516                // Convert results to TraceEvents for distillation
1517                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1518                    .results
1519                    .iter()
1520                    .map(|r| {
1521                        let kind = match r.status {
1522                            ActionStatus::Succeeded => "action_succeeded",
1523                            ActionStatus::Failed => "action_failed",
1524                            ActionStatus::Rejected => "action_rejected",
1525                            ActionStatus::Skipped => "action_skipped",
1526                            _ => "unknown",
1527                        };
1528                        // Find the matching action to get the tool name
1529                        let tool = proposal
1530                            .actions
1531                            .iter()
1532                            .find(|a| a.id == r.action_id)
1533                            .and_then(|a| a.tool.clone());
1534                        let mut data = serde_json::Map::new();
1535                        if let Some(ref e) = r.error {
1536                            data.insert("error".into(), Value::from(e.as_str()));
1537                        }
1538                        if let Some(ref o) = r.output {
1539                            data.insert("output".into(), o.clone());
1540                        }
1541                        car_memgine::TraceEvent {
1542                            kind: kind.to_string(),
1543                            action_id: Some(r.action_id.clone()),
1544                            tool,
1545                            data: Value::Object(data),
1546                            duration_ms: r.duration_ms,
1547                            reward: match r.status {
1548                                ActionStatus::Succeeded => Some(1.0),
1549                                ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1550                                _ => None,
1551                            },
1552                            ..Default::default()
1553                        }
1554                    })
1555                    .collect();
1556
1557                let mut engine = memgine.lock().await;
1558                let skills = engine.distill_skills(&trace_events).await;
1559                if !skills.is_empty() {
1560                    let count = skills.len();
1561                    // Tenant-aware ingest (Parslee-ai/car#187 phase 3-D).
1562                    // When the proposal carried a tenant, distilled
1563                    // skills get stamped so the same tenant's next
1564                    // build_context can find them. Unscoped proposals
1565                    // continue to land in the global skill pool.
1566                    let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1567                    engine.scoped(tenant).ingest_distilled_skills(&skills);
1568
1569                    // Log the distillation event
1570                    let mut log = self.log.lock().await;
1571                    log.append(
1572                        EventKind::SkillDistilled,
1573                        None,
1574                        Some(&proposal_result.proposal_id),
1575                        [
1576                            ("skills_count".to_string(), Value::from(count)),
1577                            (
1578                                "skill_names".to_string(),
1579                                Value::from(
1580                                    skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1581                                ),
1582                            ),
1583                        ]
1584                        .into(),
1585                    );
1586
1587                    // Check if any domains need evolution
1588                    let threshold = engine.evolution_threshold();
1589                    let domains = engine.domains_needing_evolution(threshold);
1590                    for domain in &domains {
1591                        // Collect failed events for this domain
1592                        let failed: Vec<car_memgine::TraceEvent> = trace_events
1593                            .iter()
1594                            .filter(|e| {
1595                                matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1596                            })
1597                            .cloned()
1598                            .collect();
1599                        if !failed.is_empty() {
1600                            let evolved = engine.evolve_skills(&failed, domain).await;
1601                            if !evolved.is_empty() {
1602                                log.append(
1603                                    EventKind::EvolutionTriggered,
1604                                    None,
1605                                    Some(&proposal_result.proposal_id),
1606                                    [
1607                                        ("domain".to_string(), Value::from(domain.as_str())),
1608                                        ("new_skills".to_string(), Value::from(evolved.len())),
1609                                    ]
1610                                    .into(),
1611                                );
1612                            }
1613                        }
1614                    }
1615                }
1616            }
1617        }
1618
1619        (proposal_result, state_before_map)
1620    }
1621
1622    /// Process a single action: idempotency → validate → policy → execute.
1623    /// Returns (ActionResult, retries_count).
1624    async fn process_action(
1625        &self,
1626        action: &Action,
1627        proposal_id: &str,
1628        trace_id: &str,
1629        parent_span_id: &str,
1630        session_id: Option<&str>,
1631        scope: Option<&crate::scope::RuntimeScope>,
1632    ) -> (ActionResult, u32) {
1633        // Derive action type name for span naming
1634        let action_type_name = serde_json::to_string(&action.action_type)
1635            .unwrap_or_default()
1636            .trim_matches('"')
1637            .to_string();
1638        let span_name = format!("action.{}", action_type_name);
1639
1640        // Begin child span for this action
1641        let action_span_id = {
1642            let mut attrs: HashMap<String, Value> = HashMap::new();
1643            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1644            if let Some(ref tool) = action.tool {
1645                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1646            }
1647            let mut log = self.log.lock().await;
1648            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1649        };
1650
1651        // Execute the action pipeline and capture result
1652        let (result, retries) = self
1653            .process_action_inner(action, proposal_id, session_id, scope)
1654            .await;
1655
1656        // End action span based on result status
1657        let span_status = match result.status {
1658            ActionStatus::Succeeded => SpanStatus::Ok,
1659            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1660            _ => SpanStatus::Unset,
1661        };
1662        {
1663            let mut log = self.log.lock().await;
1664            log.end_span(&action_span_id, span_status);
1665        }
1666
1667        (result, retries)
1668    }
1669
1670    /// Inner action processing: idempotency -> validate -> policy -> execute.
1671    /// Returns (ActionResult, retries_count).
1672    #[instrument(
1673        name = "action.process",
1674        skip_all,
1675        fields(
1676            action_id = %action.id,
1677            action_type = ?action.action_type,
1678            tool = action.tool.as_deref().unwrap_or("none"),
1679        )
1680    )]
1681    async fn process_action_inner(
1682        &self,
1683        action: &Action,
1684        proposal_id: &str,
1685        session_id: Option<&str>,
1686        scope: Option<&crate::scope::RuntimeScope>,
1687    ) -> (ActionResult, u32) {
1688        // Idempotency check
1689        if action.idempotent {
1690            let key = idempotency_key(action);
1691            let cache = self.idempotency_cache.lock().await;
1692            if let Some(cached) = cache.get(&key) {
1693                let mut log = self.log.lock().await;
1694                log.append(
1695                    EventKind::ActionDeduplicated,
1696                    Some(&action.id),
1697                    Some(proposal_id),
1698                    [(
1699                        "cached_action_id".to_string(),
1700                        Value::from(cached.action_id.as_str()),
1701                    )]
1702                    .into(),
1703                );
1704                return (
1705                    ActionResult {
1706                        action_id: action.id.clone(),
1707                        status: cached.status.clone(),
1708                        output: cached.output.clone(),
1709                        error: cached.error.clone(),
1710                        state_changes: cached.state_changes.clone(),
1711                        duration_ms: Some(0.0),
1712                        timestamp: chrono::Utc::now(),
1713                    },
1714                    0,
1715                );
1716            }
1717        }
1718
1719        // Capability check
1720        {
1721            let caps = self.capabilities.read().await;
1722            if let Some(ref cap) = *caps {
1723                // Check tool capability for ToolCall actions
1724                if action.action_type == ActionType::ToolCall {
1725                    if let Some(ref tool_name) = action.tool {
1726                        if !cap.tool_allowed(tool_name) {
1727                            let mut log = self.log.lock().await;
1728                            log.append(
1729                                EventKind::ActionRejected,
1730                                Some(&action.id),
1731                                Some(proposal_id),
1732                                HashMap::new(),
1733                            );
1734                            return (
1735                                rejected_result(
1736                                    &action.id,
1737                                    format!("capability denied: tool '{}' not allowed", tool_name),
1738                                ),
1739                                0,
1740                            );
1741                        }
1742                    }
1743                }
1744
1745                // Check state key capability for StateWrite/StateRead actions
1746                if action.action_type == ActionType::StateWrite
1747                    || action.action_type == ActionType::StateRead
1748                {
1749                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1750                        if !cap.state_key_allowed(key) {
1751                            let mut log = self.log.lock().await;
1752                            log.append(
1753                                EventKind::ActionRejected,
1754                                Some(&action.id),
1755                                Some(proposal_id),
1756                                HashMap::new(),
1757                            );
1758                            return (
1759                                rejected_result(
1760                                    &action.id,
1761                                    format!("capability denied: state key '{}' not allowed", key),
1762                                ),
1763                                0,
1764                            );
1765                        }
1766                    }
1767                }
1768            }
1769        }
1770
1771        // Validate
1772        let tools = self.tools.read().await;
1773        let validation = validate_action(action, &self.state, &tools);
1774        drop(tools);
1775
1776        if !validation.valid() {
1777            let error = validation
1778                .errors
1779                .iter()
1780                .map(|e| e.reason.as_str())
1781                .collect::<Vec<_>>()
1782                .join("; ");
1783            let mut log = self.log.lock().await;
1784            log.append(
1785                EventKind::ActionRejected,
1786                Some(&action.id),
1787                Some(proposal_id),
1788                HashMap::new(),
1789            );
1790            return (rejected_result(&action.id, error), 0);
1791        }
1792
1793        // Policy check — global registry plus, when the proposal is
1794        // executed under a session, that session's registry. Both
1795        // layers must pass for the action to proceed; session
1796        // policies are additive deny rules — they can deny what
1797        // global allows but cannot allow what global denies.
1798        {
1799            let mut violations = {
1800                let policies = self.policies.read().await;
1801                policies.check(action, &self.state)
1802            };
1803            if let Some(sid) = session_id {
1804                // Snapshot the per-session engine handle out from under
1805                // the outer registry lock so the inner check holds
1806                // only the engine's own RwLock — preserves the
1807                // documented lock-ordering discipline.
1808                let session_engine = {
1809                    let sessions = self.session_policies.read().await;
1810                    sessions.get(sid).cloned()
1811                };
1812                if let Some(engine) = session_engine {
1813                    let engine = engine.read().await;
1814                    violations.extend(engine.check(action, &self.state));
1815                } else {
1816                    // Unknown session id — refuse the action rather
1817                    // than silently fall back to global-only. A
1818                    // proposal submitted under a closed session
1819                    // shouldn't run with looser rules than the caller
1820                    // intended.
1821                    let mut log = self.log.lock().await;
1822                    log.append(
1823                        EventKind::PolicyViolation,
1824                        Some(&action.id),
1825                        Some(proposal_id),
1826                        HashMap::new(),
1827                    );
1828                    return (
1829                        rejected_result(
1830                            &action.id,
1831                            format!(
1832                                "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1833                            ),
1834                        ),
1835                        0,
1836                    );
1837                }
1838            }
1839            if !violations.is_empty() {
1840                let error = violations
1841                    .iter()
1842                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1843                    .collect::<Vec<_>>()
1844                    .join("; ");
1845                let mut log = self.log.lock().await;
1846                log.append(
1847                    EventKind::PolicyViolation,
1848                    Some(&action.id),
1849                    Some(proposal_id),
1850                    HashMap::new(),
1851                );
1852                return (rejected_result(&action.id, error), 0);
1853            }
1854        }
1855
1856        // Validated
1857        {
1858            let mut log = self.log.lock().await;
1859            log.append(
1860                EventKind::ActionValidated,
1861                Some(&action.id),
1862                Some(proposal_id),
1863                HashMap::new(),
1864            );
1865        }
1866
1867        // Execute with retry
1868        let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1869
1870        // Cache idempotent results
1871        if action.idempotent && result.status == ActionStatus::Succeeded {
1872            let mut cache = self.idempotency_cache.lock().await;
1873            cache.insert(idempotency_key(action), result.clone());
1874        }
1875
1876        tracing::info!(
1877            status = ?result.status,
1878            duration_ms = result.duration_ms,
1879            "action completed"
1880        );
1881
1882        (result, retries)
1883    }
1884
1885    /// Execute with retry logic and timeout.
1886    /// Returns (ActionResult, retries_count).
1887    async fn execute_with_retry(
1888        &self,
1889        action: &Action,
1890        proposal_id: &str,
1891        scope: Option<&crate::scope::RuntimeScope>,
1892    ) -> (ActionResult, u32) {
1893        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1894            action.max_retries + 1
1895        } else {
1896            1
1897        };
1898
1899        let mut last_error: Option<String> = None;
1900        let mut retries: u32 = 0;
1901
1902        for attempt in 0..max_attempts {
1903            if attempt > 0 {
1904                retries += 1;
1905                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1906                tokio::time::sleep(Duration::from_millis(delay)).await;
1907                let mut log = self.log.lock().await;
1908                log.append(
1909                    EventKind::ActionRetrying,
1910                    Some(&action.id),
1911                    Some(proposal_id),
1912                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
1913                );
1914            }
1915
1916            {
1917                let mut log = self.log.lock().await;
1918                log.append(
1919                    EventKind::ActionExecuting,
1920                    Some(&action.id),
1921                    Some(proposal_id),
1922                    HashMap::new(),
1923                );
1924            }
1925
1926            let start = std::time::Instant::now();
1927            let transitions_before = self.state.transition_count();
1928
1929            // Execute with optional timeout
1930            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1931                match timeout(
1932                    Duration::from_millis(timeout_ms),
1933                    self.dispatch(action, scope),
1934                )
1935                .await
1936                {
1937                    Ok(r) => r,
1938                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1939                }
1940            } else {
1941                self.dispatch(action, scope).await
1942            };
1943
1944            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1945
1946            match exec_result {
1947                Ok(output) => {
1948                    // Commit post-effects. Tenant-scoped when the
1949                    // proposal carries a scope (Parslee-ai/car#187
1950                    // phase 3) — distinct tenants write to disjoint
1951                    // namespaces so concurrent multi-tenant proposals
1952                    // don't trample each other's keys.
1953                    let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1954                    let scoped_state = self.state.scoped(tenant_for_effects);
1955                    let mut state_changes: HashMap<String, Value> = HashMap::new();
1956                    for (key, value) in &action.expected_effects {
1957                        scoped_state.set(key, value.clone(), &action.id);
1958                        state_changes.insert(key.clone(), value.clone());
1959                    }
1960
1961                    // Capture state changes from dispatch
1962                    for t in self.state.transitions_since(transitions_before) {
1963                        if !state_changes.contains_key(&t.key) {
1964                            if let Some(v) = t.new_value {
1965                                state_changes.insert(t.key.clone(), v);
1966                            }
1967                        }
1968                    }
1969
1970                    let mut log = self.log.lock().await;
1971                    log.append(
1972                        EventKind::ActionSucceeded,
1973                        Some(&action.id),
1974                        Some(proposal_id),
1975                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1976                    );
1977
1978                    if !state_changes.is_empty() {
1979                        log.append(
1980                            EventKind::StateChanged,
1981                            Some(&action.id),
1982                            Some(proposal_id),
1983                            [(
1984                                "changes".to_string(),
1985                                serde_json::to_value(&state_changes).unwrap_or_default(),
1986                            )]
1987                            .into(),
1988                        );
1989                    }
1990
1991                    return (
1992                        ActionResult {
1993                            action_id: action.id.clone(),
1994                            status: ActionStatus::Succeeded,
1995                            output: Some(output),
1996                            error: None,
1997                            state_changes,
1998                            duration_ms: Some(duration_ms),
1999                            timestamp: chrono::Utc::now(),
2000                        },
2001                        retries,
2002                    );
2003                }
2004                Err(e) => {
2005                    last_error = Some(e.clone());
2006                    let mut log = self.log.lock().await;
2007                    log.append(
2008                        EventKind::ActionFailed,
2009                        Some(&action.id),
2010                        Some(proposal_id),
2011                        [
2012                            ("error".to_string(), Value::from(e.as_str())),
2013                            ("attempt".to_string(), Value::from(attempt + 1)),
2014                        ]
2015                        .into(),
2016                    );
2017                }
2018            }
2019        }
2020
2021        // All attempts exhausted
2022        if action.failure_behavior == FailureBehavior::Skip {
2023            return (
2024                skipped_result(
2025                    &action.id,
2026                    last_error.as_deref().unwrap_or("all attempts exhausted"),
2027                ),
2028                retries,
2029            );
2030        }
2031
2032        (
2033            ActionResult {
2034                action_id: action.id.clone(),
2035                status: ActionStatus::Failed,
2036                output: None,
2037                error: last_error,
2038                state_changes: HashMap::new(),
2039                duration_ms: None,
2040                timestamp: chrono::Utc::now(),
2041            },
2042            retries,
2043        )
2044    }
2045
2046    /// Dispatch an action to the appropriate handler.
2047    ///
2048    /// `scope` is the per-execution caller / tenant surface
2049    /// (Parslee-ai/car#187 phase 3). When the scope carries a
2050    /// tenant id, state R/W operations (`StateWrite`, `StateRead`,
2051    /// `Assertion`) route through `StateStore::scoped(tenant_id)`
2052    /// so distinct tenants can't see each other's keys. Unscoped
2053    /// proposals get the legacy flat-namespace behaviour
2054    /// automatically.
2055    async fn dispatch(
2056        &self,
2057        action: &Action,
2058        scope: Option<&crate::scope::RuntimeScope>,
2059    ) -> Result<Value, String> {
2060        match action.action_type {
2061            ActionType::ToolCall => {
2062                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2063                let params = Value::Object(
2064                    action
2065                        .parameters
2066                        .iter()
2067                        .map(|(k, v)| (k.clone(), v.clone()))
2068                        .collect(),
2069                );
2070
2071                // Check cross-proposal result cache.
2072                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
2073                    return Ok(cached);
2074                }
2075
2076                // Apply rate limit backpressure before executing.
2077                self.rate_limiter.acquire(tool_name).await;
2078
2079                // Try built-in inference tools first when the inference engine is available.
2080                if matches!(
2081                    tool_name,
2082                    "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2083                ) {
2084                    if let Some(ref engine) = self.inference_engine {
2085                        // For "infer.grounded" or "infer" with memgine available,
2086                        // build context from memory and attach it to the request.
2087                        let params = {
2088                            let should_ground =
2089                                tool_name == "infer.grounded" || tool_name == "infer";
2090                            if should_ground {
2091                                if let Some(ref memgine) = self.memgine {
2092                                    if let Some(prompt) =
2093                                        params.get("prompt").and_then(|v| v.as_str())
2094                                    {
2095                                        let ctx = {
2096                                            let mut m = memgine.lock().await;
2097                                            m.build_context(prompt)
2098                                        };
2099                                        if !ctx.is_empty() {
2100                                            let mut p = params.clone();
2101                                            if let Some(obj) = p.as_object_mut() {
2102                                                obj.insert("context".to_string(), Value::from(ctx));
2103                                            }
2104                                            p
2105                                        } else {
2106                                            params
2107                                        }
2108                                    } else {
2109                                        params
2110                                    }
2111                                } else {
2112                                    params
2113                                }
2114                            } else {
2115                                params
2116                            }
2117                        };
2118
2119                        // Route "infer.grounded" to "infer" for the service layer
2120                        let effective_tool = if tool_name == "infer.grounded" {
2121                            "infer"
2122                        } else {
2123                            tool_name
2124                        };
2125                        let result =
2126                            car_inference::service::execute_tool(engine, effective_tool, &params)
2127                                .await
2128                                .map_err(|e| e.to_string());
2129
2130                        if let Ok(ref value) = result {
2131                            self.result_cache
2132                                .put(tool_name, &params, value.clone())
2133                                .await;
2134                        }
2135
2136                        return result;
2137                    }
2138                }
2139
2140                // Built-in memory consolidation tool.
2141                if tool_name == "memory.consolidate" {
2142                    if let Some(ref memgine) = self.memgine {
2143                        let report = {
2144                            let mut m = memgine.lock().await;
2145                            m.consolidate().await
2146                        };
2147                        // Log the consolidation event
2148                        {
2149                            let mut log = self.log.lock().await;
2150                            log.append(
2151                                EventKind::Consolidated,
2152                                None,
2153                                None,
2154                                [
2155                                    (
2156                                        "expired_pruned".to_string(),
2157                                        Value::from(report.expired_pruned),
2158                                    ),
2159                                    (
2160                                        "superseded_gc".to_string(),
2161                                        Value::from(report.superseded_gc),
2162                                    ),
2163                                    (
2164                                        "stale_embeddings_removed".to_string(),
2165                                        Value::from(report.stale_embeddings_removed),
2166                                    ),
2167                                    (
2168                                        "nodes_embedded".to_string(),
2169                                        Value::from(report.nodes_embedded),
2170                                    ),
2171                                    (
2172                                        "domains_evolved".to_string(),
2173                                        Value::from(report.domains_evolved.clone()),
2174                                    ),
2175                                    ("total_nodes".to_string(), Value::from(report.total_nodes)),
2176                                    ("total_edges".to_string(), Value::from(report.total_edges)),
2177                                ]
2178                                .into(),
2179                            );
2180                        }
2181                        return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2182                    } else {
2183                        return Err(
2184                            "memory.consolidate requires memgine (attach with with_learning)"
2185                                .into(),
2186                        );
2187                    }
2188                }
2189
2190                // Prefer a configured tool_executor for any tool it claims to handle.
2191                // Fall through to agent_basics only when the configured executor is absent
2192                // or explicitly returns "unknown tool" — this prevents agent_basics' built-in
2193                // read_file/write_file (which resolve paths via std::env::current_dir) from
2194                // silently overriding an executor that carries its own working_dir.
2195                let configured = {
2196                    let guard = self.tool_executor.lock().await;
2197                    guard.as_ref().cloned()
2198                };
2199
2200                if let Some(ref executor) = configured {
2201                    let result = executor
2202                        .execute_with_action(tool_name, &params, &action.id)
2203                        .await;
2204                    let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2205                    if !fall_through {
2206                        if let Ok(ref value) = result {
2207                            self.result_cache
2208                                .put(tool_name, &params, value.clone())
2209                                .await;
2210                        }
2211                        return result;
2212                    }
2213                }
2214
2215                if let Some(result) = crate::agent_basics::execute(tool_name, &params).await {
2216                    if let Ok(ref value) = result {
2217                        self.result_cache
2218                            .put(tool_name, &params, value.clone())
2219                            .await;
2220                    }
2221                    return result;
2222                }
2223
2224                Err(format!("no handler for tool '{}'", tool_name))
2225            }
2226            ActionType::StateWrite => {
2227                let key = action
2228                    .parameters
2229                    .get("key")
2230                    .and_then(|v| v.as_str())
2231                    .ok_or("state_write requires 'key' parameter")?;
2232                let value = action
2233                    .parameters
2234                    .get("value")
2235                    .cloned()
2236                    .unwrap_or(Value::Null);
2237                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2238                self.state.scoped(tenant).set(key, value, &action.id);
2239                Ok(Value::from(format!("written: {}", key)))
2240            }
2241            ActionType::StateRead => {
2242                let key = action
2243                    .parameters
2244                    .get("key")
2245                    .and_then(|v| v.as_str())
2246                    .ok_or("state_read requires 'key' parameter")?;
2247                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2248                Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2249            }
2250            ActionType::Assertion => {
2251                let key = action
2252                    .parameters
2253                    .get("key")
2254                    .and_then(|v| v.as_str())
2255                    .ok_or("assertion requires 'key' parameter")?;
2256                let expected = action
2257                    .parameters
2258                    .get("expected")
2259                    .cloned()
2260                    .unwrap_or(Value::Null);
2261                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2262                let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2263                if actual != expected {
2264                    Err(format!(
2265                        "assertion failed: state['{}'] = {:?}, expected {:?}",
2266                        key, actual, expected
2267                    ))
2268                } else {
2269                    Ok(serde_json::json!({"asserted": key, "value": actual}))
2270                }
2271            }
2272        }
2273    }
2274
2275    // --- Checkpoint and resume ---
2276
2277    /// Save a checkpoint of the current runtime state.
2278    pub async fn save_checkpoint(&self) -> Checkpoint {
2279        let state = self.state.snapshot();
2280        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2281        let log = self.log.lock().await;
2282        let events: Vec<Value> = log
2283            .events()
2284            .iter()
2285            .map(|e| serde_json::to_value(e).unwrap_or_default())
2286            .collect();
2287
2288        Checkpoint {
2289            checkpoint_id: Uuid::new_v4().to_string(),
2290            created_at: chrono::Utc::now(),
2291            state,
2292            events,
2293            tools,
2294            metadata: HashMap::new(),
2295        }
2296    }
2297
2298    /// Save checkpoint to a JSON file.
2299    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2300        let checkpoint = self.save_checkpoint().await;
2301        let json = serde_json::to_string_pretty(&checkpoint)
2302            .map_err(|e| format!("serialize error: {}", e))?;
2303        tokio::fs::write(path, json)
2304            .await
2305            .map_err(|e| format!("write error: {}", e))?;
2306        Ok(())
2307    }
2308
2309    /// Load a checkpoint from a JSON file and restore state.
2310    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2311        let json = tokio::fs::read_to_string(path)
2312            .await
2313            .map_err(|e| format!("read error: {}", e))?;
2314        let checkpoint: Checkpoint =
2315            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2316        self.restore_checkpoint(&checkpoint).await;
2317        Ok(checkpoint)
2318    }
2319
2320    /// Restore runtime state from a checkpoint.
2321    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2322        // Replace state completely — don't merge, don't create synthetic transitions
2323        self.state.replace_all(checkpoint.state.clone());
2324        // Clear idempotency cache — stale results from pre-checkpoint execution
2325        // must not bypass validation/policy on the restored state
2326        self.idempotency_cache.lock().await.clear();
2327        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
2328        let mut tools = self.tools.write().await;
2329        tools.clear();
2330        for tool_name in &checkpoint.tools {
2331            let schema = ToolSchema {
2332                name: tool_name.clone(),
2333                description: String::new(),
2334                parameters: serde_json::Value::Object(Default::default()),
2335                returns: None,
2336                idempotent: false,
2337                cache_ttl_secs: None,
2338                rate_limit: None,
2339            };
2340            tools.insert(tool_name.clone(), schema);
2341        }
2342    }
2343
2344    /// Register a subprocess tool and set up the subprocess executor.
2345    /// If no executor exists, creates a new SubprocessToolExecutor.
2346    /// If one already exists, creates a new SubprocessToolExecutor with the
2347    /// existing executor as fallback.
2348    pub async fn register_subprocess_tool(
2349        &self,
2350        name: &str,
2351        tool: crate::subprocess::SubprocessTool,
2352    ) {
2353        use crate::subprocess::SubprocessToolExecutor;
2354
2355        let schema = ToolSchema {
2356            name: name.to_string(),
2357            description: format!("Subprocess tool: {}", tool.command),
2358            parameters: serde_json::Value::Object(Default::default()),
2359            returns: None,
2360            idempotent: false,
2361            cache_ttl_secs: None,
2362            rate_limit: None,
2363        };
2364        self.register_tool_schema(schema).await;
2365
2366        let mut guard = self.tool_executor.lock().await;
2367        let mut executor = match guard.take() {
2368            Some(existing) => {
2369                let mut sub = SubprocessToolExecutor::new();
2370                sub = sub.with_fallback(existing);
2371                sub
2372            }
2373            None => SubprocessToolExecutor::new(),
2374        };
2375        executor.register(name, tool);
2376        *guard = Some(std::sync::Arc::new(executor));
2377    }
2378}
2379
2380impl Default for Runtime {
2381    fn default() -> Self {
2382        Self::new()
2383    }
2384}