Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Retry backoff: base delay, in milliseconds.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Retry backoff: growth factor.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28// ---------------------------------------------------------------------------
29// Replan types — failure recovery via model callback
30// ---------------------------------------------------------------------------
31
32/// Callback trait for replanning failed proposals.
33/// Implement this to let the runtime ask the model for an alternative plan
34/// when a proposal aborts.
35#[async_trait::async_trait]
36pub trait ReplanCallback: Send + Sync {
37    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
38}
39
40/// Context provided to the replan callback so the model can generate an alternative.
41#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct ReplanContext {
43    /// Original proposal ID.
44    pub proposal_id: String,
45    /// Which replan attempt this is (1-indexed; attempt 1 = first replan after initial failure).
46    pub attempt: u32,
47    /// Actions that failed and caused the abort.
48    pub failed_actions: Vec<FailedActionSummary>,
49    /// Action IDs that succeeded before the abort (now rolled back).
50    pub completed_action_ids: Vec<String>,
51    /// State snapshot after rollback.
52    pub state_snapshot: HashMap<String, Value>,
53    /// How many replans remain.
54    pub replans_remaining: u32,
55    /// Original proposal source (model name, agent, etc.).
56    pub original_source: String,
57    /// Total actions in the original proposal.
58    pub original_action_count: usize,
59    /// The original goal/context from the proposal (for generating alternatives).
60    pub original_context: HashMap<String, Value>,
61}
62
63/// Summary of a failed action, included in ReplanContext.
64#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
65pub struct FailedActionSummary {
66    pub action_id: String,
67    pub tool: Option<String>,
68    pub error: String,
69    pub parameters: HashMap<String, Value>,
70}
71
/// Tuning knobs for the replan loop.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Upper bound on replan attempts. `0` (the default) disables replanning.
    pub max_replans: u32,
    /// Pause between replan attempts, in milliseconds, so attempts are not
    /// burned through instantly when the model returns garbage quickly.
    /// `0` means no delay.
    pub delay_ms: u64,
    /// When true, a replan proposal is scored via car-planner's `verify()`
    /// before execution and rejected without executing if it has errors.
    /// This keeps the engine from running a worse plan than the one that failed.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87    fn default() -> Self {
