Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Retry backoff: delay before the first retry, in milliseconds.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Retry backoff: multiplier applied to the delay between retries.
/// NOTE(review): the retry loop consuming these is outside this view;
/// presumably delay = base * factor^attempt — confirm there.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28// ---------------------------------------------------------------------------
29// Replan types — failure recovery via model callback
30// ---------------------------------------------------------------------------
31
32/// Callback trait for replanning failed proposals.
33/// Implement this to let the runtime ask the model for an alternative plan
34/// when a proposal aborts.
35#[async_trait::async_trait]
36pub trait ReplanCallback: Send + Sync {
37    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
38}
39
40/// Context provided to the replan callback so the model can generate an alternative.
41#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct ReplanContext {
43    /// Original proposal ID.
44    pub proposal_id: String,
45    /// Which replan attempt this is (1-indexed; attempt 1 = first replan after initial failure).
46    pub attempt: u32,
47    /// Actions that failed and caused the abort.
48    pub failed_actions: Vec<FailedActionSummary>,
49    /// Action IDs that succeeded before the abort (now rolled back).
50    pub completed_action_ids: Vec<String>,
51    /// State snapshot after rollback.
52    pub state_snapshot: HashMap<String, Value>,
53    /// How many replans remain.
54    pub replans_remaining: u32,
55    /// Original proposal source (model name, agent, etc.).
56    pub original_source: String,
57    /// Total actions in the original proposal.
58    pub original_action_count: usize,
59    /// The original goal/context from the proposal (for generating alternatives).
60    pub original_context: HashMap<String, Value>,
61}
62
63/// Summary of a failed action, included in ReplanContext.
64#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
65pub struct FailedActionSummary {
66    pub action_id: String,
67    pub tool: Option<String>,
68    pub error: String,
69    pub parameters: HashMap<String, Value>,
70}
71
/// Settings for the replan loop that runs when a proposal aborts.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Upper bound on replan attempts; `0` (the default) disables replanning.
    pub max_replans: u32,
    /// Pause between replan attempts, in milliseconds (`0` = no pause), so a
    /// model that returns garbage quickly cannot burn every attempt at once.
    pub delay_ms: u64,
    /// When `true` (the default), replan proposals are scored with
    /// car-planner's `verify()` before execution, and proposals with errors
    /// are rejected without running — the engine never swaps in a plan worse
    /// than the one that failed.
    pub verify_before_execute: bool,
}