88        Self {
89            max_replans: 0,
90            delay_ms: 0,
91            verify_before_execute: true,
92        }
93    }
94}
95
96/// Trait for tool execution. Implement this to provide tools to the runtime.
97///
98/// In-process: implement directly with function calls.
99/// Daemon mode: implement by sending JSON-RPC to the client.
100#[async_trait::async_trait]
101pub trait ToolExecutor: Send + Sync {
102    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
103
104    /// Variant that also carries the originating proposal `Action.id`.
105    /// WS-based executors (`car-server-core::WsToolExecutor`) use it so
106    /// the daemon-initiated `tools.execute` request to the client
107    /// carries the same id the host's process-wide handler is keyed on
108    /// — without this round-trip the host can't disambiguate concurrent
109    /// callbacks for the same tool (Parslee-ai/car-releases#43
110    /// follow-up). In-process executors that don't need it can keep the
111    /// default forward to [`execute`].
112    async fn execute_with_action(
113        &self,
114        tool: &str,
115        params: &Value,
116        _action_id: &str,
117    ) -> Result<Value, String> {
118        self.execute(tool, params).await
119    }
120}
121
122/// Deterministic key for idempotency deduplication.
123fn idempotency_key(action: &Action) -> String {
124    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
125    let params = serde_json::to_string(&sorted).unwrap_or_default();
126    format!(
127        "{}:{}:{}",
128        serde_json::to_string(&action.action_type).unwrap_or_default(),
129        action.tool.as_deref().unwrap_or(""),
130        params
131    )
132}
133
134fn rejected_result(action_id: &str, error: String) -> ActionResult {
135    ActionResult {
136        action_id: action_id.to_string(),
137        status: ActionStatus::Rejected,
138        output: None,
139        error: Some(error),
140        state_changes: HashMap::new(),
141        duration_ms: None,
142        timestamp: chrono::Utc::now(),
143    }
144}
145
146/// Capture only the state keys relevant to an action (state_dependencies + expected_effects).
147/// Returns an empty map if the action declares no relevant keys (e.g., tool calls
148/// that don't interact with state).
149fn snapshot_relevant_keys(
150    state: &car_state::StateStore,
151    action: &Action,
152) -> HashMap<String, Value> {
153    let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
154    for dep in &action.state_dependencies {
155        keys.insert(dep.as_str());
156    }
157    for key in action.expected_effects.keys() {
158        keys.insert(key.as_str());
159    }
160    // For state_write actions, capture the key being written
161    if action.action_type == ActionType::StateWrite {
162        if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
163            keys.insert(key);
164        }
165    }
166
167    if keys.is_empty() {
168        // No declared state interaction — skip snapshot (empty map)
169        return HashMap::new();
170    }
171
172    keys.iter()
173        .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
174        .collect()
175}
176
177fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
178    ActionResult {
179        action_id: action_id.to_string(),
180        status: ActionStatus::Skipped,
181        output: None,
182        error: Some(reason.to_string()),
183        state_changes: HashMap::new(),
184        duration_ms: None,
185        timestamp: chrono::Utc::now(),
186    }
187}
188
/// Marker placed at the start of an `ActionResult`'s `error` field when the
/// action did not run because the proposal was cancelled.
///
/// Cancelled actions and actions skipped after an earlier abort both carry
/// `ActionStatus::Skipped` (a new status variant would ripple through every
/// IR consumer and FFI binding). Callers such as the A2A bridge check for
/// this exported prefix to tell the two cases apart instead of matching a
/// magic string literal of their own.
pub const CANCELED_PREFIX: &str = "canceled: ";
196
197/// Result for an action that didn't run because the proposal was
198/// cancelled mid-flight. Distinct from `Skipped` so callers can tell
199/// "didn't run because of an earlier abort" from "didn't run because
200/// the user pulled the plug."
201fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
202    ActionResult {
203        action_id: action_id.to_string(),
204        status: ActionStatus::Skipped,
205        output: None,
206        error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
207        state_changes: HashMap::new(),
208        duration_ms: None,
209        timestamp: chrono::Utc::now(),
210    }
211}
212
213/// Format a tool result for feeding back to a model.
214pub fn format_tool_result(result: &ActionResult) -> String {
215    match result.status {
216        ActionStatus::Succeeded => match &result.output {
217            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
218            None => String::new(),
219        },
220        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
221        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
222        _ => format!(
223            "[{:?}] {}",
224            result.status,
225            result.error.as_deref().unwrap_or("")
226        ),
227    }
228}
229
/// Limits applied to the execution of a proposal. A `None` field leaves that
/// dimension unset.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Cap on the number of tool calls.
    pub max_tool_calls: Option<u32>,
    /// Cap on duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Cap on the number of actions.
    pub max_actions: Option<u32>,
}
237
238/// Common Agent Runtime — deterministic execution layer.
239///
240/// Lock ordering discipline (never hold multiple simultaneously, never hold sync locks across .await):
241/// 1. capabilities (RwLock, read-only during execution)
242/// 2. tools (RwLock, read-only during execution)
243/// 3. policies (RwLock, read-only during execution)
244/// 4. session_policies (RwLock to find the per-session engine, then read inner Arc)
245/// 5. cost_budget (RwLock, read-only during execution)
246/// 6. log (TokioMutex, acquired/released per event)
247/// 7. tool_executor (TokioMutex, clone Arc and drop before await)
248/// 8. idempotency_cache (TokioMutex, acquired/released per check)
249///
250/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
251pub struct Runtime {
252    pub state: Arc<StateStore>,
253    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
254    pub policies: Arc<TokioRwLock<PolicyEngine>>,
255    /// Per-session policy registries. Hosts that multiplex multiple
256    /// concurrent agent sessions over a single `Runtime` (IDE-style
257    /// frontends with per-project rules, multi-tenant servers) call
258    /// [`Runtime::open_session`] to mint an id, then
259    /// [`Runtime::register_policy_in_session`] to attach session-scoped
260    /// rules. Validation under that session walks the global registry
261    /// AND the session's — both must pass. Sessions can deny what
262    /// global allows; sessions cannot allow what global denies.
263    /// Closing a session drops its registry and any closures it holds.
264    /// See `docs/proposals/per-session-policy-scoping.md`.
265    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
266    pub log: Arc<TokioMutex<EventLog>>,
267    pub rate_limiter: Arc<RateLimiter>,
268    pub result_cache: Arc<ResultCache>,
269    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
270    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
271    cost_budget: TokioRwLock<Option<CostBudget>>,
272    capabilities: TokioRwLock<Option<CapabilitySet>>,
273    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
274    /// Optional memgine for skill learning (auto-distillation after execution).
275    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
276    /// Whether to auto-distill skills after each proposal execution.
277    auto_distill: bool,
278    /// Optional trajectory store for persisting execution traces.
279    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
280    /// Optional replan callback for failure recovery.
281    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
282    /// Replan configuration.
283    replan_config: TokioRwLock<ReplanConfig>,
284    /// Canonical tool registry (optional — new code should use this).
285    pub registry: Arc<crate::registry::ToolRegistry>,
286}
287
288impl Runtime {
289    pub fn new() -> Self {
290        Self {
291            state: Arc::new(StateStore::new()),
292            tools: Arc::new(TokioRwLock::new(HashMap::new())),
293            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
294            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
295            log: Arc::new(TokioMutex::new(EventLog::new())),
296            rate_limiter: Arc::new(RateLimiter::new()),
297            result_cache: Arc::new(ResultCache::new()),
298            tool_executor: TokioMutex::new(None),
299            idempotency_cache: TokioMutex::new(HashMap::new()),
300            cost_budget: TokioRwLock::new(None),
301            capabilities: TokioRwLock::new(None),
302            inference_engine: None,
303            memgine: None,
304            auto_distill: false,
305            trajectory_store: None,
306            replan_callback: TokioMutex::new(None),
307            replan_config: TokioRwLock::new(ReplanConfig::default()),
308            registry: Arc::new(crate::registry::ToolRegistry::new()),
309        }
310    }
311
312    /// Create a runtime with shared state, event log, and policies.
313    /// Each runtime gets its own tool set, executor, and idempotency cache.
314    pub fn with_shared(
315        state: Arc<StateStore>,
316        log: Arc<TokioMutex<EventLog>>,
317        policies: Arc<TokioRwLock<PolicyEngine>>,
318    ) -> Self {
319        Self {
320            state,
321            tools: Arc::new(TokioRwLock::new(HashMap::new())),
322            policies,
323            // Session-policy registries are per-runtime — sharing them
324            // across embedders that share global policies would defeat
325            // the isolation point. Hosts that genuinely want shared
326            // sessions should drive them through one shared Runtime.
327            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
328            log,
329            rate_limiter: Arc::new(RateLimiter::new()),
330            result_cache: Arc::new(ResultCache::new()),
331            tool_executor: TokioMutex::new(None),
332            idempotency_cache: TokioMutex::new(HashMap::new()),
333            cost_budget: TokioRwLock::new(None),
334            capabilities: TokioRwLock::new(None),
335            inference_engine: None,
336            memgine: None,
337            auto_distill: false,
338            trajectory_store: None,
339            replan_callback: TokioMutex::new(None),
340            replan_config: TokioRwLock::new(ReplanConfig::default()),
341            registry: Arc::new(crate::registry::ToolRegistry::new()),
342        }
343    }
344
345    // ─── Session policy lifecycle ───────────────────────────────────
346    //
347    // Per-session policy scoping. Policies registered against a
348    // session apply to proposals executed under that session id (via
349    // [`Self::execute_with_session`] / [`Self::execute_with_session_and_cancel`]).
350    // Policies registered globally always apply, on top of any
351    // session-scoped layer. See `docs/proposals/per-session-policy-scoping.md`.
352
353    /// Mint a new session id and pre-register an empty policy engine
354    /// under it. Hosts call this once per concurrent agent context
355    /// (an IDE project window, a multi-tenant client, etc.) and pair
356    /// it with [`Self::close_session`] when the context ends.
357    ///
358    /// Returns the opaque id to pass to subsequent
359    /// [`Self::register_policy_in_session`] / [`Self::execute_with_session`]
360    /// calls. Ids are UUIDs so collisions across concurrent calls
361    /// don't matter.
362    pub async fn open_session(&self) -> String {
363        let id = Uuid::new_v4().to_string();
364        let mut sessions = self.session_policies.write().await;
365        sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
366        id
367    }
368
369    /// Drop the session and every policy scoped to it. Returns true
370    /// if a session by that id existed; false if it didn't (already
371    /// closed, never opened, etc.). Idempotent in effect — closing a
372    /// missing session is a no-op the caller is free to ignore.
373    pub async fn close_session(&self, session_id: &str) -> bool {
374        let mut sessions = self.session_policies.write().await;
375        sessions.remove(session_id).is_some()
376    }
377
378    /// Register a policy under a specific session id. The policy
379    /// applies only when a proposal is executed under that session;
380    /// proposals executed without a session (the default) only see
381    /// global policies.
382    ///
383    /// Returns `Err(...)` if the session is unknown — callers either
384    /// forgot to call [`Self::open_session`] or are using a
385    /// stale/closed id.
386    pub async fn register_policy_in_session(
387        &self,
388        session_id: &str,
389        name: &str,
390        check: car_policy::PolicyCheck,
391        description: &str,
392    ) -> Result<(), String> {
393        let engine = {
394            let sessions = self.session_policies.read().await;
395            sessions
396                .get(session_id)
397                .cloned()
398                .ok_or_else(|| format!("unknown session id '{session_id}'"))?
399        };
400        let mut engine = engine.write().await;
401        engine.register(name, check, description);
402        Ok(())
403    }
404
405    /// True if a session with this id is currently open. Mostly for
406    /// tests and FFI surface validation — production code should
407    /// trust the id it just opened.
408    pub async fn session_exists(&self, session_id: &str) -> bool {
409        self.session_policies.read().await.contains_key(session_id)
410    }
411
412    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
413    /// as built-in tools with real implementations.
414    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
415        self.inference_engine = Some(engine);
416        // Register inference tool schemas (non-async init, use try_lock)
417        if let Ok(mut tools) = self.tools.try_write() {
418            for schema in car_inference::service::all_schemas() {
419                tools.insert(schema.name.clone(), schema);
420            }
421        }
422        self
423    }
424
425    /// Attach a memgine for automatic skill learning after execution.
426    /// When `auto_distill` is true, execution traces are automatically distilled
427    /// into skills and domains are evolved when underperforming.
428    pub fn with_learning(
429        mut self,
430        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
431        auto_distill: bool,
432    ) -> Self {
433        self.memgine = Some(memgine);
434        self.auto_distill = auto_distill;
435        self
436    }
437
438    /// Attach a memgine with auto-distillation enabled (recommended default).
439    pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
440        self.with_learning(memgine, true)
441    }
442
443    /// Attach a trajectory store for persisting execution traces.
444    pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
445        self.trajectory_store = Some(store);
446        self
447    }
448
449    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
450        // Use try_lock for non-async init context. Safe because we just created the mutex.
451        if let Ok(mut guard) = self.tool_executor.try_lock() {
452            *guard = Some(executor);
453        }
454        self
455    }
456
457    /// Set a tool executor for the next execute() call.
458    /// Used by NAPI bindings where executor varies per call.
459    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
460        *self.tool_executor.lock().await = Some(executor);
461    }
462
463    pub fn with_event_log(mut self, log: EventLog) -> Self {
464        self.log = Arc::new(TokioMutex::new(log));
465        self
466    }
467
468    /// Attach a replan callback for failure recovery (builder).
469    pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
470        if let Ok(mut guard) = self.replan_callback.try_lock() {
471            *guard = Some(callback);
472        }
473        if let Ok(mut guard) = self.replan_config.try_write() {
474            *guard = config;
475        }
476        self
477    }
478
479    /// Set a replan callback at runtime.
480    pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
481        *self.replan_callback.lock().await = Some(callback);
482    }
483
484    /// Set replan configuration at runtime.
485    pub async fn set_replan_config(&self, config: ReplanConfig) {
486        *self.replan_config.write().await = config;
487    }
488
489    /// Register a tool with just a name (backward compatible).
490    pub async fn register_tool(&self, name: &str) {
491        let schema = ToolSchema {
492            name: name.to_string(),
493            description: String::new(),
494            parameters: serde_json::Value::Object(Default::default()),
495            returns: None,
496            idempotent: false,
497            cache_ttl_secs: None,
498            rate_limit: None,
499        };
500        self.register_tool_schema(schema).await;
501    }
502
503    /// Register a tool with full schema.
504    pub async fn register_tool_schema(&self, schema: ToolSchema) {
505        // Auto-configure cache if schema specifies it
506        if let Some(ttl) = schema.cache_ttl_secs {
507            self.result_cache.enable_caching(&schema.name, ttl).await;
508        }
509        // Auto-configure rate limit if schema specifies it
510        if let Some(ref rl) = schema.rate_limit {
511            self.rate_limiter
512                .set_limit(
513                    &schema.name,
514                    RateLimit {
515                        max_calls: rl.max_calls,
516                        interval_secs: rl.interval_secs,
517                    },
518                )
519                .await;
520        }
521        self.tools.write().await.insert(schema.name.clone(), schema);
522    }
523
524    /// Register a tool via the canonical registry.
525    /// This is the preferred way to register tools — it updates both the
526    /// registry and the legacy tools HashMap for backward compatibility.
527    pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
528        let schema = entry.schema.clone();
529        self.registry.register(entry).await;
530        self.register_tool_schema(schema).await;
531    }
532
533    /// Register CAR's built-in agent utility stdlib.
534    ///
535    /// This is an opt-in convenience layer for common local-file and text tools.
536    /// Existing runtimes remain unchanged until this is called.
537    pub async fn register_agent_basics(&self) {
538        for entry in crate::agent_basics::entries() {
539            self.register_tool_entry(entry).await;
540        }
541    }
542
543    /// Get all registered tool schemas (for model prompt generation).
544    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
545        self.tools.read().await.values().cloned().collect()
546    }
547
548    /// Set a cost budget that limits proposal execution.
549    pub async fn set_cost_budget(&self, budget: CostBudget) {
550        *self.cost_budget.write().await = Some(budget);
551    }
552
553    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
554    pub async fn set_capabilities(&self, caps: CapabilitySet) {
555        *self.capabilities.write().await = Some(caps);
556    }
557
558    /// Set a per-tool rate limit (token bucket).
559    ///
560    /// `max_calls` tokens are available per `interval_secs` window.
561    /// When the bucket is empty, `dispatch()` applies backpressure by
562    /// waiting until a token refills.
563    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
564        self.rate_limiter
565            .set_limit(
566                tool,
567                RateLimit {
568                    max_calls,
569                    interval_secs,
570                },
571            )
572            .await;
573    }
574
575    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
576    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
577        self.result_cache.enable_caching(tool, ttl_secs).await;
578    }
579
580    /// Execute a proposal with automatic replanning on failure.
581    ///
582    /// If a `ReplanCallback` is registered and `max_replans > 0`, the runtime
583    /// will catch abort failures, roll back state, ask the model for an
584    /// alternative proposal via the callback, and re-execute. This transforms
585    /// "execute-and-hope" into "execute-and-recover."
586    ///
587    /// If no callback is registered or `max_replans == 0`, behaves identically
588    /// to a single `execute_inner()` call (zero overhead, fully backward compatible).
589    #[instrument(
590        name = "proposal.execute",
591        skip_all,
592        fields(
593            proposal_id = %proposal.id,
594            action_count = proposal.actions.len(),
595        )
596    )]
597    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
598        // Forward to the cancel-aware variant with a never-cancelled
599        // token. Existing callers see no behaviour change.
600        let token = tokio_util::sync::CancellationToken::new();
601        self.execute_with_cancel(proposal, &token).await
602    }
603
604    /// Execute a proposal scoped to a specific session id.
605    ///
606    /// Validation walks the global policy registry plus the session's
607    /// own registry — both must pass for an action to run. Session
608    /// policies can deny what global allows; they cannot allow what
609    /// global denies (validation is conjunctive).
610    ///
611    /// Returns the same [`ProposalResult`] shape as [`Self::execute`].
612    /// Errors with an action-level rejection if the session id is
613    /// unknown — callers should check via [`Self::session_exists`] or
614    /// trust an id they minted via [`Self::open_session`].
615    pub async fn execute_with_session(
616        &self,
617        proposal: &ActionProposal,
618        session_id: &str,
619    ) -> ProposalResult {
620        let token = tokio_util::sync::CancellationToken::new();
621        self.execute_with_session_and_cancel(proposal, session_id, &token)
622            .await
623    }
624
625    /// Combined session-scoped + cancellable execute. The session id
626    /// is passed verbatim to the per-action policy check; the cancel
627    /// token behaves identically to [`Self::execute_with_cancel`].
628    pub async fn execute_with_session_and_cancel(
629        &self,
630        proposal: &ActionProposal,
631        session_id: &str,
632        cancel: &tokio_util::sync::CancellationToken,
633    ) -> ProposalResult {
634        self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
635            .await
636    }
637
638    /// Execute a proposal with cooperative cancellation.
639    ///
640    /// The runtime checks `token.is_cancelled()` at each DAG level
641    /// boundary. When set, every action that hadn't yet started runs
642    /// is reported as `Skipped` with `error = "canceled: ..."` so
643    /// callers can distinguish "user pulled the plug" from "earlier
644    /// abort cascaded." Actions already in flight continue to
645    /// completion — tool calls dispatched to user-provided executors
646    /// can't be safely interrupted from the engine.
647    ///
648    /// The CAR A2A bridge uses this so `tasks/cancel` produces a
649    /// `ProposalResult` with clean partial state rather than relying
650    /// on `JoinHandle::abort` to interrupt mid-await (which leaves
651    /// no record of which actions actually ran).
652    ///
653    /// **FFI exposure:** this method is intentionally not surfaced
654    /// through the NAPI / PyO3 / `car-server-core` JSON-RPC bindings.
655    /// Those consumers (Node, Python, WebSocket) don't currently
656    /// expose long-running async-task surfaces that need
657    /// cancellation; the bridge is the lone consumer. When a binding
658    /// gains a long-running task surface, the path is clear: add a
659    /// per-binding token registry keyed by some caller-provided id,
660    /// expose `cancelExecution(id)` / `cancel_execution(id)` /
661    /// `proposal.cancel { id }`, and have the runtime call
662    /// `execute_with_cancel` with the matching token. Skipping that
663    /// today avoids speculative API surface that bloats bindings
664    /// without a consumer.
665    pub async fn execute_with_cancel(
666        &self,
667        proposal: &ActionProposal,
668        cancel: &tokio_util::sync::CancellationToken,
669    ) -> ProposalResult {
670        self.execute_with_optional_session(proposal, None, None, cancel)
671            .await
672    }
673
674    /// Execute a proposal with an attached [`RuntimeScope`]
675    /// (Parslee-ai/car#187 phase 3).
676    ///
677    /// Same contract as [`Self::execute`] plus a per-execution
678    /// identity surface — typically built by the car-a2a dispatcher
679    /// from the verified `Identity` and cooperative `a2a_caller`
680    /// metadata on the inbound `ActionProposal`. The scope is
681    /// recorded on the event log so downstream audit / log analysis
682    /// can see which caller / tenant issued each action.
683    ///
684    /// **What this enforces today**: scope is captured + logged.
685    /// Memgine queries and state-store ops still hit global
686    /// namespaces — those follow-ups are tracked under #187.
687    /// Tool / policy code that needs per-tenant behaviour right now
688    /// should keep reading `proposal.context["a2a_caller_verified"]`
689    /// directly (the phase 1 / 2 surface).
690    pub async fn execute_scoped(
691        &self,
692        proposal: &ActionProposal,
693        scope: &crate::scope::RuntimeScope,
694    ) -> ProposalResult {
695        let token = tokio_util::sync::CancellationToken::new();
696        self.execute_scoped_with_cancel(proposal, scope, &token)
697            .await
698    }
699
700    /// Combined scoped + cancellable execute. Mirrors the shape of
701    /// [`Self::execute_with_session_and_cancel`] for symmetry — both
702    /// add a side-channel (session id / scope) on top of the
703    /// cancellable form.
704    pub async fn execute_scoped_with_cancel(
705        &self,
706        proposal: &ActionProposal,
707        scope: &crate::scope::RuntimeScope,
708        cancel: &tokio_util::sync::CancellationToken,
709    ) -> ProposalResult {
710        self.execute_with_optional_session(proposal, None, Some(scope), cancel)
711            .await
712    }
713
714    /// Internal entry point that backs both
715    /// [`Self::execute_with_cancel`] (no session) and
716    /// [`Self::execute_with_session_and_cancel`]. Holds the replan
717    /// loop and threads `session_id` into the per-action validation
718    /// path so session-scoped policies stack on top of global ones.
719    async fn execute_with_optional_session(
720        &self,
721        proposal: &ActionProposal,
722        session_id: Option<&str>,
723        scope: Option<&crate::scope::RuntimeScope>,
724        cancel: &tokio_util::sync::CancellationToken,
725    ) -> ProposalResult {
726        let config = self.replan_config.read().await.clone();
727        let mut current_proposal = proposal.clone();
728        let mut attempt: u32 = 0;
729
730        // Phase 3 foundation (Parslee-ai/car#187): record the scope
731        // on the event log so audit / log analysis can correlate
732        // actions to the caller / tenant that triggered them. Only
733        // logged when at least one identity field is set — keeps
734        // the existing in-process call sites free of noise.
735        if let Some(s) = scope {
736            if !s.is_unscoped() {
737                let mut props: HashMap<String, Value> = HashMap::new();
738                if let Some(cid) = &s.caller_id {
739                    props.insert("caller_id".to_string(), Value::from(cid.as_str()));
740                }
741                if let Some(tid) = &s.tenant_id {
742                    props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
743                }
744                if !s.claims.is_empty() {
745                    if let Ok(claims_json) = serde_json::to_value(&s.claims) {
746                        props.insert("claims".to_string(), claims_json);
747                    }
748                }
749                let mut log = self.log.lock().await;
750                log.append(EventKind::SessionScope, None, Some(&proposal.id), props);
751            }
752        }
753
754        loop {
755            let (result, state_before_map) = self
756                .execute_inner_with_cancel(&current_proposal, Some(cancel), session_id, scope)
757                .await;
758
759            // Check if we aborted
760            let aborted = result
761                .results
762                .iter()
763                .any(|r| r.status == ActionStatus::Failed);
764            if !aborted || attempt >= config.max_replans {
765                if aborted && attempt > 0 {
766                    // Exhausted all replan attempts
767                    let mut log = self.log.lock().await;
768                    log.append(
769                        EventKind::ReplanExhausted,
770                        None,
771                        Some(&proposal.id),
772                        [("attempts".to_string(), Value::from(attempt))].into(),
773                    );
774                }
775
776                // Persist trajectory
777                let outcome = if !aborted {
778                    if attempt > 0 {
779                        car_memgine::TrajectoryOutcome::ReplanSuccess
780                    } else {
781                        car_memgine::TrajectoryOutcome::Success
782                    }
783                } else if attempt > 0 {
784                    car_memgine::TrajectoryOutcome::ReplanExhausted
785                } else {
786                    car_memgine::TrajectoryOutcome::Failed
787                };
788                if let Some(err) = self.persist_trajectory(
789                    proposal,
790                    &current_proposal,
791                    &result,
792                    outcome,
793                    attempt,
794                    &state_before_map,
795                ) {
796                    let mut log = self.log.lock().await;
797                    log.append(
798                        EventKind::ActionFailed,
799                        None,
800                        Some(&proposal.id),
801                        [(
802                            "trajectory_persist_error".to_string(),
803                            Value::from(err.as_str()),
804                        )]
805                        .into(),
806                    );
807                }
808
809                return result;
810            }
811
812            // Get replan callback (clone Arc, drop lock immediately)
813            let callback = {
814                let guard = self.replan_callback.lock().await;
815                guard.clone()
816            };
817            let Some(callback) = callback else {
818                // No callback registered — persist trajectory and return
819                if let Some(err) = self.persist_trajectory(
820                    proposal,
821                    &current_proposal,
822                    &result,
823                    car_memgine::TrajectoryOutcome::Failed,
824                    attempt,
825                    &state_before_map,
826                ) {
827                    let mut log = self.log.lock().await;
828                    log.append(
829                        EventKind::ActionFailed,
830                        None,
831                        Some(&proposal.id),
832                        [(
833                            "trajectory_persist_error".to_string(),
834                            Value::from(err.as_str()),
835                        )]
836                        .into(),
837                    );
838                }
839                return result;
840            };
841
842            // Build replan context
843            let failed_actions: Vec<FailedActionSummary> = result
844                .results
845                .iter()
846                .filter(|r| r.status == ActionStatus::Failed)
847                .map(|r| {
848                    let action = current_proposal
849                        .actions
850                        .iter()
851                        .find(|a| a.id == r.action_id);
852                    FailedActionSummary {
853                        action_id: r.action_id.clone(),
854                        tool: action.and_then(|a| a.tool.clone()),
855                        error: r.error.clone().unwrap_or_default(),
856                        parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
857                    }
858                })
859                .collect();
860
861            let completed_action_ids: Vec<String> = result
862                .results
863                .iter()
864                .filter(|r| r.status == ActionStatus::Succeeded)
865                .map(|r| r.action_id.clone())
866                .collect();
867
868            let ctx = ReplanContext {
869                proposal_id: proposal.id.clone(),
870                attempt: attempt + 1,
871                failed_actions,
872                completed_action_ids,
873                state_snapshot: self.state.snapshot(),
874                replans_remaining: config.max_replans.saturating_sub(attempt + 1),
875                original_source: proposal.source.clone(),
876                original_action_count: proposal.actions.len(),
877                original_context: proposal.context.clone(),
878            };
879
880            // Backoff delay between replan attempts
881            if config.delay_ms > 0 {
882                tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
883            }
884
885            // Log replan attempt
886            {
887                let mut log = self.log.lock().await;
888                log.append(
889                    EventKind::ReplanAttempted,
890                    None,
891                    Some(&proposal.id),
892                    [
893                        ("attempt".to_string(), Value::from(attempt + 1)),
894                        (
895                            "failed_count".to_string(),
896                            Value::from(ctx.failed_actions.len()),
897                        ),
898                    ]
899                    .into(),
900                );
901            }
902
903            // Call the model for a new plan
904            match callback.replan(&ctx).await {
905                Ok(new_proposal) => {
906                    // Quality gate: verify replan proposal before executing
907                    if config.verify_before_execute {
908                        let current_state = self.state.snapshot();
909                        // Verify against the full registered schemas
910                        // (not just names) so a replan with a bad
911                        // parameter type / missing required field is
912                        // rejected here, per register_tool_schema's
913                        // contract (car-releases#56). The read guard is
914                        // held across the synchronous verify call.
915                        let tools_guard = self.tools.read().await;
916                        let vr = car_verify::verify_with_schemas(
917                            &new_proposal,
918                            Some(&current_state),
919                            Some(&tools_guard),
920                            100,
921                        );
922                        drop(tools_guard);
923                        if !vr.valid {
924                            let error_msgs: Vec<String> = vr
925                                .issues
926                                .iter()
927                                .filter(|i| i.severity == "error")
928                                .map(|i| i.message.clone())
929                                .collect();
930                            let mut log = self.log.lock().await;
931                            log.append(
932                                EventKind::ReplanRejected,
933                                None,
934                                Some(&proposal.id),
935                                [
936                                    ("errors".to_string(), Value::from(error_msgs.join("; "))),
937                                    ("attempt".to_string(), Value::from(attempt + 1)),
938                                ]
939                                .into(),
940                            );
941                            // Don't execute a broken replan — count as failed attempt
942                            attempt += 1;
943                            continue;
944                        }
945                    }
946
947                    // Log accepted proposal
948                    {
949                        let mut log = self.log.lock().await;
950                        log.append(
951                            EventKind::ReplanProposalReceived,
952                            None,
953                            Some(&proposal.id),
954                            [
955                                ("attempt".to_string(), Value::from(attempt + 1)),
956                                (
957                                    "new_action_count".to_string(),
958                                    Value::from(new_proposal.actions.len()),
959                                ),
960                            ]
961                            .into(),
962                        );
963                    }
964                    current_proposal = new_proposal;
965                    attempt += 1;
966                }
967                Err(e) => {
968                    // Replan callback itself failed — log and return original failure
969                    let mut log = self.log.lock().await;
970                    log.append(
971                        EventKind::ReplanExhausted,
972                        None,
973                        Some(&proposal.id),
974                        [
975                            ("reason".to_string(), Value::from("callback_error")),
976                            ("error".to_string(), Value::from(e.as_str())),
977                            ("attempt".to_string(), Value::from(attempt + 1)),
978                        ]
979                        .into(),
980                    );
981                    if let Some(err) = self.persist_trajectory(
982                        proposal,
983                        &current_proposal,
984                        &result,
985                        car_memgine::TrajectoryOutcome::Failed,
986                        attempt,
987                        &state_before_map,
988                    ) {
989                        log.append(
990                            EventKind::ActionFailed,
991                            None,
992                            Some(&proposal.id),
993                            [(
994                                "trajectory_persist_error".to_string(),
995                                Value::from(err.as_str()),
996                            )]
997                            .into(),
998                        );
999                    }
1000                    return result;
1001                }
1002            }
1003        }
1004    }
1005
1006    /// Persist a trajectory to the store if configured.
1007    fn persist_trajectory(
1008        &self,
1009        proposal: &ActionProposal,
1010        current_proposal: &ActionProposal,
1011        result: &ProposalResult,
1012        outcome: car_memgine::TrajectoryOutcome,
1013        attempt: u32,
1014        state_before_map: &HashMap<String, HashMap<String, Value>>,
1015    ) -> Option<String> {
1016        let store = self.trajectory_store.as_ref()?;
1017
1018        let trace_events: Vec<car_memgine::TraceEvent> = result
1019            .results
1020            .iter()
1021            .map(|r| {
1022                let kind = match r.status {
1023                    ActionStatus::Succeeded => "action_succeeded",
1024                    ActionStatus::Failed => "action_failed",
1025                    ActionStatus::Rejected => "action_rejected",
1026                    ActionStatus::Skipped => "action_skipped",
1027                    _ => "unknown",
1028                };
1029                let tool = current_proposal
1030                    .actions
1031                    .iter()
1032                    .find(|a| a.id == r.action_id)
1033                    .and_then(|a| a.tool.clone());
1034                let reward = match r.status {
1035                    ActionStatus::Succeeded => Some(1.0),
1036                    ActionStatus::Failed => Some(0.0),
1037                    ActionStatus::Rejected => Some(0.0),
1038                    ActionStatus::Skipped => None,
1039                    _ => None,
1040                };
1041                car_memgine::TraceEvent {
1042                    kind: kind.to_string(),
1043                    action_id: Some(r.action_id.clone()),
1044                    tool,
1045                    data: r
1046                        .error
1047                        .as_ref()
1048                        .map(|e| serde_json::json!({"error": e}))
1049                        .unwrap_or(serde_json::json!({})),
1050                    duration_ms: r.duration_ms,
1051                    state_before: state_before_map.get(&r.action_id).cloned(),
1052                    state_after: if !r.state_changes.is_empty() {
1053                        Some(r.state_changes.clone())
1054                    } else {
1055                        None
1056                    },
1057                    reward,
1058                }
1059            })
1060            .collect();
1061
1062        let trajectory = car_memgine::Trajectory {
1063            proposal_id: proposal.id.clone(),
1064            source: proposal.source.clone(),
1065            action_count: current_proposal.actions.len(),
1066            events: trace_events,
1067            outcome,
1068            timestamp: chrono::Utc::now(),
1069            duration_ms: result.cost.total_duration_ms,
1070            replan_attempts: attempt,
1071        };
1072
1073        match store.append(&trajectory) {
1074            Ok(()) => None,
1075            Err(e) => Some(e.to_string()),
1076        }
1077    }
1078
1079    /// Score N candidate proposals, execute the best valid one, fall back to
1080    /// next-best on failure. Combines car-planner scoring with engine execution.
1081    ///
1082    /// Returns the result from whichever proposal was executed (best or fallback).
1083    /// If all candidates fail verification, returns an error result for the first.
1084    pub async fn plan_and_execute(
1085        &self,
1086        candidates: &[ActionProposal],
1087        planner_config: Option<car_planner::PlannerConfig>,
1088        feedback: Option<&car_planner::ToolFeedback>,
1089    ) -> ProposalResult {
1090        if candidates.is_empty() {
1091            return ProposalResult {
1092                proposal_id: "empty".to_string(),
1093                results: vec![],
1094                cost: car_ir::CostSummary::default(),
1095            };
1096        }
1097
1098        // Score all candidates
1099        let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1100        let tools_guard = self.tools.read().await;
1101        let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1102        drop(tools_guard);
1103
1104        let pre_plan_snapshot = self.state.snapshot();
1105        let pre_plan_transitions = self.state.transition_count();
1106        let ranked = planner.rank_with_feedback(
1107            candidates,
1108            Some(&pre_plan_snapshot),
1109            Some(&tool_names),
1110            feedback,
1111        );
1112
1113        // Try each valid candidate in score order
1114        let mut first_failure: Option<ProposalResult> = None;
1115        for scored in &ranked {
1116            if !scored.valid {
1117                continue;
1118            }
1119
1120            // Restore clean state before each candidate (don't rely on execute's
1121            // internal rollback — it only fires on Abort, not Skip/Retry failures)
1122            self.state
1123                .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1124
1125            let proposal = &candidates[scored.index];
1126            let result = self.execute(proposal).await;
1127
1128            if result.all_succeeded() {
1129                return result;
1130            }
1131
1132            tracing::info!(
1133                proposal_id = %proposal.id,
1134                score = scored.score,
1135                "plan_and_execute: proposal failed, trying next candidate"
1136            );
1137
1138            if first_failure.is_none() {
1139                first_failure = Some(result);
1140            }
1141        }
1142
1143        // Return the first failure result (don't re-execute — avoids duplicate side effects)
1144        first_failure.unwrap_or_else(|| ProposalResult {
1145            proposal_id: candidates[0].id.clone(),
1146            results: vec![],
1147            cost: car_ir::CostSummary::default(),
1148        })
1149    }
1150
1151    /// Execute a single proposal through the runtime loop (no replanning).
1152    /// Returns (result, state_before_map) where state_before_map has per-action snapshots.
1153    ///
1154    /// `session_id`, when `Some`, scopes per-action policy validation
1155    /// to the named session in addition to global policies. Both
1156    /// layers must pass for an action to run; the session layer cannot
1157    /// loosen what global denies.
1158    async fn execute_inner_with_cancel(
1159        &self,
1160        proposal: &ActionProposal,
1161        cancel: Option<&tokio_util::sync::CancellationToken>,
1162        session_id: Option<&str>,
1163        scope: Option<&crate::scope::RuntimeScope>,
1164    ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1165        // Generate trace_id for this proposal execution
1166        let trace_id = Uuid::new_v4().to_string();
1167
1168        // Begin root span for proposal execution
1169        let root_span_id = {
1170            let mut log = self.log.lock().await;
1171            log.begin_span(
1172                "proposal.execute",
1173                &trace_id,
1174                None,
1175                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1176            )
1177        };
1178
1179        // Log proposal received
1180        {
1181            let mut log = self.log.lock().await;
1182            log.append(
1183                EventKind::ProposalReceived,
1184                None,
1185                Some(&proposal.id),
1186                [
1187                    ("source".to_string(), Value::from(proposal.source.as_str())),
1188                    (
1189                        "action_count".to_string(),
1190                        Value::from(proposal.actions.len()),
1191                    ),
1192                ]
1193                .into(),
1194            );
1195        }
1196
1197        // Capability check: max_actions budget for entire proposal
1198        {
1199            let caps = self.capabilities.read().await;
1200            if let Some(ref cap) = *caps {
1201                if !cap.actions_within_budget(proposal.actions.len() as u32) {
1202                    let mut action_results = Vec::new();
1203                    for action in &proposal.actions {
1204                        action_results.push(rejected_result(
1205                            &action.id,
1206                            format!(
1207                                "capability denied: proposal has {} actions, max allowed is {:?}",
1208                                proposal.actions.len(),
1209                                cap.max_actions
1210                            ),
1211                        ));
1212                    }
1213                    return (
1214                        ProposalResult {
1215                            proposal_id: proposal.id.clone(),
1216                            results: action_results,
1217                            cost: CostSummary::default(),
1218                        },
1219                        HashMap::new(),
1220                    );
1221                }
1222            }
1223        }
1224
1225        // Snapshot for rollback
1226        let snapshot = self.state.snapshot();
1227        let transition_count = self.state.transition_count();
1228
1229        let mut results: Vec<ActionResult> = Vec::new();
1230        // Per-action state snapshots captured before execution (for TraceEvent.state_before).
1231        let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1232        let mut aborted = false;
1233        let mut budget_exceeded = false;
1234        let mut total_retries: u32 = 0;
1235
1236        // Running cost counters for budget enforcement
1237        let mut running_tool_calls: u32 = 0;
1238        let mut running_actions: u32 = 0;
1239        let mut running_duration_ms: f64 = 0.0;
1240
1241        // Snapshot the budget once
1242        let budget = self.cost_budget.read().await.clone();
1243
1244        // Build DAG
1245        let levels = build_dag(&proposal.actions);
1246
1247        let mut canceled = false;
1248        for level in &levels {
1249            // Cooperative cancellation check at the level boundary.
1250            // Actions already in flight aren't interrupted (we can't
1251            // safely cancel a tool call dispatched to a user-provided
1252            // executor), but every action that hadn't started runs
1253            // is recorded as canceled with a clear reason.
1254            if !canceled {
1255                if let Some(token) = cancel {
1256                    if token.is_cancelled() {
1257                        canceled = true;
1258                    }
1259                }
1260            }
1261            if canceled {
1262                for &idx in level {
1263                    results.push(canceled_result(
1264                        &proposal.actions[idx].id,
1265                        "cancellation requested by caller",
1266                    ));
1267                }
1268                continue;
1269            }
1270            if aborted || budget_exceeded {
1271                let skip_reason = if budget_exceeded {
1272                    "cost budget exceeded"
1273                } else {
1274                    "skipped due to earlier abort"
1275                };
1276                for &idx in level {
1277                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1278                }
1279                continue;
1280            }
1281
1282            // Check if any action in this level has ABORT behavior
1283            let has_abort = level
1284                .iter()
1285                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1286
1287            if level.len() == 1 || has_abort {
1288                // Sequential execution
1289                for &idx in level {
1290                    if aborted || budget_exceeded {
1291                        let skip_reason = if budget_exceeded {
1292                            "cost budget exceeded"
1293                        } else {
1294                            "skipped due to abort"
1295                        };
1296                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1297                        continue;
1298                    }
1299
1300                    // Budget check before execution
1301                    if let Some(ref b) = budget {
1302                        if let Some(max) = b.max_actions {
1303                            if running_actions >= max {
1304                                budget_exceeded = true;
1305                                results.push(skipped_result(
1306                                    &proposal.actions[idx].id,
1307                                    "cost budget exceeded",
1308                                ));
1309                                continue;
1310                            }
1311                        }
1312                        if let Some(max) = b.max_tool_calls {
1313                            if proposal.actions[idx].action_type == ActionType::ToolCall
1314                                && running_tool_calls >= max
1315                            {
1316                                budget_exceeded = true;
1317                                results.push(skipped_result(
1318                                    &proposal.actions[idx].id,
1319                                    "cost budget exceeded",
1320                                ));
1321                                continue;
1322                            }
1323                        }
1324                        if let Some(max) = b.max_duration_ms {
1325                            if running_duration_ms >= max {
1326                                budget_exceeded = true;
1327                                results.push(skipped_result(
1328                                    &proposal.actions[idx].id,
1329                                    "cost budget exceeded",
1330                                ));
1331                                continue;
1332                            }
1333                        }
1334                    }
1335
1336                    state_before_map.insert(
1337                        proposal.actions[idx].id.clone(),
1338                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1339                    );
1340                    let (ar, action_retries) = self
1341                        .process_action(
1342                            &proposal.actions[idx],
1343                            &proposal.id,
1344                            &trace_id,
1345                            &root_span_id,
1346                            session_id,
1347                            scope,
1348                        )
1349                        .await;
1350                    total_retries += action_retries;
1351
1352                    // Update running counters
1353                    if ar.status == ActionStatus::Succeeded
1354                        && proposal.actions[idx].action_type == ActionType::ToolCall
1355                    {
1356                        running_tool_calls += 1;
1357                    }
1358                    if ar.status != ActionStatus::Skipped {
1359                        running_actions += 1;
1360                    }
1361                    if let Some(d) = ar.duration_ms {
1362                        running_duration_ms += d;
1363                    }
1364
1365                    if ar.status == ActionStatus::Failed
1366                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1367                    {
1368                        aborted = true;
1369                    }
1370                    results.push(ar);
1371                }
1372            } else {
1373                // Concurrent execution via futures::join_all
1374                // Snapshot only relevant keys per action (all see same pre-level state)
1375                for &idx in level {
1376                    state_before_map.insert(
1377                        proposal.actions[idx].id.clone(),
1378                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1379                    );
1380                }
1381                let futs: Vec<_> = level
1382                    .iter()
1383                    .map(|&idx| {
1384                        self.process_action(
1385                            &proposal.actions[idx],
1386                            &proposal.id,
1387                            &trace_id,
1388                            &root_span_id,
1389                            session_id,
1390                            scope,
1391                        )
1392                    })
1393                    .collect();
1394                let level_results = futures::future::join_all(futs).await;
1395
1396                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1397                    let idx = level[i];
1398                    total_retries += action_retries;
1399                    if ar.status == ActionStatus::Succeeded
1400                        && proposal.actions[idx].action_type == ActionType::ToolCall
1401                    {
1402                        running_tool_calls += 1;
1403                    }
1404                    if ar.status != ActionStatus::Skipped {
1405                        running_actions += 1;
1406                    }
1407                    if let Some(d) = ar.duration_ms {
1408                        running_duration_ms += d;
1409                    }
1410                    results.push(ar);
1411                }
1412            }
1413        }
1414
1415        // Handle rollback
1416        if aborted {
1417            self.state.restore(snapshot.clone(), transition_count);
1418
1419            let mut log = self.log.lock().await;
1420            log.append(
1421                EventKind::StateSnapshot,
1422                None,
1423                Some(&proposal.id),
1424                [(
1425                    "state".to_string(),
1426                    serde_json::to_value(&snapshot).unwrap_or_default(),
1427                )]
1428                .into(),
1429            );
1430            log.append(
1431                EventKind::StateRollback,
1432                None,
1433                Some(&proposal.id),
1434                [(
1435                    "rolled_back_to".to_string(),
1436                    Value::from("pre-proposal snapshot"),
1437                )]
1438                .into(),
1439            );
1440
1441            // Clear idempotency cache for rolled-back actions
1442            let mut cache = self.idempotency_cache.lock().await;
1443            for r in &results {
1444                if r.status == ActionStatus::Succeeded {
1445                    for action in &proposal.actions {
1446                        if action.id == r.action_id && action.idempotent {
1447                            cache.remove(&idempotency_key(action));
1448                        }
1449                    }
1450                }
1451            }
1452        }
1453
1454        // Sort results to match original action order
1455        let action_order: HashMap<String, usize> = proposal
1456            .actions
1457            .iter()
1458            .enumerate()
1459            .map(|(i, a)| (a.id.clone(), i))
1460            .collect();
1461        results.sort_by_key(|r| {
1462            action_order
1463                .get(&r.action_id)
1464                .copied()
1465                .unwrap_or(usize::MAX)
1466        });
1467
1468        // Compute cost summary from results
1469        let mut cost = CostSummary::default();
1470        for r in &results {
1471            let action = action_order
1472                .get(&r.action_id)
1473                .and_then(|&i| proposal.actions.get(i));
1474            match r.status {
1475                ActionStatus::Succeeded => {
1476                    cost.actions_executed += 1;
1477                    if let Some(a) = action {
1478                        if a.action_type == ActionType::ToolCall {
1479                            cost.tool_calls += 1;
1480                        }
1481                    }
1482                }
1483                ActionStatus::Failed | ActionStatus::Rejected => {
1484                    cost.actions_executed += 1;
1485                }
1486                ActionStatus::Skipped => {
1487                    cost.actions_skipped += 1;
1488                }
1489                _ => {}
1490            }
1491            if let Some(d) = r.duration_ms {
1492                cost.total_duration_ms += d;
1493            }
1494        }
1495
1496        // Set retries from inline counter
1497        cost.retries = total_retries;
1498
1499        // End root span — Ok if no abort, Error if aborted
1500        {
1501            let span_status = if aborted {
1502                SpanStatus::Error
1503            } else {
1504                SpanStatus::Ok
1505            };
1506            let mut log = self.log.lock().await;
1507            log.end_span(&root_span_id, span_status);
1508        }
1509
1510        let proposal_result = ProposalResult {
1511            proposal_id: proposal.id.clone(),
1512            results,
1513            cost,
1514        };
1515
1516        // Post-execution: auto-distill skills from this execution trace
1517        if self.auto_distill {
1518            if let Some(ref memgine) = self.memgine {
1519                // Convert results to TraceEvents for distillation
1520                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1521                    .results
1522                    .iter()
1523                    .map(|r| {
1524                        let kind = match r.status {
1525                            ActionStatus::Succeeded => "action_succeeded",
1526                            ActionStatus::Failed => "action_failed",
1527                            ActionStatus::Rejected => "action_rejected",
1528                            ActionStatus::Skipped => "action_skipped",
1529                            _ => "unknown",
1530                        };
1531                        // Find the matching action to get the tool name
1532                        let tool = proposal
1533                            .actions
1534                            .iter()
1535                            .find(|a| a.id == r.action_id)
1536                            .and_then(|a| a.tool.clone());
1537                        let mut data = serde_json::Map::new();
1538                        if let Some(ref e) = r.error {
1539                            data.insert("error".into(), Value::from(e.as_str()));
1540                        }
1541                        if let Some(ref o) = r.output {
1542                            data.insert("output".into(), o.clone());
1543                        }
1544                        car_memgine::TraceEvent {
1545                            kind: kind.to_string(),
1546                            action_id: Some(r.action_id.clone()),
1547                            tool,
1548                            data: Value::Object(data),
1549                            duration_ms: r.duration_ms,
1550                            reward: match r.status {
1551                                ActionStatus::Succeeded => Some(1.0),
1552                                ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1553                                _ => None,
1554                            },
1555                            ..Default::default()
1556                        }
1557                    })
1558                    .collect();
1559
1560                let mut engine = memgine.lock().await;
1561                let skills = engine.distill_skills(&trace_events).await;
1562                if !skills.is_empty() {
1563                    let count = skills.len();
1564                    // Tenant-aware ingest (Parslee-ai/car#187 phase 3-D).
1565                    // When the proposal carried a tenant, distilled
1566                    // skills get stamped so the same tenant's next
1567                    // build_context can find them. Unscoped proposals
1568                    // continue to land in the global skill pool.
1569                    let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1570                    engine.scoped(tenant).ingest_distilled_skills(&skills);
1571
1572                    // Log the distillation event
1573                    let mut log = self.log.lock().await;
1574                    log.append(
1575                        EventKind::SkillDistilled,
1576                        None,
1577                        Some(&proposal_result.proposal_id),
1578                        [
1579                            ("skills_count".to_string(), Value::from(count)),
1580                            (
1581                                "skill_names".to_string(),
1582                                Value::from(
1583                                    skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1584                                ),
1585                            ),
1586                        ]
1587                        .into(),
1588                    );
1589
1590                    // Check if any domains need evolution
1591                    let threshold = engine.evolution_threshold();
1592                    let domains = engine.domains_needing_evolution(threshold);
1593                    for domain in &domains {
1594                        // Collect failed events for this domain
1595                        let failed: Vec<car_memgine::TraceEvent> = trace_events
1596                            .iter()
1597                            .filter(|e| {
1598                                matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1599                            })
1600                            .cloned()
1601                            .collect();
1602                        if !failed.is_empty() {
1603                            let evolved = engine.evolve_skills(&failed, domain).await;
1604                            if !evolved.is_empty() {
1605                                log.append(
1606                                    EventKind::EvolutionTriggered,
1607                                    None,
1608                                    Some(&proposal_result.proposal_id),
1609                                    [
1610                                        ("domain".to_string(), Value::from(domain.as_str())),
1611                                        ("new_skills".to_string(), Value::from(evolved.len())),
1612                                    ]
1613                                    .into(),
1614                                );
1615                            }
1616                        }
1617                    }
1618                }
1619            }
1620        }
1621
1622        (proposal_result, state_before_map)
1623    }
1624
1625    /// Process a single action: idempotency → validate → policy → execute.
1626    /// Returns (ActionResult, retries_count).
1627    async fn process_action(
1628        &self,
1629        action: &Action,
1630        proposal_id: &str,
1631        trace_id: &str,
1632        parent_span_id: &str,
1633        session_id: Option<&str>,
1634        scope: Option<&crate::scope::RuntimeScope>,
1635    ) -> (ActionResult, u32) {
1636        // Derive action type name for span naming
1637        let action_type_name = serde_json::to_string(&action.action_type)
1638            .unwrap_or_default()
1639            .trim_matches('"')
1640            .to_string();
1641        let span_name = format!("action.{}", action_type_name);
1642
1643        // Begin child span for this action
1644        let action_span_id = {
1645            let mut attrs: HashMap<String, Value> = HashMap::new();
1646            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1647            if let Some(ref tool) = action.tool {
1648                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1649            }
1650            let mut log = self.log.lock().await;
1651            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1652        };
1653
1654        // Execute the action pipeline and capture result
1655        let (result, retries) = self
1656            .process_action_inner(action, proposal_id, session_id, scope)
1657            .await;
1658
1659        // End action span based on result status
1660        let span_status = match result.status {
1661            ActionStatus::Succeeded => SpanStatus::Ok,
1662            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1663            _ => SpanStatus::Unset,
1664        };
1665        {
1666            let mut log = self.log.lock().await;
1667            log.end_span(&action_span_id, span_status);
1668        }
1669
1670        (result, retries)
1671    }
1672
1673    /// Inner action processing: idempotency -> validate -> policy -> execute.
1674    /// Returns (ActionResult, retries_count).
1675    #[instrument(
1676        name = "action.process",
1677        skip_all,
1678        fields(
1679            action_id = %action.id,
1680            action_type = ?action.action_type,
1681            tool = action.tool.as_deref().unwrap_or("none"),
1682        )
1683    )]
1684    async fn process_action_inner(
1685        &self,
1686        action: &Action,
1687        proposal_id: &str,
1688        session_id: Option<&str>,
1689        scope: Option<&crate::scope::RuntimeScope>,
1690    ) -> (ActionResult, u32) {
1691        // Idempotency check
1692        if action.idempotent {
1693            let key = idempotency_key(action);
1694            let cache = self.idempotency_cache.lock().await;
1695            if let Some(cached) = cache.get(&key) {
1696                let mut log = self.log.lock().await;
1697                log.append(
1698                    EventKind::ActionDeduplicated,
1699                    Some(&action.id),
1700                    Some(proposal_id),
1701                    [(
1702                        "cached_action_id".to_string(),
1703                        Value::from(cached.action_id.as_str()),
1704                    )]
1705                    .into(),
1706                );
1707                return (
1708                    ActionResult {
1709                        action_id: action.id.clone(),
1710                        status: cached.status.clone(),
1711                        output: cached.output.clone(),
1712                        error: cached.error.clone(),
1713                        state_changes: cached.state_changes.clone(),
1714                        duration_ms: Some(0.0),
1715                        timestamp: chrono::Utc::now(),
1716                    },
1717                    0,
1718                );
1719            }
1720        }
1721
1722        // Capability check
1723        {
1724            let caps = self.capabilities.read().await;
1725            if let Some(ref cap) = *caps {
1726                // Check tool capability for ToolCall actions
1727                if action.action_type == ActionType::ToolCall {
1728                    if let Some(ref tool_name) = action.tool {
1729                        if !cap.tool_allowed(tool_name) {
1730                            let mut log = self.log.lock().await;
1731                            log.append(
1732                                EventKind::ActionRejected,
1733                                Some(&action.id),
1734                                Some(proposal_id),
1735                                HashMap::new(),
1736                            );
1737                            return (
1738                                rejected_result(
1739                                    &action.id,
1740                                    format!("capability denied: tool '{}' not allowed", tool_name),
1741                                ),
1742                                0,
1743                            );
1744                        }
1745                    }
1746                }
1747
1748                // Check state key capability for StateWrite/StateRead actions
1749                if action.action_type == ActionType::StateWrite
1750                    || action.action_type == ActionType::StateRead
1751                {
1752                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1753                        if !cap.state_key_allowed(key) {
1754                            let mut log = self.log.lock().await;
1755                            log.append(
1756                                EventKind::ActionRejected,
1757                                Some(&action.id),
1758                                Some(proposal_id),
1759                                HashMap::new(),
1760                            );
1761                            return (
1762                                rejected_result(
1763                                    &action.id,
1764                                    format!("capability denied: state key '{}' not allowed", key),
1765                                ),
1766                                0,
1767                            );
1768                        }
1769                    }
1770                }
1771            }
1772        }
1773
1774        // Validate
1775        let tools = self.tools.read().await;
1776        let validation = validate_action(action, &self.state, &tools);
1777        drop(tools);
1778
1779        if !validation.valid() {
1780            let error = validation
1781                .errors
1782                .iter()
1783                .map(|e| e.reason.as_str())
1784                .collect::<Vec<_>>()
1785                .join("; ");
1786            let mut log = self.log.lock().await;
1787            log.append(
1788                EventKind::ActionRejected,
1789                Some(&action.id),
1790                Some(proposal_id),
1791                HashMap::new(),
1792            );
1793            return (rejected_result(&action.id, error), 0);
1794        }
1795
1796        // Policy check — global registry plus, when the proposal is
1797        // executed under a session, that session's registry. Both
1798        // layers must pass for the action to proceed; session
1799        // policies are additive deny rules — they can deny what
1800        // global allows but cannot allow what global denies.
1801        {
1802            let mut violations = {
1803                let policies = self.policies.read().await;
1804                policies.check(action, &self.state)
1805            };
1806            if let Some(sid) = session_id {
1807                // Snapshot the per-session engine handle out from under
1808                // the outer registry lock so the inner check holds
1809                // only the engine's own RwLock — preserves the
1810                // documented lock-ordering discipline.
1811                let session_engine = {
1812                    let sessions = self.session_policies.read().await;
1813                    sessions.get(sid).cloned()
1814                };
1815                if let Some(engine) = session_engine {
1816                    let engine = engine.read().await;
1817                    violations.extend(engine.check(action, &self.state));
1818                } else {
1819                    // Unknown session id — refuse the action rather
1820                    // than silently fall back to global-only. A
1821                    // proposal submitted under a closed session
1822                    // shouldn't run with looser rules than the caller
1823                    // intended.
1824                    let mut log = self.log.lock().await;
1825                    log.append(
1826                        EventKind::PolicyViolation,
1827                        Some(&action.id),
1828                        Some(proposal_id),
1829                        HashMap::new(),
1830                    );
1831                    return (
1832                        rejected_result(
1833                            &action.id,
1834                            format!(
1835                                "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1836                            ),
1837                        ),
1838                        0,
1839                    );
1840                }
1841            }
1842            if !violations.is_empty() {
1843                let error = violations
1844                    .iter()
1845                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1846                    .collect::<Vec<_>>()
1847                    .join("; ");
1848                let mut log = self.log.lock().await;
1849                log.append(
1850                    EventKind::PolicyViolation,
1851                    Some(&action.id),
1852                    Some(proposal_id),
1853                    HashMap::new(),
1854                );
1855                return (rejected_result(&action.id, error), 0);
1856            }
1857        }
1858
1859        // Validated
1860        {
1861            let mut log = self.log.lock().await;
1862            log.append(
1863                EventKind::ActionValidated,
1864                Some(&action.id),
1865                Some(proposal_id),
1866                HashMap::new(),
1867            );
1868        }
1869
1870        // Execute with retry
1871        let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1872
1873        // Cache idempotent results
1874        if action.idempotent && result.status == ActionStatus::Succeeded {
1875            let mut cache = self.idempotency_cache.lock().await;
1876            cache.insert(idempotency_key(action), result.clone());
1877        }
1878
1879        tracing::info!(
1880            status = ?result.status,
1881            duration_ms = result.duration_ms,
1882            "action completed"
1883        );
1884
1885        (result, retries)
1886    }
1887
1888    /// Execute with retry logic and timeout.
1889    /// Returns (ActionResult, retries_count).
1890    async fn execute_with_retry(
1891        &self,
1892        action: &Action,
1893        proposal_id: &str,
1894        scope: Option<&crate::scope::RuntimeScope>,
1895    ) -> (ActionResult, u32) {
1896        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1897            action.max_retries + 1
1898        } else {
1899            1
1900        };
1901
1902        let mut last_error: Option<String> = None;
1903        let mut retries: u32 = 0;
1904
1905        for attempt in 0..max_attempts {
1906            if attempt > 0 {
1907                retries += 1;
1908                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1909                tokio::time::sleep(Duration::from_millis(delay)).await;
1910                let mut log = self.log.lock().await;
1911                log.append(
1912                    EventKind::ActionRetrying,
1913                    Some(&action.id),
1914                    Some(proposal_id),
1915                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
1916                );
1917            }
1918
1919            {
1920                let mut log = self.log.lock().await;
1921                log.append(
1922                    EventKind::ActionExecuting,
1923                    Some(&action.id),
1924                    Some(proposal_id),
1925                    HashMap::new(),
1926                );
1927            }
1928
1929            let start = std::time::Instant::now();
1930            let transitions_before = self.state.transition_count();
1931
1932            // Execute with optional timeout
1933            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1934                match timeout(
1935                    Duration::from_millis(timeout_ms),
1936                    self.dispatch(action, scope),
1937                )
1938                .await
1939                {
1940                    Ok(r) => r,
1941                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1942                }
1943            } else {
1944                self.dispatch(action, scope).await
1945            };
1946
1947            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1948
1949            match exec_result {
1950                Ok(output) => {
1951                    // Commit post-effects. Tenant-scoped when the
1952                    // proposal carries a scope (Parslee-ai/car#187
1953                    // phase 3) — distinct tenants write to disjoint
1954                    // namespaces so concurrent multi-tenant proposals
1955                    // don't trample each other's keys.
1956                    let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1957                    let scoped_state = self.state.scoped(tenant_for_effects);
1958                    let mut state_changes: HashMap<String, Value> = HashMap::new();
1959                    for (key, value) in &action.expected_effects {
1960                        scoped_state.set(key, value.clone(), &action.id);
1961                        state_changes.insert(key.clone(), value.clone());
1962                    }
1963
1964                    // Capture state changes from dispatch
1965                    for t in self.state.transitions_since(transitions_before) {
1966                        if !state_changes.contains_key(&t.key) {
1967                            if let Some(v) = t.new_value {
1968                                state_changes.insert(t.key.clone(), v);
1969                            }
1970                        }
1971                    }
1972
1973                    let mut log = self.log.lock().await;
1974                    log.append(
1975                        EventKind::ActionSucceeded,
1976                        Some(&action.id),
1977                        Some(proposal_id),
1978                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1979                    );
1980
1981                    if !state_changes.is_empty() {
1982                        log.append(
1983                            EventKind::StateChanged,
1984                            Some(&action.id),
1985                            Some(proposal_id),
1986                            [(
1987                                "changes".to_string(),
1988                                serde_json::to_value(&state_changes).unwrap_or_default(),
1989                            )]
1990                            .into(),
1991                        );
1992                    }
1993
1994                    return (
1995                        ActionResult {
1996                            action_id: action.id.clone(),
1997                            status: ActionStatus::Succeeded,
1998                            output: Some(output),
1999                            error: None,
2000                            state_changes,
2001                            duration_ms: Some(duration_ms),
2002                            timestamp: chrono::Utc::now(),
2003                        },
2004                        retries,
2005                    );
2006                }
2007                Err(e) => {
2008                    last_error = Some(e.clone());
2009                    let mut log = self.log.lock().await;
2010                    log.append(
2011                        EventKind::ActionFailed,
2012                        Some(&action.id),
2013                        Some(proposal_id),
2014                        [
2015                            ("error".to_string(), Value::from(e.as_str())),
2016                            ("attempt".to_string(), Value::from(attempt + 1)),
2017                        ]
2018                        .into(),
2019                    );
2020                }
2021            }
2022        }
2023
2024        // All attempts exhausted
2025        if action.failure_behavior == FailureBehavior::Skip {
2026            return (
2027                skipped_result(
2028                    &action.id,
2029                    last_error.as_deref().unwrap_or("all attempts exhausted"),
2030                ),
2031                retries,
2032            );
2033        }
2034
2035        (
2036            ActionResult {
2037                action_id: action.id.clone(),
2038                status: ActionStatus::Failed,
2039                output: None,
2040                error: last_error,
2041                state_changes: HashMap::new(),
2042                duration_ms: None,
2043                timestamp: chrono::Utc::now(),
2044            },
2045            retries,
2046        )
2047    }
2048
2049    /// Dispatch an action to the appropriate handler.
2050    ///
2051    /// `scope` is the per-execution caller / tenant surface
2052    /// (Parslee-ai/car#187 phase 3). When the scope carries a
2053    /// tenant id, state R/W operations (`StateWrite`, `StateRead`,
2054    /// `Assertion`) route through `StateStore::scoped(tenant_id)`
2055    /// so distinct tenants can't see each other's keys. Unscoped
2056    /// proposals get the legacy flat-namespace behaviour
2057    /// automatically.
2058    async fn dispatch(
2059        &self,
2060        action: &Action,
2061        scope: Option<&crate::scope::RuntimeScope>,
2062    ) -> Result<Value, String> {
2063        match action.action_type {
2064            ActionType::ToolCall => {
2065                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2066                let params = Value::Object(
2067                    action
2068                        .parameters
2069                        .iter()
2070                        .map(|(k, v)| (k.clone(), v.clone()))
2071                        .collect(),
2072                );
2073
2074                // Check cross-proposal result cache.
2075                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
2076                    return Ok(cached);
2077                }
2078
2079                // Apply rate limit backpressure before executing.
2080                self.rate_limiter.acquire(tool_name).await;
2081
2082                // Try built-in inference tools first when the inference engine is available.
2083                if matches!(
2084                    tool_name,
2085                    "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2086                ) {
2087                    if let Some(ref engine) = self.inference_engine {
2088                        // For "infer.grounded" or "infer" with memgine available,
2089                        // build context from memory and attach it to the request.
2090                        let params = {
2091                            let should_ground =
2092                                tool_name == "infer.grounded" || tool_name == "infer";
2093                            if should_ground {
2094                                if let Some(ref memgine) = self.memgine {
2095                                    if let Some(prompt) =
2096                                        params.get("prompt").and_then(|v| v.as_str())
2097                                    {
2098                                        let ctx = {
2099                                            let mut m = memgine.lock().await;
2100                                            m.build_context(prompt)
2101                                        };
2102                                        if !ctx.is_empty() {
2103                                            let mut p = params.clone();
2104                                            if let Some(obj) = p.as_object_mut() {
2105                                                obj.insert("context".to_string(), Value::from(ctx));
2106                                            }
2107                                            p
2108                                        } else {
2109                                            params
2110                                        }
2111                                    } else {
2112                                        params
2113                                    }
2114                                } else {
2115                                    params
2116                                }
2117                            } else {
2118                                params
2119                            }
2120                        };
2121
2122                        // Route "infer.grounded" to "infer" for the service layer
2123                        let effective_tool = if tool_name == "infer.grounded" {
2124                            "infer"
2125                        } else {
2126                            tool_name
2127                        };
2128                        let result =
2129                            car_inference::service::execute_tool(engine, effective_tool, &params)
2130                                .await
2131                                .map_err(|e| e.to_string());
2132
2133                        if let Ok(ref value) = result {
2134                            self.result_cache
2135                                .put(tool_name, &params, value.clone())
2136                                .await;
2137                        }
2138
2139                        return result;
2140                    }
2141                }
2142
2143                // Built-in memory consolidation tool.
2144                if tool_name == "memory.consolidate" {
2145                    if let Some(ref memgine) = self.memgine {
2146                        let report = {
2147                            let mut m = memgine.lock().await;
2148                            m.consolidate().await
2149                        };
2150                        // Log the consolidation event
2151                        {
2152                            let mut log = self.log.lock().await;
2153                            log.append(
2154                                EventKind::Consolidated,
2155                                None,
2156                                None,
2157                                [
2158                                    (
2159                                        "expired_pruned".to_string(),
2160                                        Value::from(report.expired_pruned),
2161                                    ),
2162                                    (
2163                                        "superseded_gc".to_string(),
2164                                        Value::from(report.superseded_gc),
2165                                    ),
2166                                    (
2167                                        "stale_embeddings_removed".to_string(),
2168                                        Value::from(report.stale_embeddings_removed),
2169                                    ),
2170                                    (
2171                                        "nodes_embedded".to_string(),
2172                                        Value::from(report.nodes_embedded),
2173                                    ),
2174                                    (
2175                                        "domains_evolved".to_string(),
2176                                        Value::from(report.domains_evolved.clone()),
2177                                    ),
2178                                    ("total_nodes".to_string(), Value::from(report.total_nodes)),
2179                                    ("total_edges".to_string(), Value::from(report.total_edges)),
2180                                ]
2181                                .into(),
2182                            );
2183                        }
2184                        return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2185                    } else {
2186                        return Err(
2187                            "memory.consolidate requires memgine (attach with with_learning)"
2188                                .into(),
2189                        );
2190                    }
2191                }
2192
2193                // Prefer a configured tool_executor for any tool it claims to handle.
2194                // Fall through to agent_basics only when the configured executor is absent
2195                // or explicitly returns "unknown tool" — this prevents agent_basics' built-in
2196                // read_file/write_file (which resolve paths via std::env::current_dir) from
2197                // silently overriding an executor that carries its own working_dir.
2198                let configured = {
2199                    let guard = self.tool_executor.lock().await;
2200                    guard.as_ref().cloned()
2201                };
2202
2203                if let Some(ref executor) = configured {
2204                    let result = executor
2205                        .execute_with_action(tool_name, &params, &action.id)
2206                        .await;
2207                    let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2208                    if !fall_through {
2209                        if let Ok(ref value) = result {
2210                            self.result_cache
2211                                .put(tool_name, &params, value.clone())
2212                                .await;
2213                        }
2214                        return result;
2215                    }
2216                }
2217
2218                if let Some(result) = crate::agent_basics::execute(tool_name, &params).await {
2219                    if let Ok(ref value) = result {
2220                        self.result_cache
2221                            .put(tool_name, &params, value.clone())
2222                            .await;
2223                    }
2224                    return result;
2225                }
2226
2227                Err(format!("no handler for tool '{}'", tool_name))
2228            }
2229            ActionType::StateWrite => {
2230                let key = action
2231                    .parameters
2232                    .get("key")
2233                    .and_then(|v| v.as_str())
2234                    .ok_or("state_write requires 'key' parameter")?;
2235                let value = action
2236                    .parameters
2237                    .get("value")
2238                    .cloned()
2239                    .unwrap_or(Value::Null);
2240                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2241                self.state.scoped(tenant).set(key, value, &action.id);
2242                Ok(Value::from(format!("written: {}", key)))
2243            }
2244            ActionType::StateRead => {
2245                let key = action
2246                    .parameters
2247                    .get("key")
2248                    .and_then(|v| v.as_str())
2249                    .ok_or("state_read requires 'key' parameter")?;
2250                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2251                Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2252            }
2253            ActionType::Assertion => {
2254                let key = action
2255                    .parameters
2256                    .get("key")
2257                    .and_then(|v| v.as_str())
2258                    .ok_or("assertion requires 'key' parameter")?;
2259                let expected = action
2260                    .parameters
2261                    .get("expected")
2262                    .cloned()
2263                    .unwrap_or(Value::Null);
2264                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2265                let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2266                if actual != expected {
2267                    Err(format!(
2268                        "assertion failed: state['{}'] = {:?}, expected {:?}",
2269                        key, actual, expected
2270                    ))
2271                } else {
2272                    Ok(serde_json::json!({"asserted": key, "value": actual}))
2273                }
2274            }
2275        }
2276    }
2277
2278    // --- Checkpoint and resume ---
2279
2280    /// Save a checkpoint of the current runtime state.
2281    pub async fn save_checkpoint(&self) -> Checkpoint {
2282        let state = self.state.snapshot();
2283        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2284        let log = self.log.lock().await;
2285        let events: Vec<Value> = log
2286            .events()
2287            .iter()
2288            .map(|e| serde_json::to_value(e).unwrap_or_default())
2289            .collect();
2290
2291        Checkpoint {
2292            checkpoint_id: Uuid::new_v4().to_string(),
2293            created_at: chrono::Utc::now(),
2294            state,
2295            events,
2296            tools,
2297            metadata: HashMap::new(),
2298        }
2299    }
2300
2301    /// Save checkpoint to a JSON file.
2302    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2303        let checkpoint = self.save_checkpoint().await;
2304        let json = serde_json::to_string_pretty(&checkpoint)
2305            .map_err(|e| format!("serialize error: {}", e))?;
2306        tokio::fs::write(path, json)
2307            .await
2308            .map_err(|e| format!("write error: {}", e))?;
2309        Ok(())
2310    }
2311
2312    /// Load a checkpoint from a JSON file and restore state.
2313    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2314        let json = tokio::fs::read_to_string(path)
2315            .await
2316            .map_err(|e| format!("read error: {}", e))?;
2317        let checkpoint: Checkpoint =
2318            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2319        self.restore_checkpoint(&checkpoint).await;
2320        Ok(checkpoint)
2321    }
2322
2323    /// Restore runtime state from a checkpoint.
2324    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2325        // Replace state completely — don't merge, don't create synthetic transitions
2326        self.state.replace_all(checkpoint.state.clone());
2327        // Clear idempotency cache — stale results from pre-checkpoint execution
2328        // must not bypass validation/policy on the restored state
2329        self.idempotency_cache.lock().await.clear();
2330        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
2331        let mut tools = self.tools.write().await;
2332        tools.clear();
2333        for tool_name in &checkpoint.tools {
2334            let schema = ToolSchema {
2335                name: tool_name.clone(),
2336                description: String::new(),
2337                parameters: serde_json::Value::Object(Default::default()),
2338                returns: None,
2339                idempotent: false,
2340                cache_ttl_secs: None,
2341                rate_limit: None,
2342            };
2343            tools.insert(tool_name.clone(), schema);
2344        }
2345    }
2346
2347    /// Register a subprocess tool and set up the subprocess executor.
2348    /// If no executor exists, creates a new SubprocessToolExecutor.
2349    /// If one already exists, creates a new SubprocessToolExecutor with the
2350    /// existing executor as fallback.
2351    pub async fn register_subprocess_tool(
2352        &self,
2353        name: &str,
2354        tool: crate::subprocess::SubprocessTool,
2355    ) {
2356        use crate::subprocess::SubprocessToolExecutor;
2357
2358        let schema = ToolSchema {
2359            name: name.to_string(),
2360            description: format!("Subprocess tool: {}", tool.command),
2361            parameters: serde_json::Value::Object(Default::default()),
2362            returns: None,
2363            idempotent: false,
2364            cache_ttl_secs: None,
2365            rate_limit: None,
2366        };
2367        self.register_tool_schema(schema).await;
2368
2369        let mut guard = self.tool_executor.lock().await;
2370        let mut executor = match guard.take() {
2371            Some(existing) => {
2372                let mut sub = SubprocessToolExecutor::new();
2373                sub = sub.with_fallback(existing);
2374                sub
2375            }
2376            None => SubprocessToolExecutor::new(),
2377        };
2378        executor.register(name, tool);
2379        *guard = Some(std::sync::Arc::new(executor));
2380    }
2381}
2382
2383impl Default for Runtime {
2384    fn default() -> Self {
2385        Self::new()
2386    }
2387}