impl Default for ReplanConfig {
    /// Replanning off, no delay, verification on.
    fn default() -> Self {
        ReplanConfig {
            verify_before_execute: true,
            delay_ms: 0,
            max_replans: 0,
        }
    }
}
95
96/// Trait for tool execution. Implement this to provide tools to the runtime.
97///
98/// In-process: implement directly with function calls.
99/// Daemon mode: implement by sending JSON-RPC to the client.
100#[async_trait::async_trait]
101pub trait ToolExecutor: Send + Sync {
102    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
103}
104
105/// Deterministic key for idempotency deduplication.
106fn idempotency_key(action: &Action) -> String {
107    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
108    let params = serde_json::to_string(&sorted).unwrap_or_default();
109    format!(
110        "{}:{}:{}",
111        serde_json::to_string(&action.action_type).unwrap_or_default(),
112        action.tool.as_deref().unwrap_or(""),
113        params
114    )
115}
116
117fn rejected_result(action_id: &str, error: String) -> ActionResult {
118    ActionResult {
119        action_id: action_id.to_string(),
120        status: ActionStatus::Rejected,
121        output: None,
122        error: Some(error),
123        state_changes: HashMap::new(),
124        duration_ms: None,
125        timestamp: chrono::Utc::now(),
126    }
127}
128
129/// Capture only the state keys relevant to an action (state_dependencies + expected_effects).
130/// Returns an empty map if the action declares no relevant keys (e.g., tool calls
131/// that don't interact with state).
132fn snapshot_relevant_keys(
133    state: &car_state::StateStore,
134    action: &Action,
135) -> HashMap<String, Value> {
136    let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
137    for dep in &action.state_dependencies {
138        keys.insert(dep.as_str());
139    }
140    for key in action.expected_effects.keys() {
141        keys.insert(key.as_str());
142    }
143    // For state_write actions, capture the key being written
144    if action.action_type == ActionType::StateWrite {
145        if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
146            keys.insert(key);
147        }
148    }
149
150    if keys.is_empty() {
151        // No declared state interaction — skip snapshot (empty map)
152        return HashMap::new();
153    }
154
155    keys.iter()
156        .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
157        .collect()
158}
159
160fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
161    ActionResult {
162        action_id: action_id.to_string(),
163        status: ActionStatus::Skipped,
164        output: None,
165        error: Some(reason.to_string()),
166        state_changes: HashMap::new(),
167        duration_ms: None,
168        timestamp: chrono::Utc::now(),
169    }
170}
171
/// Marker at the start of an [`ActionResult`]'s `error` when the action was
/// skipped because the caller cancelled the proposal.
///
/// Cancellation and an earlier-abort cascade both report
/// `ActionStatus::Skipped`; a dedicated variant would ripple through every IR
/// consumer and FFI binding. This prefix lets callers such as the A2A bridge
/// tell the two cases apart without string-matching an ad-hoc literal.
pub const CANCELED_PREFIX: &str = "canceled: ";
179
180/// Result for an action that didn't run because the proposal was
181/// cancelled mid-flight. Distinct from `Skipped` so callers can tell
182/// "didn't run because of an earlier abort" from "didn't run because
183/// the user pulled the plug."
184fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
185    ActionResult {
186        action_id: action_id.to_string(),
187        status: ActionStatus::Skipped,
188        output: None,
189        error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
190        state_changes: HashMap::new(),
191        duration_ms: None,
192        timestamp: chrono::Utc::now(),
193    }
194}
195
196/// Format a tool result for feeding back to a model.
197pub fn format_tool_result(result: &ActionResult) -> String {
198    match result.status {
199        ActionStatus::Succeeded => match &result.output {
200            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
201            None => String::new(),
202        },
203        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
204        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
205        _ => format!(
206            "[{:?}] {}",
207            result.status,
208            result.error.as_deref().unwrap_or("")
209        ),
210    }
211}
212
/// Limits applied to a single proposal's execution.
///
/// Every field is optional. `None` presumably leaves that dimension
/// unconstrained (enforcement lives elsewhere in the executor), so
/// `CostBudget::default()` is the all-`None` budget.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CostBudget {
    /// Maximum number of tool calls.
    pub max_tool_calls: Option<u32>,
    /// Maximum execution duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions.
    pub max_actions: Option<u32>,
}
220
221/// Common Agent Runtime — deterministic execution layer.
222///
223/// Lock ordering discipline (never hold multiple simultaneously, never hold sync locks across .await):
224/// 1. capabilities (RwLock, read-only during execution)
225/// 2. tools (RwLock, read-only during execution)
226/// 3. policies (RwLock, read-only during execution)
227/// 4. session_policies (RwLock to find the per-session engine, then read inner Arc)
228/// 5. cost_budget (RwLock, read-only during execution)
229/// 6. log (TokioMutex, acquired/released per event)
230/// 7. tool_executor (TokioMutex, clone Arc and drop before await)
231/// 8. idempotency_cache (TokioMutex, acquired/released per check)
232///
233/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
234pub struct Runtime {
235    pub state: Arc<StateStore>,
236    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
237    pub policies: Arc<TokioRwLock<PolicyEngine>>,
238    /// Per-session policy registries. Hosts that multiplex multiple
239    /// concurrent agent sessions over a single `Runtime` (IDE-style
240    /// frontends with per-project rules, multi-tenant servers) call
241    /// [`Runtime::open_session`] to mint an id, then
242    /// [`Runtime::register_policy_in_session`] to attach session-scoped
243    /// rules. Validation under that session walks the global registry
244    /// AND the session's — both must pass. Sessions can deny what
245    /// global allows; sessions cannot allow what global denies.
246    /// Closing a session drops its registry and any closures it holds.
247    /// See `docs/proposals/per-session-policy-scoping.md`.
248    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
249    pub log: Arc<TokioMutex<EventLog>>,
250    pub rate_limiter: Arc<RateLimiter>,
251    pub result_cache: Arc<ResultCache>,
252    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
253    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
254    cost_budget: TokioRwLock<Option<CostBudget>>,
255    capabilities: TokioRwLock<Option<CapabilitySet>>,
256    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
257    /// Optional memgine for skill learning (auto-distillation after execution).
258    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
259    /// Whether to auto-distill skills after each proposal execution.
260    auto_distill: bool,
261    /// Optional trajectory store for persisting execution traces.
262    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
263    /// Optional replan callback for failure recovery.
264    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
265    /// Replan configuration.
266    replan_config: TokioRwLock<ReplanConfig>,
267    /// Canonical tool registry (optional — new code should use this).
268    pub registry: Arc<crate::registry::ToolRegistry>,
269}
270
271impl Runtime {
272    pub fn new() -> Self {
273        Self {
274            state: Arc::new(StateStore::new()),
275            tools: Arc::new(TokioRwLock::new(HashMap::new())),
276            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
277            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
278            log: Arc::new(TokioMutex::new(EventLog::new())),
279            rate_limiter: Arc::new(RateLimiter::new()),
280            result_cache: Arc::new(ResultCache::new()),
281            tool_executor: TokioMutex::new(None),
282            idempotency_cache: TokioMutex::new(HashMap::new()),
283            cost_budget: TokioRwLock::new(None),
284            capabilities: TokioRwLock::new(None),
285            inference_engine: None,
286            memgine: None,
287            auto_distill: false,
288            trajectory_store: None,
289            replan_callback: TokioMutex::new(None),
290            replan_config: TokioRwLock::new(ReplanConfig::default()),
291            registry: Arc::new(crate::registry::ToolRegistry::new()),
292        }
293    }
294
295    /// Create a runtime with shared state, event log, and policies.
296    /// Each runtime gets its own tool set, executor, and idempotency cache.
297    pub fn with_shared(
298        state: Arc<StateStore>,
299        log: Arc<TokioMutex<EventLog>>,
300        policies: Arc<TokioRwLock<PolicyEngine>>,
301    ) -> Self {
302        Self {
303            state,
304            tools: Arc::new(TokioRwLock::new(HashMap::new())),
305            policies,
306            // Session-policy registries are per-runtime — sharing them
307            // across embedders that share global policies would defeat
308            // the isolation point. Hosts that genuinely want shared
309            // sessions should drive them through one shared Runtime.
310            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
311            log,
312            rate_limiter: Arc::new(RateLimiter::new()),
313            result_cache: Arc::new(ResultCache::new()),
314            tool_executor: TokioMutex::new(None),
315            idempotency_cache: TokioMutex::new(HashMap::new()),
316            cost_budget: TokioRwLock::new(None),
317            capabilities: TokioRwLock::new(None),
318            inference_engine: None,
319            memgine: None,
320            auto_distill: false,
321            trajectory_store: None,
322            replan_callback: TokioMutex::new(None),
323            replan_config: TokioRwLock::new(ReplanConfig::default()),
324            registry: Arc::new(crate::registry::ToolRegistry::new()),
325        }
326    }
327
328    // ─── Session policy lifecycle ───────────────────────────────────
329    //
330    // Per-session policy scoping. Policies registered against a
331    // session apply to proposals executed under that session id (via
332    // [`Self::execute_with_session`] / [`Self::execute_with_session_and_cancel`]).
333    // Policies registered globally always apply, on top of any
334    // session-scoped layer. See `docs/proposals/per-session-policy-scoping.md`.
335
336    /// Mint a new session id and pre-register an empty policy engine
337    /// under it. Hosts call this once per concurrent agent context
338    /// (an IDE project window, a multi-tenant client, etc.) and pair
339    /// it with [`Self::close_session`] when the context ends.
340    ///
341    /// Returns the opaque id to pass to subsequent
342    /// [`Self::register_policy_in_session`] / [`Self::execute_with_session`]
343    /// calls. Ids are UUIDs so collisions across concurrent calls
344    /// don't matter.
345    pub async fn open_session(&self) -> String {
346        let id = Uuid::new_v4().to_string();
347        let mut sessions = self.session_policies.write().await;
348        sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
349        id
350    }
351
352    /// Drop the session and every policy scoped to it. Returns true
353    /// if a session by that id existed; false if it didn't (already
354    /// closed, never opened, etc.). Idempotent in effect — closing a
355    /// missing session is a no-op the caller is free to ignore.
356    pub async fn close_session(&self, session_id: &str) -> bool {
357        let mut sessions = self.session_policies.write().await;
358        sessions.remove(session_id).is_some()
359    }
360
361    /// Register a policy under a specific session id. The policy
362    /// applies only when a proposal is executed under that session;
363    /// proposals executed without a session (the default) only see
364    /// global policies.
365    ///
366    /// Returns `Err(...)` if the session is unknown — callers either
367    /// forgot to call [`Self::open_session`] or are using a
368    /// stale/closed id.
369    pub async fn register_policy_in_session(
370        &self,
371        session_id: &str,
372        name: &str,
373        check: car_policy::PolicyCheck,
374        description: &str,
375    ) -> Result<(), String> {
376        let engine = {
377            let sessions = self.session_policies.read().await;
378            sessions
379                .get(session_id)
380                .cloned()
381                .ok_or_else(|| format!("unknown session id '{session_id}'"))?
382        };
383        let mut engine = engine.write().await;
384        engine.register(name, check, description);
385        Ok(())
386    }
387
388    /// True if a session with this id is currently open. Mostly for
389    /// tests and FFI surface validation — production code should
390    /// trust the id it just opened.
391    pub async fn session_exists(&self, session_id: &str) -> bool {
392        self.session_policies.read().await.contains_key(session_id)
393    }
394
395    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
396    /// as built-in tools with real implementations.
397    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
398        self.inference_engine = Some(engine);
399        // Register inference tool schemas (non-async init, use try_lock)
400        if let Ok(mut tools) = self.tools.try_write() {
401            for schema in car_inference::service::all_schemas() {
402                tools.insert(schema.name.clone(), schema);
403            }
404        }
405        self
406    }
407
408    /// Attach a memgine for automatic skill learning after execution.
409    /// When `auto_distill` is true, execution traces are automatically distilled
410    /// into skills and domains are evolved when underperforming.
411    pub fn with_learning(
412        mut self,
413        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
414        auto_distill: bool,
415    ) -> Self {
416        self.memgine = Some(memgine);
417        self.auto_distill = auto_distill;
418        self
419    }
420
421    /// Attach a memgine with auto-distillation enabled (recommended default).
422    pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
423        self.with_learning(memgine, true)
424    }
425
426    /// Attach a trajectory store for persisting execution traces.
427    pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
428        self.trajectory_store = Some(store);
429        self
430    }
431
432    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
433        // Use try_lock for non-async init context. Safe because we just created the mutex.
434        if let Ok(mut guard) = self.tool_executor.try_lock() {
435            *guard = Some(executor);
436        }
437        self
438    }
439
440    /// Set a tool executor for the next execute() call.
441    /// Used by NAPI bindings where executor varies per call.
442    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
443        *self.tool_executor.lock().await = Some(executor);
444    }
445
446    pub fn with_event_log(mut self, log: EventLog) -> Self {
447        self.log = Arc::new(TokioMutex::new(log));
448        self
449    }
450
451    /// Attach a replan callback for failure recovery (builder).
452    pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
453        if let Ok(mut guard) = self.replan_callback.try_lock() {
454            *guard = Some(callback);
455        }
456        if let Ok(mut guard) = self.replan_config.try_write() {
457            *guard = config;
458        }
459        self
460    }
461
462    /// Set a replan callback at runtime.
463    pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
464        *self.replan_callback.lock().await = Some(callback);
465    }
466
467    /// Set replan configuration at runtime.
468    pub async fn set_replan_config(&self, config: ReplanConfig) {
469        *self.replan_config.write().await = config;
470    }
471
472    /// Register a tool with just a name (backward compatible).
473    pub async fn register_tool(&self, name: &str) {
474        let schema = ToolSchema {
475            name: name.to_string(),
476            description: String::new(),
477            parameters: serde_json::Value::Object(Default::default()),
478            returns: None,
479            idempotent: false,
480            cache_ttl_secs: None,
481            rate_limit: None,
482        };
483        self.register_tool_schema(schema).await;
484    }
485
486    /// Register a tool with full schema.
487    pub async fn register_tool_schema(&self, schema: ToolSchema) {
488        // Auto-configure cache if schema specifies it
489        if let Some(ttl) = schema.cache_ttl_secs {
490            self.result_cache.enable_caching(&schema.name, ttl).await;
491        }
492        // Auto-configure rate limit if schema specifies it
493        if let Some(ref rl) = schema.rate_limit {
494            self.rate_limiter
495                .set_limit(
496                    &schema.name,
497                    RateLimit {
498                        max_calls: rl.max_calls,
499                        interval_secs: rl.interval_secs,
500                    },
501                )
502                .await;
503        }
504        self.tools.write().await.insert(schema.name.clone(), schema);
505    }
506
507    /// Register a tool via the canonical registry.
508    /// This is the preferred way to register tools — it updates both the
509    /// registry and the legacy tools HashMap for backward compatibility.
510    pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
511        let schema = entry.schema.clone();
512        self.registry.register(entry).await;
513        self.register_tool_schema(schema).await;
514    }
515
516    /// Register CAR's built-in agent utility stdlib.
517    ///
518    /// This is an opt-in convenience layer for common local-file and text tools.
519    /// Existing runtimes remain unchanged until this is called.
520    pub async fn register_agent_basics(&self) {
521        for entry in crate::agent_basics::entries() {
522            self.register_tool_entry(entry).await;
523        }
524    }
525
526    /// Get all registered tool schemas (for model prompt generation).
527    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
528        self.tools.read().await.values().cloned().collect()
529    }
530
531    /// Set a cost budget that limits proposal execution.
532    pub async fn set_cost_budget(&self, budget: CostBudget) {
533        *self.cost_budget.write().await = Some(budget);
534    }
535
536    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
537    pub async fn set_capabilities(&self, caps: CapabilitySet) {
538        *self.capabilities.write().await = Some(caps);
539    }
540
541    /// Set a per-tool rate limit (token bucket).
542    ///
543    /// `max_calls` tokens are available per `interval_secs` window.
544    /// When the bucket is empty, `dispatch()` applies backpressure by
545    /// waiting until a token refills.
546    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
547        self.rate_limiter
548            .set_limit(
549                tool,
550                RateLimit {
551                    max_calls,
552                    interval_secs,
553                },
554            )
555            .await;
556    }
557
558    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
559    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
560        self.result_cache.enable_caching(tool, ttl_secs).await;
561    }
562
563    /// Execute a proposal with automatic replanning on failure.
564    ///
565    /// If a `ReplanCallback` is registered and `max_replans > 0`, the runtime
566    /// will catch abort failures, roll back state, ask the model for an
567    /// alternative proposal via the callback, and re-execute. This transforms
568    /// "execute-and-hope" into "execute-and-recover."
569    ///
570    /// If no callback is registered or `max_replans == 0`, behaves identically
571    /// to a single `execute_inner()` call (zero overhead, fully backward compatible).
572    #[instrument(
573        name = "proposal.execute",
574        skip_all,
575        fields(
576            proposal_id = %proposal.id,
577            action_count = proposal.actions.len(),
578        )
579    )]
580    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
581        // Forward to the cancel-aware variant with a never-cancelled
582        // token. Existing callers see no behaviour change.
583        let token = tokio_util::sync::CancellationToken::new();
584        self.execute_with_cancel(proposal, &token).await
585    }
586
587    /// Execute a proposal scoped to a specific session id.
588    ///
589    /// Validation walks the global policy registry plus the session's
590    /// own registry — both must pass for an action to run. Session
591    /// policies can deny what global allows; they cannot allow what
592    /// global denies (validation is conjunctive).
593    ///
594    /// Returns the same [`ProposalResult`] shape as [`Self::execute`].
595    /// Errors with an action-level rejection if the session id is
596    /// unknown — callers should check via [`Self::session_exists`] or
597    /// trust an id they minted via [`Self::open_session`].
598    pub async fn execute_with_session(
599        &self,
600        proposal: &ActionProposal,
601        session_id: &str,
602    ) -> ProposalResult {
603        let token = tokio_util::sync::CancellationToken::new();
604        self.execute_with_session_and_cancel(proposal, session_id, &token).await
605    }
606
607    /// Combined session-scoped + cancellable execute. The session id
608    /// is passed verbatim to the per-action policy check; the cancel
609    /// token behaves identically to [`Self::execute_with_cancel`].
610    pub async fn execute_with_session_and_cancel(
611        &self,
612        proposal: &ActionProposal,
613        session_id: &str,
614        cancel: &tokio_util::sync::CancellationToken,
615    ) -> ProposalResult {
616        self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
617            .await
618    }
619
620    /// Execute a proposal with cooperative cancellation.
621    ///
622    /// The runtime checks `token.is_cancelled()` at each DAG level
623    /// boundary. When set, every action that hadn't yet started runs
624    /// is reported as `Skipped` with `error = "canceled: ..."` so
625    /// callers can distinguish "user pulled the plug" from "earlier
626    /// abort cascaded." Actions already in flight continue to
627    /// completion — tool calls dispatched to user-provided executors
628    /// can't be safely interrupted from the engine.
629    ///
630    /// The CAR A2A bridge uses this so `tasks/cancel` produces a
631    /// `ProposalResult` with clean partial state rather than relying
632    /// on `JoinHandle::abort` to interrupt mid-await (which leaves
633    /// no record of which actions actually ran).
634    ///
635    /// **FFI exposure:** this method is intentionally not surfaced
636    /// through the NAPI / PyO3 / `car-server-core` JSON-RPC bindings.
637    /// Those consumers (Node, Python, WebSocket) don't currently
638    /// expose long-running async-task surfaces that need
639    /// cancellation; the bridge is the lone consumer. When a binding
640    /// gains a long-running task surface, the path is clear: add a
641    /// per-binding token registry keyed by some caller-provided id,
642    /// expose `cancelExecution(id)` / `cancel_execution(id)` /
643    /// `proposal.cancel { id }`, and have the runtime call
644    /// `execute_with_cancel` with the matching token. Skipping that
645    /// today avoids speculative API surface that bloats bindings
646    /// without a consumer.
647    pub async fn execute_with_cancel(
648        &self,
649        proposal: &ActionProposal,
650        cancel: &tokio_util::sync::CancellationToken,
651    ) -> ProposalResult {
652        self.execute_with_optional_session(proposal, None, None, cancel)
653            .await
654    }
655
656    /// Execute a proposal with an attached [`RuntimeScope`]
657    /// (Parslee-ai/car#187 phase 3).
658    ///
659    /// Same contract as [`Self::execute`] plus a per-execution
660    /// identity surface — typically built by the car-a2a dispatcher
661    /// from the verified `Identity` and cooperative `a2a_caller`
662    /// metadata on the inbound `ActionProposal`. The scope is
663    /// recorded on the event log so downstream audit / log analysis
664    /// can see which caller / tenant issued each action.
665    ///
666    /// **What this enforces today**: scope is captured + logged.
667    /// Memgine queries and state-store ops still hit global
668    /// namespaces — those follow-ups are tracked under #187.
669    /// Tool / policy code that needs per-tenant behaviour right now
670    /// should keep reading `proposal.context["a2a_caller_verified"]`
671    /// directly (the phase 1 / 2 surface).
672    pub async fn execute_scoped(
673        &self,
674        proposal: &ActionProposal,
675        scope: &crate::scope::RuntimeScope,
676    ) -> ProposalResult {
677        let token = tokio_util::sync::CancellationToken::new();
678        self.execute_scoped_with_cancel(proposal, scope, &token).await
679    }
680
681    /// Combined scoped + cancellable execute. Mirrors the shape of
682    /// [`Self::execute_with_session_and_cancel`] for symmetry — both
683    /// add a side-channel (session id / scope) on top of the
684    /// cancellable form.
685    pub async fn execute_scoped_with_cancel(
686        &self,
687        proposal: &ActionProposal,
688        scope: &crate::scope::RuntimeScope,
689        cancel: &tokio_util::sync::CancellationToken,
690    ) -> ProposalResult {
691        self.execute_with_optional_session(proposal, None, Some(scope), cancel)
692            .await
693    }
694
695    /// Internal entry point that backs both
696    /// [`Self::execute_with_cancel`] (no session) and
697    /// [`Self::execute_with_session_and_cancel`]. Holds the replan
698    /// loop and threads `session_id` into the per-action validation
699    /// path so session-scoped policies stack on top of global ones.
700    async fn execute_with_optional_session(
701        &self,
702        proposal: &ActionProposal,
703        session_id: Option<&str>,
704        scope: Option<&crate::scope::RuntimeScope>,
705        cancel: &tokio_util::sync::CancellationToken,
706    ) -> ProposalResult {
707        let config = self.replan_config.read().await.clone();
708        let mut current_proposal = proposal.clone();
709        let mut attempt: u32 = 0;
710
711        // Phase 3 foundation (Parslee-ai/car#187): record the scope
712        // on the event log so audit / log analysis can correlate
713        // actions to the caller / tenant that triggered them. Only
714        // logged when at least one identity field is set — keeps
715        // the existing in-process call sites free of noise.
716        if let Some(s) = scope {
717            if !s.is_unscoped() {
718                let mut props: HashMap<String, Value> = HashMap::new();
719                if let Some(cid) = &s.caller_id {
720                    props.insert("caller_id".to_string(), Value::from(cid.as_str()));
721                }
722                if let Some(tid) = &s.tenant_id {
723                    props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
724                }
725                if !s.claims.is_empty() {
726                    if let Ok(claims_json) = serde_json::to_value(&s.claims) {
727                        props.insert("claims".to_string(), claims_json);
728                    }
729                }
730                let mut log = self.log.lock().await;
731                log.append(
732                    EventKind::SessionScope,
733                    None,
734                    Some(&proposal.id),
735                    props,
736                );
737            }
738        }
739
740        loop {
741            let (result, state_before_map) = self
742                .execute_inner_with_cancel(&current_proposal, Some(cancel), session_id, scope)
743                .await;
744
745            // Check if we aborted
746            let aborted = result
747                .results
748                .iter()
749                .any(|r| r.status == ActionStatus::Failed);
750            if !aborted || attempt >= config.max_replans {
751                if aborted && attempt > 0 {
752                    // Exhausted all replan attempts
753                    let mut log = self.log.lock().await;
754                    log.append(
755                        EventKind::ReplanExhausted,
756                        None,
757                        Some(&proposal.id),
758                        [("attempts".to_string(), Value::from(attempt))].into(),
759                    );
760                }
761
762                // Persist trajectory
763                let outcome = if !aborted {
764                    if attempt > 0 {
765                        car_memgine::TrajectoryOutcome::ReplanSuccess
766                    } else {
767                        car_memgine::TrajectoryOutcome::Success
768                    }
769                } else if attempt > 0 {
770                    car_memgine::TrajectoryOutcome::ReplanExhausted
771                } else {
772                    car_memgine::TrajectoryOutcome::Failed
773                };
774                if let Some(err) = self.persist_trajectory(
775                    proposal,
776                    &current_proposal,
777                    &result,
778                    outcome,
779                    attempt,
780                    &state_before_map,
781                ) {
782                    let mut log = self.log.lock().await;
783                    log.append(
784                        EventKind::ActionFailed,
785                        None,
786                        Some(&proposal.id),
787                        [(
788                            "trajectory_persist_error".to_string(),
789                            Value::from(err.as_str()),
790                        )]
791                        .into(),
792                    );
793                }
794
795                return result;
796            }
797
798            // Get replan callback (clone Arc, drop lock immediately)
799            let callback = {
800                let guard = self.replan_callback.lock().await;
801                guard.clone()
802            };
803            let Some(callback) = callback else {
804                // No callback registered — persist trajectory and return
805                if let Some(err) = self.persist_trajectory(
806                    proposal,
807                    &current_proposal,
808                    &result,
809                    car_memgine::TrajectoryOutcome::Failed,
810                    attempt,
811                    &state_before_map,
812                ) {
813                    let mut log = self.log.lock().await;
814                    log.append(
815                        EventKind::ActionFailed,
816                        None,
817                        Some(&proposal.id),
818                        [(
819                            "trajectory_persist_error".to_string(),
820                            Value::from(err.as_str()),
821                        )]
822                        .into(),
823                    );
824                }
825                return result;
826            };
827
828            // Build replan context
829            let failed_actions: Vec<FailedActionSummary> = result
830                .results
831                .iter()
832                .filter(|r| r.status == ActionStatus::Failed)
833                .map(|r| {
834                    let action = current_proposal
835                        .actions
836                        .iter()
837                        .find(|a| a.id == r.action_id);
838                    FailedActionSummary {
839                        action_id: r.action_id.clone(),
840                        tool: action.and_then(|a| a.tool.clone()),
841                        error: r.error.clone().unwrap_or_default(),
842                        parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
843                    }
844                })
845                .collect();
846
847            let completed_action_ids: Vec<String> = result
848                .results
849                .iter()
850                .filter(|r| r.status == ActionStatus::Succeeded)
851                .map(|r| r.action_id.clone())
852                .collect();
853
854            let ctx = ReplanContext {
855                proposal_id: proposal.id.clone(),
856                attempt: attempt + 1,
857                failed_actions,
858                completed_action_ids,
859                state_snapshot: self.state.snapshot(),
860                replans_remaining: config.max_replans.saturating_sub(attempt + 1),
861                original_source: proposal.source.clone(),
862                original_action_count: proposal.actions.len(),
863                original_context: proposal.context.clone(),
864            };
865
866            // Backoff delay between replan attempts
867            if config.delay_ms > 0 {
868                tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
869            }
870
871            // Log replan attempt
872            {
873                let mut log = self.log.lock().await;
874                log.append(
875                    EventKind::ReplanAttempted,
876                    None,
877                    Some(&proposal.id),
878                    [
879                        ("attempt".to_string(), Value::from(attempt + 1)),
880                        (
881                            "failed_count".to_string(),
882                            Value::from(ctx.failed_actions.len()),
883                        ),
884                    ]
885                    .into(),
886                );
887            }
888
889            // Call the model for a new plan
890            match callback.replan(&ctx).await {
891                Ok(new_proposal) => {
892                    // Quality gate: verify replan proposal before executing
893                    if config.verify_before_execute {
894                        let tools_guard = self.tools.read().await;
895                        let tool_names: std::collections::HashSet<String> =
896                            tools_guard.keys().cloned().collect();
897                        drop(tools_guard);
898
899                        let current_state = self.state.snapshot();
900                        let vr = car_verify::verify(
901                            &new_proposal,
902                            Some(&current_state),
903                            Some(&tool_names),
904                            100,
905                        );
906                        if !vr.valid {
907                            let error_msgs: Vec<String> = vr
908                                .issues
909                                .iter()
910                                .filter(|i| i.severity == "error")
911                                .map(|i| i.message.clone())
912                                .collect();
913                            let mut log = self.log.lock().await;
914                            log.append(
915                                EventKind::ReplanRejected,
916                                None,
917                                Some(&proposal.id),
918                                [
919                                    ("errors".to_string(), Value::from(error_msgs.join("; "))),
920                                    ("attempt".to_string(), Value::from(attempt + 1)),
921                                ]
922                                .into(),
923                            );
924                            // Don't execute a broken replan — count as failed attempt
925                            attempt += 1;
926                            continue;
927                        }
928                    }
929
930                    // Log accepted proposal
931                    {
932                        let mut log = self.log.lock().await;
933                        log.append(
934                            EventKind::ReplanProposalReceived,
935                            None,
936                            Some(&proposal.id),
937                            [
938                                ("attempt".to_string(), Value::from(attempt + 1)),
939                                (
940                                    "new_action_count".to_string(),
941                                    Value::from(new_proposal.actions.len()),
942                                ),
943                            ]
944                            .into(),
945                        );
946                    }
947                    current_proposal = new_proposal;
948                    attempt += 1;
949                }
950                Err(e) => {
951                    // Replan callback itself failed — log and return original failure
952                    let mut log = self.log.lock().await;
953                    log.append(
954                        EventKind::ReplanExhausted,
955                        None,
956                        Some(&proposal.id),
957                        [
958                            ("reason".to_string(), Value::from("callback_error")),
959                            ("error".to_string(), Value::from(e.as_str())),
960                            ("attempt".to_string(), Value::from(attempt + 1)),
961                        ]
962                        .into(),
963                    );
964                    if let Some(err) = self.persist_trajectory(
965                        proposal,
966                        &current_proposal,
967                        &result,
968                        car_memgine::TrajectoryOutcome::Failed,
969                        attempt,
970                        &state_before_map,
971                    ) {
972                        log.append(
973                            EventKind::ActionFailed,
974                            None,
975                            Some(&proposal.id),
976                            [(
977                                "trajectory_persist_error".to_string(),
978                                Value::from(err.as_str()),
979                            )]
980                            .into(),
981                        );
982                    }
983                    return result;
984                }
985            }
986        }
987    }
988
989    /// Persist a trajectory to the store if configured.
990    fn persist_trajectory(
991        &self,
992        proposal: &ActionProposal,
993        current_proposal: &ActionProposal,
994        result: &ProposalResult,
995        outcome: car_memgine::TrajectoryOutcome,
996        attempt: u32,
997        state_before_map: &HashMap<String, HashMap<String, Value>>,
998    ) -> Option<String> {
999        let store = self.trajectory_store.as_ref()?;
1000
1001        let trace_events: Vec<car_memgine::TraceEvent> = result
1002            .results
1003            .iter()
1004            .map(|r| {
1005                let kind = match r.status {
1006                    ActionStatus::Succeeded => "action_succeeded",
1007                    ActionStatus::Failed => "action_failed",
1008                    ActionStatus::Rejected => "action_rejected",
1009                    ActionStatus::Skipped => "action_skipped",
1010                    _ => "unknown",
1011                };
1012                let tool = current_proposal
1013                    .actions
1014                    .iter()
1015                    .find(|a| a.id == r.action_id)
1016                    .and_then(|a| a.tool.clone());
1017                let reward = match r.status {
1018                    ActionStatus::Succeeded => Some(1.0),
1019                    ActionStatus::Failed => Some(0.0),
1020                    ActionStatus::Rejected => Some(0.0),
1021                    ActionStatus::Skipped => None,
1022                    _ => None,
1023                };
1024                car_memgine::TraceEvent {
1025                    kind: kind.to_string(),
1026                    action_id: Some(r.action_id.clone()),
1027                    tool,
1028                    data: r
1029                        .error
1030                        .as_ref()
1031                        .map(|e| serde_json::json!({"error": e}))
1032                        .unwrap_or(serde_json::json!({})),
1033                    duration_ms: r.duration_ms,
1034                    state_before: state_before_map.get(&r.action_id).cloned(),
1035                    state_after: if !r.state_changes.is_empty() {
1036                        Some(r.state_changes.clone())
1037                    } else {
1038                        None
1039                    },
1040                    reward,
1041                }
1042            })
1043            .collect();
1044
1045        let trajectory = car_memgine::Trajectory {
1046            proposal_id: proposal.id.clone(),
1047            source: proposal.source.clone(),
1048            action_count: current_proposal.actions.len(),
1049            events: trace_events,
1050            outcome,
1051            timestamp: chrono::Utc::now(),
1052            duration_ms: result.cost.total_duration_ms,
1053            replan_attempts: attempt,
1054        };
1055
1056        match store.append(&trajectory) {
1057            Ok(()) => None,
1058            Err(e) => Some(e.to_string()),
1059        }
1060    }
1061
1062    /// Score N candidate proposals, execute the best valid one, fall back to
1063    /// next-best on failure. Combines car-planner scoring with engine execution.
1064    ///
1065    /// Returns the result from whichever proposal was executed (best or fallback).
1066    /// If all candidates fail verification, returns an error result for the first.
1067    pub async fn plan_and_execute(
1068        &self,
1069        candidates: &[ActionProposal],
1070        planner_config: Option<car_planner::PlannerConfig>,
1071        feedback: Option<&car_planner::ToolFeedback>,
1072    ) -> ProposalResult {
1073        if candidates.is_empty() {
1074            return ProposalResult {
1075                proposal_id: "empty".to_string(),
1076                results: vec![],
1077                cost: car_ir::CostSummary::default(),
1078            };
1079        }
1080
1081        // Score all candidates
1082        let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1083        let tools_guard = self.tools.read().await;
1084        let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1085        drop(tools_guard);
1086
1087        let pre_plan_snapshot = self.state.snapshot();
1088        let pre_plan_transitions = self.state.transition_count();
1089        let ranked = planner.rank_with_feedback(
1090            candidates,
1091            Some(&pre_plan_snapshot),
1092            Some(&tool_names),
1093            feedback,
1094        );
1095
1096        // Try each valid candidate in score order
1097        let mut first_failure: Option<ProposalResult> = None;
1098        for scored in &ranked {
1099            if !scored.valid {
1100                continue;
1101            }
1102
1103            // Restore clean state before each candidate (don't rely on execute's
1104            // internal rollback — it only fires on Abort, not Skip/Retry failures)
1105            self.state
1106                .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1107
1108            let proposal = &candidates[scored.index];
1109            let result = self.execute(proposal).await;
1110
1111            if result.all_succeeded() {
1112                return result;
1113            }
1114
1115            tracing::info!(
1116                proposal_id = %proposal.id,
1117                score = scored.score,
1118                "plan_and_execute: proposal failed, trying next candidate"
1119            );
1120
1121            if first_failure.is_none() {
1122                first_failure = Some(result);
1123            }
1124        }
1125
1126        // Return the first failure result (don't re-execute — avoids duplicate side effects)
1127        first_failure.unwrap_or_else(|| ProposalResult {
1128            proposal_id: candidates[0].id.clone(),
1129            results: vec![],
1130            cost: car_ir::CostSummary::default(),
1131        })
1132    }
1133
1134    /// Execute a single proposal through the runtime loop (no replanning).
1135    /// Returns (result, state_before_map) where state_before_map has per-action snapshots.
1136    ///
1137    /// `session_id`, when `Some`, scopes per-action policy validation
1138    /// to the named session in addition to global policies. Both
1139    /// layers must pass for an action to run; the session layer cannot
1140    /// loosen what global denies.
1141    async fn execute_inner_with_cancel(
1142        &self,
1143        proposal: &ActionProposal,
1144        cancel: Option<&tokio_util::sync::CancellationToken>,
1145        session_id: Option<&str>,
1146        scope: Option<&crate::scope::RuntimeScope>,
1147    ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1148        // Generate trace_id for this proposal execution
1149        let trace_id = Uuid::new_v4().to_string();
1150
1151        // Begin root span for proposal execution
1152        let root_span_id = {
1153            let mut log = self.log.lock().await;
1154            log.begin_span(
1155                "proposal.execute",
1156                &trace_id,
1157                None,
1158                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1159            )
1160        };
1161
1162        // Log proposal received
1163        {
1164            let mut log = self.log.lock().await;
1165            log.append(
1166                EventKind::ProposalReceived,
1167                None,
1168                Some(&proposal.id),
1169                [
1170                    ("source".to_string(), Value::from(proposal.source.as_str())),
1171                    (
1172                        "action_count".to_string(),
1173                        Value::from(proposal.actions.len()),
1174                    ),
1175                ]
1176                .into(),
1177            );
1178        }
1179
1180        // Capability check: max_actions budget for entire proposal
1181        {
1182            let caps = self.capabilities.read().await;
1183            if let Some(ref cap) = *caps {
1184                if !cap.actions_within_budget(proposal.actions.len() as u32) {
1185                    let mut action_results = Vec::new();
1186                    for action in &proposal.actions {
1187                        action_results.push(rejected_result(
1188                            &action.id,
1189                            format!(
1190                                "capability denied: proposal has {} actions, max allowed is {:?}",
1191                                proposal.actions.len(),
1192                                cap.max_actions
1193                            ),
1194                        ));
1195                    }
1196                    return (
1197                        ProposalResult {
1198                            proposal_id: proposal.id.clone(),
1199                            results: action_results,
1200                            cost: CostSummary::default(),
1201                        },
1202                        HashMap::new(),
1203                    );
1204                }
1205            }
1206        }
1207
1208        // Snapshot for rollback
1209        let snapshot = self.state.snapshot();
1210        let transition_count = self.state.transition_count();
1211
1212        let mut results: Vec<ActionResult> = Vec::new();
1213        // Per-action state snapshots captured before execution (for TraceEvent.state_before).
1214        let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1215        let mut aborted = false;
1216        let mut budget_exceeded = false;
1217        let mut total_retries: u32 = 0;
1218
1219        // Running cost counters for budget enforcement
1220        let mut running_tool_calls: u32 = 0;
1221        let mut running_actions: u32 = 0;
1222        let mut running_duration_ms: f64 = 0.0;
1223
1224        // Snapshot the budget once
1225        let budget = self.cost_budget.read().await.clone();
1226
1227        // Build DAG
1228        let levels = build_dag(&proposal.actions);
1229
1230        let mut canceled = false;
1231        for level in &levels {
1232            // Cooperative cancellation check at the level boundary.
1233            // Actions already in flight aren't interrupted (we can't
1234            // safely cancel a tool call dispatched to a user-provided
1235            // executor), but every action that hadn't started runs
1236            // is recorded as canceled with a clear reason.
1237            if !canceled {
1238                if let Some(token) = cancel {
1239                    if token.is_cancelled() {
1240                        canceled = true;
1241                    }
1242                }
1243            }
1244            if canceled {
1245                for &idx in level {
1246                    results.push(canceled_result(
1247                        &proposal.actions[idx].id,
1248                        "cancellation requested by caller",
1249                    ));
1250                }
1251                continue;
1252            }
1253            if aborted || budget_exceeded {
1254                let skip_reason = if budget_exceeded {
1255                    "cost budget exceeded"
1256                } else {
1257                    "skipped due to earlier abort"
1258                };
1259                for &idx in level {
1260                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1261                }
1262                continue;
1263            }
1264
1265            // Check if any action in this level has ABORT behavior
1266            let has_abort = level
1267                .iter()
1268                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1269
1270            if level.len() == 1 || has_abort {
1271                // Sequential execution
1272                for &idx in level {
1273                    if aborted || budget_exceeded {
1274                        let skip_reason = if budget_exceeded {
1275                            "cost budget exceeded"
1276                        } else {
1277                            "skipped due to abort"
1278                        };
1279                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1280                        continue;
1281                    }
1282
1283                    // Budget check before execution
1284                    if let Some(ref b) = budget {
1285                        if let Some(max) = b.max_actions {
1286                            if running_actions >= max {
1287                                budget_exceeded = true;
1288                                results.push(skipped_result(
1289                                    &proposal.actions[idx].id,
1290                                    "cost budget exceeded",
1291                                ));
1292                                continue;
1293                            }
1294                        }
1295                        if let Some(max) = b.max_tool_calls {
1296                            if proposal.actions[idx].action_type == ActionType::ToolCall
1297                                && running_tool_calls >= max
1298                            {
1299                                budget_exceeded = true;
1300                                results.push(skipped_result(
1301                                    &proposal.actions[idx].id,
1302                                    "cost budget exceeded",
1303                                ));
1304                                continue;
1305                            }
1306                        }
1307                        if let Some(max) = b.max_duration_ms {
1308                            if running_duration_ms >= max {
1309                                budget_exceeded = true;
1310                                results.push(skipped_result(
1311                                    &proposal.actions[idx].id,
1312                                    "cost budget exceeded",
1313                                ));
1314                                continue;
1315                            }
1316                        }
1317                    }
1318
1319                    state_before_map.insert(
1320                        proposal.actions[idx].id.clone(),
1321                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1322                    );
1323                    let (ar, action_retries) = self
1324                        .process_action(
1325                            &proposal.actions[idx],
1326                            &proposal.id,
1327                            &trace_id,
1328                            &root_span_id,
1329                            session_id,
1330                            scope,
1331                        )
1332                        .await;
1333                    total_retries += action_retries;
1334
1335                    // Update running counters
1336                    if ar.status == ActionStatus::Succeeded
1337                        && proposal.actions[idx].action_type == ActionType::ToolCall
1338                    {
1339                        running_tool_calls += 1;
1340                    }
1341                    if ar.status != ActionStatus::Skipped {
1342                        running_actions += 1;
1343                    }
1344                    if let Some(d) = ar.duration_ms {
1345                        running_duration_ms += d;
1346                    }
1347
1348                    if ar.status == ActionStatus::Failed
1349                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1350                    {
1351                        aborted = true;
1352                    }
1353                    results.push(ar);
1354                }
1355            } else {
1356                // Concurrent execution via futures::join_all
1357                // Snapshot only relevant keys per action (all see same pre-level state)
1358                for &idx in level {
1359                    state_before_map.insert(
1360                        proposal.actions[idx].id.clone(),
1361                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1362                    );
1363                }
1364                let futs: Vec<_> = level
1365                    .iter()
1366                    .map(|&idx| {
1367                        self.process_action(
1368                            &proposal.actions[idx],
1369                            &proposal.id,
1370                            &trace_id,
1371                            &root_span_id,
1372                            session_id,
1373                            scope,
1374                        )
1375                    })
1376                    .collect();
1377                let level_results = futures::future::join_all(futs).await;
1378
1379                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1380                    let idx = level[i];
1381                    total_retries += action_retries;
1382                    if ar.status == ActionStatus::Succeeded
1383                        && proposal.actions[idx].action_type == ActionType::ToolCall
1384                    {
1385                        running_tool_calls += 1;
1386                    }
1387                    if ar.status != ActionStatus::Skipped {
1388                        running_actions += 1;
1389                    }
1390                    if let Some(d) = ar.duration_ms {
1391                        running_duration_ms += d;
1392                    }
1393                    results.push(ar);
1394                }
1395            }
1396        }
1397
1398        // Handle rollback
1399        if aborted {
1400            self.state.restore(snapshot.clone(), transition_count);
1401
1402            let mut log = self.log.lock().await;
1403            log.append(
1404                EventKind::StateSnapshot,
1405                None,
1406                Some(&proposal.id),
1407                [(
1408                    "state".to_string(),
1409                    serde_json::to_value(&snapshot).unwrap_or_default(),
1410                )]
1411                .into(),
1412            );
1413            log.append(
1414                EventKind::StateRollback,
1415                None,
1416                Some(&proposal.id),
1417                [(
1418                    "rolled_back_to".to_string(),
1419                    Value::from("pre-proposal snapshot"),
1420                )]
1421                .into(),
1422            );
1423
1424            // Clear idempotency cache for rolled-back actions
1425            let mut cache = self.idempotency_cache.lock().await;
1426            for r in &results {
1427                if r.status == ActionStatus::Succeeded {
1428                    for action in &proposal.actions {
1429                        if action.id == r.action_id && action.idempotent {
1430                            cache.remove(&idempotency_key(action));
1431                        }
1432                    }
1433                }
1434            }
1435        }
1436
1437        // Sort results to match original action order
1438        let action_order: HashMap<String, usize> = proposal
1439            .actions
1440            .iter()
1441            .enumerate()
1442            .map(|(i, a)| (a.id.clone(), i))
1443            .collect();
1444        results.sort_by_key(|r| {
1445            action_order
1446                .get(&r.action_id)
1447                .copied()
1448                .unwrap_or(usize::MAX)
1449        });
1450
1451        // Compute cost summary from results
1452        let mut cost = CostSummary::default();
1453        for r in &results {
1454            let action = action_order
1455                .get(&r.action_id)
1456                .and_then(|&i| proposal.actions.get(i));
1457            match r.status {
1458                ActionStatus::Succeeded => {
1459                    cost.actions_executed += 1;
1460                    if let Some(a) = action {
1461                        if a.action_type == ActionType::ToolCall {
1462                            cost.tool_calls += 1;
1463                        }
1464                    }
1465                }
1466                ActionStatus::Failed | ActionStatus::Rejected => {
1467                    cost.actions_executed += 1;
1468                }
1469                ActionStatus::Skipped => {
1470                    cost.actions_skipped += 1;
1471                }
1472                _ => {}
1473            }
1474            if let Some(d) = r.duration_ms {
1475                cost.total_duration_ms += d;
1476            }
1477        }
1478
1479        // Set retries from inline counter
1480        cost.retries = total_retries;
1481
1482        // End root span — Ok if no abort, Error if aborted
1483        {
1484            let span_status = if aborted {
1485                SpanStatus::Error
1486            } else {
1487                SpanStatus::Ok
1488            };
1489            let mut log = self.log.lock().await;
1490            log.end_span(&root_span_id, span_status);
1491        }
1492
1493        let proposal_result = ProposalResult {
1494            proposal_id: proposal.id.clone(),
1495            results,
1496            cost,
1497        };
1498
1499        // Post-execution: auto-distill skills from this execution trace
1500        if self.auto_distill {
1501            if let Some(ref memgine) = self.memgine {
1502                // Convert results to TraceEvents for distillation
1503                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1504                    .results
1505                    .iter()
1506                    .map(|r| {
1507                        let kind = match r.status {
1508                            ActionStatus::Succeeded => "action_succeeded",
1509                            ActionStatus::Failed => "action_failed",
1510                            ActionStatus::Rejected => "action_rejected",
1511                            ActionStatus::Skipped => "action_skipped",
1512                            _ => "unknown",
1513                        };
1514                        // Find the matching action to get the tool name
1515                        let tool = proposal
1516                            .actions
1517                            .iter()
1518                            .find(|a| a.id == r.action_id)
1519                            .and_then(|a| a.tool.clone());
1520                        let mut data = serde_json::Map::new();
1521                        if let Some(ref e) = r.error {
1522                            data.insert("error".into(), Value::from(e.as_str()));
1523                        }
1524                        if let Some(ref o) = r.output {
1525                            data.insert("output".into(), o.clone());
1526                        }
1527                        car_memgine::TraceEvent {
1528                            kind: kind.to_string(),
1529                            action_id: Some(r.action_id.clone()),
1530                            tool,
1531                            data: Value::Object(data),
1532                            duration_ms: r.duration_ms,
1533                            reward: match r.status {
1534                                ActionStatus::Succeeded => Some(1.0),
1535                                ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1536                                _ => None,
1537                            },
1538                            ..Default::default()
1539                        }
1540                    })
1541                    .collect();
1542
1543                let mut engine = memgine.lock().await;
1544                let skills = engine.distill_skills(&trace_events).await;
1545                if !skills.is_empty() {
1546                    let count = skills.len();
1547                    // Tenant-aware ingest (Parslee-ai/car#187 phase 3-D).
1548                    // When the proposal carried a tenant, distilled
1549                    // skills get stamped so the same tenant's next
1550                    // build_context can find them. Unscoped proposals
1551                    // continue to land in the global skill pool.
1552                    let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1553                    engine.scoped(tenant).ingest_distilled_skills(&skills);
1554
1555                    // Log the distillation event
1556                    let mut log = self.log.lock().await;
1557                    log.append(
1558                        EventKind::SkillDistilled,
1559                        None,
1560                        Some(&proposal_result.proposal_id),
1561                        [
1562                            ("skills_count".to_string(), Value::from(count)),
1563                            (
1564                                "skill_names".to_string(),
1565                                Value::from(
1566                                    skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1567                                ),
1568                            ),
1569                        ]
1570                        .into(),
1571                    );
1572
1573                    // Check if any domains need evolution
1574                    let threshold = engine.evolution_threshold();
1575                    let domains = engine.domains_needing_evolution(threshold);
1576                    for domain in &domains {
1577                        // Collect failed events for this domain
1578                        let failed: Vec<car_memgine::TraceEvent> = trace_events
1579                            .iter()
1580                            .filter(|e| {
1581                                matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1582                            })
1583                            .cloned()
1584                            .collect();
1585                        if !failed.is_empty() {
1586                            let evolved = engine.evolve_skills(&failed, domain).await;
1587                            if !evolved.is_empty() {
1588                                log.append(
1589                                    EventKind::EvolutionTriggered,
1590                                    None,
1591                                    Some(&proposal_result.proposal_id),
1592                                    [
1593                                        ("domain".to_string(), Value::from(domain.as_str())),
1594                                        ("new_skills".to_string(), Value::from(evolved.len())),
1595                                    ]
1596                                    .into(),
1597                                );
1598                            }
1599                        }
1600                    }
1601                }
1602            }
1603        }
1604
1605        (proposal_result, state_before_map)
1606    }
1607
1608    /// Process a single action: idempotency → validate → policy → execute.
1609    /// Returns (ActionResult, retries_count).
1610    async fn process_action(
1611        &self,
1612        action: &Action,
1613        proposal_id: &str,
1614        trace_id: &str,
1615        parent_span_id: &str,
1616        session_id: Option<&str>,
1617        scope: Option<&crate::scope::RuntimeScope>,
1618    ) -> (ActionResult, u32) {
1619        // Derive action type name for span naming
1620        let action_type_name = serde_json::to_string(&action.action_type)
1621            .unwrap_or_default()
1622            .trim_matches('"')
1623            .to_string();
1624        let span_name = format!("action.{}", action_type_name);
1625
1626        // Begin child span for this action
1627        let action_span_id = {
1628            let mut attrs: HashMap<String, Value> = HashMap::new();
1629            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1630            if let Some(ref tool) = action.tool {
1631                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1632            }
1633            let mut log = self.log.lock().await;
1634            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1635        };
1636
1637        // Execute the action pipeline and capture result
1638        let (result, retries) = self
1639            .process_action_inner(action, proposal_id, session_id, scope)
1640            .await;
1641
1642        // End action span based on result status
1643        let span_status = match result.status {
1644            ActionStatus::Succeeded => SpanStatus::Ok,
1645            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1646            _ => SpanStatus::Unset,
1647        };
1648        {
1649            let mut log = self.log.lock().await;
1650            log.end_span(&action_span_id, span_status);
1651        }
1652
1653        (result, retries)
1654    }
1655
1656    /// Inner action processing: idempotency -> validate -> policy -> execute.
1657    /// Returns (ActionResult, retries_count).
1658    #[instrument(
1659        name = "action.process",
1660        skip_all,
1661        fields(
1662            action_id = %action.id,
1663            action_type = ?action.action_type,
1664            tool = action.tool.as_deref().unwrap_or("none"),
1665        )
1666    )]
1667    async fn process_action_inner(
1668        &self,
1669        action: &Action,
1670        proposal_id: &str,
1671        session_id: Option<&str>,
1672        scope: Option<&crate::scope::RuntimeScope>,
1673    ) -> (ActionResult, u32) {
1674        // Idempotency check
1675        if action.idempotent {
1676            let key = idempotency_key(action);
1677            let cache = self.idempotency_cache.lock().await;
1678            if let Some(cached) = cache.get(&key) {
1679                let mut log = self.log.lock().await;
1680                log.append(
1681                    EventKind::ActionDeduplicated,
1682                    Some(&action.id),
1683                    Some(proposal_id),
1684                    [(
1685                        "cached_action_id".to_string(),
1686                        Value::from(cached.action_id.as_str()),
1687                    )]
1688                    .into(),
1689                );
1690                return (
1691                    ActionResult {
1692                        action_id: action.id.clone(),
1693                        status: cached.status.clone(),
1694                        output: cached.output.clone(),
1695                        error: cached.error.clone(),
1696                        state_changes: cached.state_changes.clone(),
1697                        duration_ms: Some(0.0),
1698                        timestamp: chrono::Utc::now(),
1699                    },
1700                    0,
1701                );
1702            }
1703        }
1704
1705        // Capability check
1706        {
1707            let caps = self.capabilities.read().await;
1708            if let Some(ref cap) = *caps {
1709                // Check tool capability for ToolCall actions
1710                if action.action_type == ActionType::ToolCall {
1711                    if let Some(ref tool_name) = action.tool {
1712                        if !cap.tool_allowed(tool_name) {
1713                            let mut log = self.log.lock().await;
1714                            log.append(
1715                                EventKind::ActionRejected,
1716                                Some(&action.id),
1717                                Some(proposal_id),
1718                                HashMap::new(),
1719                            );
1720                            return (
1721                                rejected_result(
1722                                    &action.id,
1723                                    format!("capability denied: tool '{}' not allowed", tool_name),
1724                                ),
1725                                0,
1726                            );
1727                        }
1728                    }
1729                }
1730
1731                // Check state key capability for StateWrite/StateRead actions
1732                if action.action_type == ActionType::StateWrite
1733                    || action.action_type == ActionType::StateRead
1734                {
1735                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1736                        if !cap.state_key_allowed(key) {
1737                            let mut log = self.log.lock().await;
1738                            log.append(
1739                                EventKind::ActionRejected,
1740                                Some(&action.id),
1741                                Some(proposal_id),
1742                                HashMap::new(),
1743                            );
1744                            return (
1745                                rejected_result(
1746                                    &action.id,
1747                                    format!("capability denied: state key '{}' not allowed", key),
1748                                ),
1749                                0,
1750                            );
1751                        }
1752                    }
1753                }
1754            }
1755        }
1756
1757        // Validate
1758        let tools = self.tools.read().await;
1759        let validation = validate_action(action, &self.state, &tools);
1760        drop(tools);
1761
1762        if !validation.valid() {
1763            let error = validation
1764                .errors
1765                .iter()
1766                .map(|e| e.reason.as_str())
1767                .collect::<Vec<_>>()
1768                .join("; ");
1769            let mut log = self.log.lock().await;
1770            log.append(
1771                EventKind::ActionRejected,
1772                Some(&action.id),
1773                Some(proposal_id),
1774                HashMap::new(),
1775            );
1776            return (rejected_result(&action.id, error), 0);
1777        }
1778
1779        // Policy check — global registry plus, when the proposal is
1780        // executed under a session, that session's registry. Both
1781        // layers must pass for the action to proceed; session
1782        // policies are additive deny rules — they can deny what
1783        // global allows but cannot allow what global denies.
1784        {
1785            let mut violations = {
1786                let policies = self.policies.read().await;
1787                policies.check(action, &self.state)
1788            };
1789            if let Some(sid) = session_id {
1790                // Snapshot the per-session engine handle out from under
1791                // the outer registry lock so the inner check holds
1792                // only the engine's own RwLock — preserves the
1793                // documented lock-ordering discipline.
1794                let session_engine = {
1795                    let sessions = self.session_policies.read().await;
1796                    sessions.get(sid).cloned()
1797                };
1798                if let Some(engine) = session_engine {
1799                    let engine = engine.read().await;
1800                    violations.extend(engine.check(action, &self.state));
1801                } else {
1802                    // Unknown session id — refuse the action rather
1803                    // than silently fall back to global-only. A
1804                    // proposal submitted under a closed session
1805                    // shouldn't run with looser rules than the caller
1806                    // intended.
1807                    let mut log = self.log.lock().await;
1808                    log.append(
1809                        EventKind::PolicyViolation,
1810                        Some(&action.id),
1811                        Some(proposal_id),
1812                        HashMap::new(),
1813                    );
1814                    return (
1815                        rejected_result(
1816                            &action.id,
1817                            format!(
1818                                "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1819                            ),
1820                        ),
1821                        0,
1822                    );
1823                }
1824            }
1825            if !violations.is_empty() {
1826                let error = violations
1827                    .iter()
1828                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1829                    .collect::<Vec<_>>()
1830                    .join("; ");
1831                let mut log = self.log.lock().await;
1832                log.append(
1833                    EventKind::PolicyViolation,
1834                    Some(&action.id),
1835                    Some(proposal_id),
1836                    HashMap::new(),
1837                );
1838                return (rejected_result(&action.id, error), 0);
1839            }
1840        }
1841
1842        // Validated
1843        {
1844            let mut log = self.log.lock().await;
1845            log.append(
1846                EventKind::ActionValidated,
1847                Some(&action.id),
1848                Some(proposal_id),
1849                HashMap::new(),
1850            );
1851        }
1852
1853        // Execute with retry
1854        let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1855
1856        // Cache idempotent results
1857        if action.idempotent && result.status == ActionStatus::Succeeded {
1858            let mut cache = self.idempotency_cache.lock().await;
1859            cache.insert(idempotency_key(action), result.clone());
1860        }
1861
1862        tracing::info!(
1863            status = ?result.status,
1864            duration_ms = result.duration_ms,
1865            "action completed"
1866        );
1867
1868        (result, retries)
1869    }
1870
1871    /// Execute with retry logic and timeout.
1872    /// Returns (ActionResult, retries_count).
1873    async fn execute_with_retry(
1874        &self,
1875        action: &Action,
1876        proposal_id: &str,
1877        scope: Option<&crate::scope::RuntimeScope>,
1878    ) -> (ActionResult, u32) {
1879        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1880            action.max_retries + 1
1881        } else {
1882            1
1883        };
1884
1885        let mut last_error: Option<String> = None;
1886        let mut retries: u32 = 0;
1887
1888        for attempt in 0..max_attempts {
1889            if attempt > 0 {
1890                retries += 1;
1891                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1892                tokio::time::sleep(Duration::from_millis(delay)).await;
1893                let mut log = self.log.lock().await;
1894                log.append(
1895                    EventKind::ActionRetrying,
1896                    Some(&action.id),
1897                    Some(proposal_id),
1898                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
1899                );
1900            }
1901
1902            {
1903                let mut log = self.log.lock().await;
1904                log.append(
1905                    EventKind::ActionExecuting,
1906                    Some(&action.id),
1907                    Some(proposal_id),
1908                    HashMap::new(),
1909                );
1910            }
1911
1912            let start = std::time::Instant::now();
1913            let transitions_before = self.state.transition_count();
1914
1915            // Execute with optional timeout
1916            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1917                match timeout(Duration::from_millis(timeout_ms), self.dispatch(action, scope)).await {
1918                    Ok(r) => r,
1919                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1920                }
1921            } else {
1922                self.dispatch(action, scope).await
1923            };
1924
1925            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1926
1927            match exec_result {
1928                Ok(output) => {
1929                    // Commit post-effects. Tenant-scoped when the
1930                    // proposal carries a scope (Parslee-ai/car#187
1931                    // phase 3) — distinct tenants write to disjoint
1932                    // namespaces so concurrent multi-tenant proposals
1933                    // don't trample each other's keys.
1934                    let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1935                    let scoped_state = self.state.scoped(tenant_for_effects);
1936                    let mut state_changes: HashMap<String, Value> = HashMap::new();
1937                    for (key, value) in &action.expected_effects {
1938                        scoped_state.set(key, value.clone(), &action.id);
1939                        state_changes.insert(key.clone(), value.clone());
1940                    }
1941
1942                    // Capture state changes from dispatch
1943                    for t in self.state.transitions_since(transitions_before) {
1944                        if !state_changes.contains_key(&t.key) {
1945                            if let Some(v) = t.new_value {
1946                                state_changes.insert(t.key.clone(), v);
1947                            }
1948                        }
1949                    }
1950
1951                    let mut log = self.log.lock().await;
1952                    log.append(
1953                        EventKind::ActionSucceeded,
1954                        Some(&action.id),
1955                        Some(proposal_id),
1956                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1957                    );
1958
1959                    if !state_changes.is_empty() {
1960                        log.append(
1961                            EventKind::StateChanged,
1962                            Some(&action.id),
1963                            Some(proposal_id),
1964                            [(
1965                                "changes".to_string(),
1966                                serde_json::to_value(&state_changes).unwrap_or_default(),
1967                            )]
1968                            .into(),
1969                        );
1970                    }
1971
1972                    return (
1973                        ActionResult {
1974                            action_id: action.id.clone(),
1975                            status: ActionStatus::Succeeded,
1976                            output: Some(output),
1977                            error: None,
1978                            state_changes,
1979                            duration_ms: Some(duration_ms),
1980                            timestamp: chrono::Utc::now(),
1981                        },
1982                        retries,
1983                    );
1984                }
1985                Err(e) => {
1986                    last_error = Some(e.clone());
1987                    let mut log = self.log.lock().await;
1988                    log.append(
1989                        EventKind::ActionFailed,
1990                        Some(&action.id),
1991                        Some(proposal_id),
1992                        [
1993                            ("error".to_string(), Value::from(e.as_str())),
1994                            ("attempt".to_string(), Value::from(attempt + 1)),
1995                        ]
1996                        .into(),
1997                    );
1998                }
1999            }
2000        }
2001
2002        // All attempts exhausted
2003        if action.failure_behavior == FailureBehavior::Skip {
2004            return (
2005                skipped_result(
2006                    &action.id,
2007                    last_error.as_deref().unwrap_or("all attempts exhausted"),
2008                ),
2009                retries,
2010            );
2011        }
2012
2013        (
2014            ActionResult {
2015                action_id: action.id.clone(),
2016                status: ActionStatus::Failed,
2017                output: None,
2018                error: last_error,
2019                state_changes: HashMap::new(),
2020                duration_ms: None,
2021                timestamp: chrono::Utc::now(),
2022            },
2023            retries,
2024        )
2025    }
2026
2027    /// Dispatch an action to the appropriate handler.
2028    ///
2029    /// `scope` is the per-execution caller / tenant surface
2030    /// (Parslee-ai/car#187 phase 3). When the scope carries a
2031    /// tenant id, state R/W operations (`StateWrite`, `StateRead`,
2032    /// `Assertion`) route through `StateStore::scoped(tenant_id)`
2033    /// so distinct tenants can't see each other's keys. Unscoped
2034    /// proposals get the legacy flat-namespace behaviour
2035    /// automatically.
2036    async fn dispatch(
2037        &self,
2038        action: &Action,
2039        scope: Option<&crate::scope::RuntimeScope>,
2040    ) -> Result<Value, String> {
2041        match action.action_type {
2042            ActionType::ToolCall => {
2043                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2044                let params = Value::Object(
2045                    action
2046                        .parameters
2047                        .iter()
2048                        .map(|(k, v)| (k.clone(), v.clone()))
2049                        .collect(),
2050                );
2051
2052                // Check cross-proposal result cache.
2053                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
2054                    return Ok(cached);
2055                }
2056
2057                // Apply rate limit backpressure before executing.
2058                self.rate_limiter.acquire(tool_name).await;
2059
2060                // Try built-in inference tools first when the inference engine is available.
2061                if matches!(
2062                    tool_name,
2063                    "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2064                ) {
2065                    if let Some(ref engine) = self.inference_engine {
2066                        // For "infer.grounded" or "infer" with memgine available,
2067                        // build context from memory and attach it to the request.
2068                        let params = {
2069                            let should_ground =
2070                                tool_name == "infer.grounded" || tool_name == "infer";
2071                            if should_ground {
2072                                if let Some(ref memgine) = self.memgine {
2073                                    if let Some(prompt) =
2074                                        params.get("prompt").and_then(|v| v.as_str())
2075                                    {
2076                                        let ctx = {
2077                                            let mut m = memgine.lock().await;
2078                                            m.build_context(prompt)
2079                                        };
2080                                        if !ctx.is_empty() {
2081                                            let mut p = params.clone();
2082                                            if let Some(obj) = p.as_object_mut() {
2083                                                obj.insert("context".to_string(), Value::from(ctx));
2084                                            }
2085                                            p
2086                                        } else {
2087                                            params
2088                                        }
2089                                    } else {
2090                                        params
2091                                    }
2092                                } else {
2093                                    params
2094                                }
2095                            } else {
2096                                params
2097                            }
2098                        };
2099
2100                        // Route "infer.grounded" to "infer" for the service layer
2101                        let effective_tool = if tool_name == "infer.grounded" {
2102                            "infer"
2103                        } else {
2104                            tool_name
2105                        };
2106                        let result =
2107                            car_inference::service::execute_tool(engine, effective_tool, &params)
2108                                .await
2109                                .map_err(|e| e.to_string());
2110
2111                        if let Ok(ref value) = result {
2112                            self.result_cache
2113                                .put(tool_name, &params, value.clone())
2114                                .await;
2115                        }
2116
2117                        return result;
2118                    }
2119                }
2120
2121                // Built-in memory consolidation tool.
2122                if tool_name == "memory.consolidate" {
2123                    if let Some(ref memgine) = self.memgine {
2124                        let report = {
2125                            let mut m = memgine.lock().await;
2126                            m.consolidate().await
2127                        };
2128                        // Log the consolidation event
2129                        {
2130                            let mut log = self.log.lock().await;
2131                            log.append(
2132                                EventKind::Consolidated,
2133                                None,
2134                                None,
2135                                [
2136                                    (
2137                                        "expired_pruned".to_string(),
2138                                        Value::from(report.expired_pruned),
2139                                    ),
2140                                    (
2141                                        "superseded_gc".to_string(),
2142                                        Value::from(report.superseded_gc),
2143                                    ),
2144                                    (
2145                                        "stale_embeddings_removed".to_string(),
2146                                        Value::from(report.stale_embeddings_removed),
2147                                    ),
2148                                    (
2149                                        "nodes_embedded".to_string(),
2150                                        Value::from(report.nodes_embedded),
2151                                    ),
2152                                    (
2153                                        "domains_evolved".to_string(),
2154                                        Value::from(report.domains_evolved.clone()),
2155                                    ),
2156                                    ("total_nodes".to_string(), Value::from(report.total_nodes)),
2157                                    ("total_edges".to_string(), Value::from(report.total_edges)),
2158                                ]
2159                                .into(),
2160                            );
2161                        }
2162                        return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2163                    } else {
2164                        return Err(
2165                            "memory.consolidate requires memgine (attach with with_learning)"
2166                                .into(),
2167                        );
2168                    }
2169                }
2170
2171                // Prefer a configured tool_executor for any tool it claims to handle.
2172                // Fall through to agent_basics only when the configured executor is absent
2173                // or explicitly returns "unknown tool" — this prevents agent_basics' built-in
2174                // read_file/write_file (which resolve paths via std::env::current_dir) from
2175                // silently overriding an executor that carries its own working_dir.
2176                let configured = {
2177                    let guard = self.tool_executor.lock().await;
2178                    guard.as_ref().cloned()
2179                };
2180
2181                if let Some(ref executor) = configured {
2182                    let result = executor.execute(tool_name, &params).await;
2183                    let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2184                    if !fall_through {
2185                        if let Ok(ref value) = result {
2186                            self.result_cache
2187                                .put(tool_name, &params, value.clone())
2188                                .await;
2189                        }
2190                        return result;
2191                    }
2192                }
2193
2194                if let Some(result) = crate::agent_basics::execute(tool_name, &params).await {
2195                    if let Ok(ref value) = result {
2196                        self.result_cache
2197                            .put(tool_name, &params, value.clone())
2198                            .await;
2199                    }
2200                    return result;
2201                }
2202
2203                Err(format!("no handler for tool '{}'", tool_name))
2204            }
2205            ActionType::StateWrite => {
2206                let key = action
2207                    .parameters
2208                    .get("key")
2209                    .and_then(|v| v.as_str())
2210                    .ok_or("state_write requires 'key' parameter")?;
2211                let value = action
2212                    .parameters
2213                    .get("value")
2214                    .cloned()
2215                    .unwrap_or(Value::Null);
2216                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2217                self.state.scoped(tenant).set(key, value, &action.id);
2218                Ok(Value::from(format!("written: {}", key)))
2219            }
2220            ActionType::StateRead => {
2221                let key = action
2222                    .parameters
2223                    .get("key")
2224                    .and_then(|v| v.as_str())
2225                    .ok_or("state_read requires 'key' parameter")?;
2226                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2227                Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2228            }
2229            ActionType::Assertion => {
2230                let key = action
2231                    .parameters
2232                    .get("key")
2233                    .and_then(|v| v.as_str())
2234                    .ok_or("assertion requires 'key' parameter")?;
2235                let expected = action
2236                    .parameters
2237                    .get("expected")
2238                    .cloned()
2239                    .unwrap_or(Value::Null);
2240                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2241                let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2242                if actual != expected {
2243                    Err(format!(
2244                        "assertion failed: state['{}'] = {:?}, expected {:?}",
2245                        key, actual, expected
2246                    ))
2247                } else {
2248                    Ok(serde_json::json!({"asserted": key, "value": actual}))
2249                }
2250            }
2251        }
2252    }
2253
2254    // --- Checkpoint and resume ---
2255
2256    /// Save a checkpoint of the current runtime state.
2257    pub async fn save_checkpoint(&self) -> Checkpoint {
2258        let state = self.state.snapshot();
2259        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2260        let log = self.log.lock().await;
2261        let events: Vec<Value> = log
2262            .events()
2263            .iter()
2264            .map(|e| serde_json::to_value(e).unwrap_or_default())
2265            .collect();
2266
2267        Checkpoint {
2268            checkpoint_id: Uuid::new_v4().to_string(),
2269            created_at: chrono::Utc::now(),
2270            state,
2271            events,
2272            tools,
2273            metadata: HashMap::new(),
2274        }
2275    }
2276
2277    /// Save checkpoint to a JSON file.
2278    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2279        let checkpoint = self.save_checkpoint().await;
2280        let json = serde_json::to_string_pretty(&checkpoint)
2281            .map_err(|e| format!("serialize error: {}", e))?;
2282        tokio::fs::write(path, json)
2283            .await
2284            .map_err(|e| format!("write error: {}", e))?;
2285        Ok(())
2286    }
2287
2288    /// Load a checkpoint from a JSON file and restore state.
2289    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2290        let json = tokio::fs::read_to_string(path)
2291            .await
2292            .map_err(|e| format!("read error: {}", e))?;
2293        let checkpoint: Checkpoint =
2294            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2295        self.restore_checkpoint(&checkpoint).await;
2296        Ok(checkpoint)
2297    }
2298
2299    /// Restore runtime state from a checkpoint.
2300    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2301        // Replace state completely — don't merge, don't create synthetic transitions
2302        self.state.replace_all(checkpoint.state.clone());
2303        // Clear idempotency cache — stale results from pre-checkpoint execution
2304        // must not bypass validation/policy on the restored state
2305        self.idempotency_cache.lock().await.clear();
2306        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
2307        let mut tools = self.tools.write().await;
2308        tools.clear();
2309        for tool_name in &checkpoint.tools {
2310            let schema = ToolSchema {
2311                name: tool_name.clone(),
2312                description: String::new(),
2313                parameters: serde_json::Value::Object(Default::default()),
2314                returns: None,
2315                idempotent: false,
2316                cache_ttl_secs: None,
2317                rate_limit: None,
2318            };
2319            tools.insert(tool_name.clone(), schema);
2320        }
2321    }
2322
2323    /// Register a subprocess tool and set up the subprocess executor.
2324    /// If no executor exists, creates a new SubprocessToolExecutor.
2325    /// If one already exists, creates a new SubprocessToolExecutor with the
2326    /// existing executor as fallback.
2327    pub async fn register_subprocess_tool(
2328        &self,
2329        name: &str,
2330        tool: crate::subprocess::SubprocessTool,
2331    ) {
2332        use crate::subprocess::SubprocessToolExecutor;
2333
2334        let schema = ToolSchema {
2335            name: name.to_string(),
2336            description: format!("Subprocess tool: {}", tool.command),
2337            parameters: serde_json::Value::Object(Default::default()),
2338            returns: None,
2339            idempotent: false,
2340            cache_ttl_secs: None,
2341            rate_limit: None,
2342        };
2343        self.register_tool_schema(schema).await;
2344
2345        let mut guard = self.tool_executor.lock().await;
2346        let mut executor = match guard.take() {
2347            Some(existing) => {
2348                let mut sub = SubprocessToolExecutor::new();
2349                sub = sub.with_fallback(existing);
2350                sub
2351            }
2352            None => SubprocessToolExecutor::new(),
2353        };
2354        executor.register(name, tool);
2355        *guard = Some(std::sync::Arc::new(executor));
2356    }
2357}
2358
2359impl Default for Runtime {
2360    fn default() -> Self {
2361        Self::new()
2362    }
2363}