Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base delay, in milliseconds, used when computing retry backoff.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier applied to the retry delay for backoff.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28// ---------------------------------------------------------------------------
29// Replan types — failure recovery via model callback
30// ---------------------------------------------------------------------------
31
32/// Callback trait for replanning failed proposals.
33/// Implement this to let the runtime ask the model for an alternative plan
34/// when a proposal aborts.
35#[async_trait::async_trait]
36pub trait ReplanCallback: Send + Sync {
37    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
38}
39
40/// Context provided to the replan callback so the model can generate an alternative.
41#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct ReplanContext {
43    /// Original proposal ID.
44    pub proposal_id: String,
45    /// Which replan attempt this is (1-indexed; attempt 1 = first replan after initial failure).
46    pub attempt: u32,
47    /// Actions that failed and caused the abort.
48    pub failed_actions: Vec<FailedActionSummary>,
49    /// Action IDs that succeeded before the abort (now rolled back).
50    pub completed_action_ids: Vec<String>,
51    /// State snapshot after rollback.
52    pub state_snapshot: HashMap<String, Value>,
53    /// How many replans remain.
54    pub replans_remaining: u32,
55    /// Original proposal source (model name, agent, etc.).
56    pub original_source: String,
57    /// Total actions in the original proposal.
58    pub original_action_count: usize,
59    /// The original goal/context from the proposal (for generating alternatives).
60    pub original_context: HashMap<String, Value>,
61}
62
63/// Summary of a failed action, included in ReplanContext.
64#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
65pub struct FailedActionSummary {
66    pub action_id: String,
67    pub tool: Option<String>,
68    pub error: String,
69    pub parameters: HashMap<String, Value>,
70}
71
/// Tunables for the replan loop.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Upper bound on replan attempts. `0` (the default) disables replanning.
    pub max_replans: u32,
    /// Pause, in milliseconds, between replan attempts so a model that fails
    /// fast cannot burn through every attempt instantly. `0` means no pause.
    pub delay_ms: u64,
    /// When `true`, a replanned proposal is scored through car-planner's
    /// `verify()` before it runs, and proposals with errors are rejected
    /// without executing — so the engine never runs a plan worse than the one
    /// that just failed.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87    fn default() -> Self {
88        Self {
89            max_replans: 0,
90            delay_ms: 0,
91            verify_before_execute: true,
92        }
93    }
94}
95
96/// Trait for tool execution. Implement this to provide tools to the runtime.
97///
98/// In-process: implement directly with function calls.
99/// Daemon mode: implement by sending JSON-RPC to the client.
100#[async_trait::async_trait]
101pub trait ToolExecutor: Send + Sync {
102    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
103}
104
105/// Deterministic key for idempotency deduplication.
106fn idempotency_key(action: &Action) -> String {
107    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
108    let params = serde_json::to_string(&sorted).unwrap_or_default();
109    format!(
110        "{}:{}:{}",
111        serde_json::to_string(&action.action_type).unwrap_or_default(),
112        action.tool.as_deref().unwrap_or(""),
113        params
114    )
115}
116
117fn rejected_result(action_id: &str, error: String) -> ActionResult {
118    ActionResult {
119        action_id: action_id.to_string(),
120        status: ActionStatus::Rejected,
121        output: None,
122        error: Some(error),
123        state_changes: HashMap::new(),
124        duration_ms: None,
125        timestamp: chrono::Utc::now(),
126    }
127}
128
129/// Capture only the state keys relevant to an action (state_dependencies + expected_effects).
130/// Returns an empty map if the action declares no relevant keys (e.g., tool calls
131/// that don't interact with state).
132fn snapshot_relevant_keys(
133    state: &car_state::StateStore,
134    action: &Action,
135) -> HashMap<String, Value> {
136    let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
137    for dep in &action.state_dependencies {
138        keys.insert(dep.as_str());
139    }
140    for key in action.expected_effects.keys() {
141        keys.insert(key.as_str());
142    }
143    // For state_write actions, capture the key being written
144    if action.action_type == ActionType::StateWrite {
145        if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
146            keys.insert(key);
147        }
148    }
149
150    if keys.is_empty() {
151        // No declared state interaction — skip snapshot (empty map)
152        return HashMap::new();
153    }
154
155    keys.iter()
156        .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
157        .collect()
158}
159
160fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
161    ActionResult {
162        action_id: action_id.to_string(),
163        status: ActionStatus::Skipped,
164        output: None,
165        error: Some(reason.to_string()),
166        state_changes: HashMap::new(),
167        duration_ms: None,
168        timestamp: chrono::Utc::now(),
169    }
170}
171
/// Marker prepended to the `error` field of an `ActionResult` when the action
/// did not run because the proposal was cancelled.
///
/// Cancelled actions and actions skipped after an earlier abort both carry
/// `ActionStatus::Skipped` (a dedicated variant would ripple through every IR
/// consumer and FFI binding). This prefix is what lets callers such as the
/// A2A bridge separate "the user pulled the plug" from "an earlier abort
/// cascaded" without string-matching a magic literal of their own.
pub const CANCELED_PREFIX: &str = "canceled: ";
179
180/// Result for an action that didn't run because the proposal was
181/// cancelled mid-flight. Distinct from `Skipped` so callers can tell
182/// "didn't run because of an earlier abort" from "didn't run because
183/// the user pulled the plug."
184fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
185    ActionResult {
186        action_id: action_id.to_string(),
187        status: ActionStatus::Skipped,
188        output: None,
189        error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
190        state_changes: HashMap::new(),
191        duration_ms: None,
192        timestamp: chrono::Utc::now(),
193    }
194}
195
196/// Format a tool result for feeding back to a model.
197pub fn format_tool_result(result: &ActionResult) -> String {
198    match result.status {
199        ActionStatus::Succeeded => match &result.output {
200            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
201            None => String::new(),
202        },
203        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
204        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
205        _ => format!(
206            "[{:?}] {}",
207            result.status,
208            result.error.as_deref().unwrap_or("")
209        ),
210    }
211}
212
/// Limits placed on the execution of a proposal. A `None` field leaves that
/// dimension unset.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Cap on the number of tool calls.
    pub max_tool_calls: Option<u32>,
    /// Cap on duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Cap on the number of actions.
    pub max_actions: Option<u32>,
}
220
221/// Common Agent Runtime — deterministic execution layer.
222///
223/// Lock ordering discipline (never hold multiple simultaneously, never hold sync locks across .await):
224/// 1. capabilities (RwLock, read-only during execution)
225/// 2. tools (RwLock, read-only during execution)
226/// 3. policies (RwLock, read-only during execution)
227/// 4. session_policies (RwLock to find the per-session engine, then read inner Arc)
228/// 5. cost_budget (RwLock, read-only during execution)
229/// 6. log (TokioMutex, acquired/released per event)
230/// 7. tool_executor (TokioMutex, clone Arc and drop before await)
231/// 8. idempotency_cache (TokioMutex, acquired/released per check)
232///
233/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
234pub struct Runtime {
235    pub state: Arc<StateStore>,
236    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
237    pub policies: Arc<TokioRwLock<PolicyEngine>>,
238    /// Per-session policy registries. Hosts that multiplex multiple
239    /// concurrent agent sessions over a single `Runtime` (IDE-style
240    /// frontends with per-project rules, multi-tenant servers) call
241    /// [`Runtime::open_session`] to mint an id, then
242    /// [`Runtime::register_policy_in_session`] to attach session-scoped
243    /// rules. Validation under that session walks the global registry
244    /// AND the session's — both must pass. Sessions can deny what
245    /// global allows; sessions cannot allow what global denies.
246    /// Closing a session drops its registry and any closures it holds.
247    /// See `docs/proposals/per-session-policy-scoping.md`.
248    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
249    pub log: Arc<TokioMutex<EventLog>>,
250    pub rate_limiter: Arc<RateLimiter>,
251    pub result_cache: Arc<ResultCache>,
252    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
253    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
254    cost_budget: TokioRwLock<Option<CostBudget>>,
255    capabilities: TokioRwLock<Option<CapabilitySet>>,
256    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
257    /// Optional memgine for skill learning (auto-distillation after execution).
258    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
259    /// Whether to auto-distill skills after each proposal execution.
260    auto_distill: bool,
261    /// Optional trajectory store for persisting execution traces.
262    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
263    /// Optional replan callback for failure recovery.
264    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
265    /// Replan configuration.
266    replan_config: TokioRwLock<ReplanConfig>,
267    /// Canonical tool registry (optional — new code should use this).
268    pub registry: Arc<crate::registry::ToolRegistry>,
269}
270
271impl Runtime {
272    pub fn new() -> Self {
273        Self {
274            state: Arc::new(StateStore::new()),
275            tools: Arc::new(TokioRwLock::new(HashMap::new())),
276            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
277            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
278            log: Arc::new(TokioMutex::new(EventLog::new())),
279            rate_limiter: Arc::new(RateLimiter::new()),
280            result_cache: Arc::new(ResultCache::new()),
281            tool_executor: TokioMutex::new(None),
282            idempotency_cache: TokioMutex::new(HashMap::new()),
283            cost_budget: TokioRwLock::new(None),
284            capabilities: TokioRwLock::new(None),
285            inference_engine: None,
286            memgine: None,
287            auto_distill: false,
288            trajectory_store: None,
289            replan_callback: TokioMutex::new(None),
290            replan_config: TokioRwLock::new(ReplanConfig::default()),
291            registry: Arc::new(crate::registry::ToolRegistry::new()),
292        }
293    }
294
295    /// Create a runtime with shared state, event log, and policies.
296    /// Each runtime gets its own tool set, executor, and idempotency cache.
297    pub fn with_shared(
298        state: Arc<StateStore>,
299        log: Arc<TokioMutex<EventLog>>,
300        policies: Arc<TokioRwLock<PolicyEngine>>,
301    ) -> Self {
302        Self {
303            state,
304            tools: Arc::new(TokioRwLock::new(HashMap::new())),
305            policies,
306            // Session-policy registries are per-runtime — sharing them
307            // across embedders that share global policies would defeat
308            // the isolation point. Hosts that genuinely want shared
309            // sessions should drive them through one shared Runtime.
310            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
311            log,
312            rate_limiter: Arc::new(RateLimiter::new()),
313            result_cache: Arc::new(ResultCache::new()),
314            tool_executor: TokioMutex::new(None),
315            idempotency_cache: TokioMutex::new(HashMap::new()),
316            cost_budget: TokioRwLock::new(None),
317            capabilities: TokioRwLock::new(None),
318            inference_engine: None,
319            memgine: None,
320            auto_distill: false,
321            trajectory_store: None,
322            replan_callback: TokioMutex::new(None),
323            replan_config: TokioRwLock::new(ReplanConfig::default()),
324            registry: Arc::new(crate::registry::ToolRegistry::new()),
325        }
326    }
327
328    // ─── Session policy lifecycle ───────────────────────────────────
329    //
330    // Per-session policy scoping. Policies registered against a
331    // session apply to proposals executed under that session id (via
332    // [`Self::execute_with_session`] / [`Self::execute_with_session_and_cancel`]).
333    // Policies registered globally always apply, on top of any
334    // session-scoped layer. See `docs/proposals/per-session-policy-scoping.md`.
335
336    /// Mint a new session id and pre-register an empty policy engine
337    /// under it. Hosts call this once per concurrent agent context
338    /// (an IDE project window, a multi-tenant client, etc.) and pair
339    /// it with [`Self::close_session`] when the context ends.
340    ///
341    /// Returns the opaque id to pass to subsequent
342    /// [`Self::register_policy_in_session`] / [`Self::execute_with_session`]
343    /// calls. Ids are UUIDs so collisions across concurrent calls
344    /// don't matter.
345    pub async fn open_session(&self) -> String {
346        let id = Uuid::new_v4().to_string();
347        let mut sessions = self.session_policies.write().await;
348        sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
349        id
350    }
351
352    /// Drop the session and every policy scoped to it. Returns true
353    /// if a session by that id existed; false if it didn't (already
354    /// closed, never opened, etc.). Idempotent in effect — closing a
355    /// missing session is a no-op the caller is free to ignore.
356    pub async fn close_session(&self, session_id: &str) -> bool {
357        let mut sessions = self.session_policies.write().await;
358        sessions.remove(session_id).is_some()
359    }
360
361    /// Register a policy under a specific session id. The policy
362    /// applies only when a proposal is executed under that session;
363    /// proposals executed without a session (the default) only see
364    /// global policies.
365    ///
366    /// Returns `Err(...)` if the session is unknown — callers either
367    /// forgot to call [`Self::open_session`] or are using a
368    /// stale/closed id.
369    pub async fn register_policy_in_session(
370        &self,
371        session_id: &str,
372        name: &str,
373        check: car_policy::PolicyCheck,
374        description: &str,
375    ) -> Result<(), String> {
376        let engine = {
377            let sessions = self.session_policies.read().await;
378            sessions
379                .get(session_id)
380                .cloned()
381                .ok_or_else(|| format!("unknown session id '{session_id}'"))?
382        };
383        let mut engine = engine.write().await;
384        engine.register(name, check, description);
385        Ok(())
386    }
387
388    /// True if a session with this id is currently open. Mostly for
389    /// tests and FFI surface validation — production code should
390    /// trust the id it just opened.
391    pub async fn session_exists(&self, session_id: &str) -> bool {
392        self.session_policies.read().await.contains_key(session_id)
393    }
394
395    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
396    /// as built-in tools with real implementations.
397    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
398        self.inference_engine = Some(engine);
399        // Register inference tool schemas (non-async init, use try_lock)
400        if let Ok(mut tools) = self.tools.try_write() {
401            for schema in car_inference::service::all_schemas() {
402                tools.insert(schema.name.clone(), schema);
403            }
404        }
405        self
406    }
407
408    /// Attach a memgine for automatic skill learning after execution.
409    /// When `auto_distill` is true, execution traces are automatically distilled
410    /// into skills and domains are evolved when underperforming.
411    pub fn with_learning(
412        mut self,
413        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
414        auto_distill: bool,
415    ) -> Self {
416        self.memgine = Some(memgine);
417        self.auto_distill = auto_distill;
418        self
419    }
420
421    /// Attach a memgine with auto-distillation enabled (recommended default).
422    pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
423        self.with_learning(memgine, true)
424    }
425
426    /// Attach a trajectory store for persisting execution traces.
427    pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
428        self.trajectory_store = Some(store);
429        self
430    }
431
432    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
433        // Use try_lock for non-async init context. Safe because we just created the mutex.
434        if let Ok(mut guard) = self.tool_executor.try_lock() {
435            *guard = Some(executor);
436        }
437        self
438    }
439
440    /// Set a tool executor for the next execute() call.
441    /// Used by NAPI bindings where executor varies per call.
442    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
443        *self.tool_executor.lock().await = Some(executor);
444    }
445
446    pub fn with_event_log(mut self, log: EventLog) -> Self {
447        self.log = Arc::new(TokioMutex::new(log));
448        self
449    }
450
451    /// Attach a replan callback for failure recovery (builder).
452    pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
453        if let Ok(mut guard) = self.replan_callback.try_lock() {
454            *guard = Some(callback);
455        }
456        if let Ok(mut guard) = self.replan_config.try_write() {
457            *guard = config;
458        }
459        self
460    }
461
462    /// Set a replan callback at runtime.
463    pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
464        *self.replan_callback.lock().await = Some(callback);
465    }
466
467    /// Set replan configuration at runtime.
468    pub async fn set_replan_config(&self, config: ReplanConfig) {
469        *self.replan_config.write().await = config;
470    }
471
472    /// Register a tool with just a name (backward compatible).
473    pub async fn register_tool(&self, name: &str) {
474        let schema = ToolSchema {
475            name: name.to_string(),
476            description: String::new(),
477            parameters: serde_json::Value::Object(Default::default()),
478            returns: None,
479            idempotent: false,
480            cache_ttl_secs: None,
481            rate_limit: None,
482        };
483        self.register_tool_schema(schema).await;
484    }
485
486    /// Register a tool with full schema.
487    pub async fn register_tool_schema(&self, schema: ToolSchema) {
488        // Auto-configure cache if schema specifies it
489        if let Some(ttl) = schema.cache_ttl_secs {
490            self.result_cache.enable_caching(&schema.name, ttl).await;
491        }
492        // Auto-configure rate limit if schema specifies it
493        if let Some(ref rl) = schema.rate_limit {
494            self.rate_limiter
495                .set_limit(
496                    &schema.name,
497                    RateLimit {
498                        max_calls: rl.max_calls,
499                        interval_secs: rl.interval_secs,
500                    },
501                )
502                .await;
503        }
504        self.tools.write().await.insert(schema.name.clone(), schema);
505    }
506
507    /// Register a tool via the canonical registry.
508    /// This is the preferred way to register tools — it updates both the
509    /// registry and the legacy tools HashMap for backward compatibility.
510    pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
511        let schema = entry.schema.clone();
512        self.registry.register(entry).await;
513        self.register_tool_schema(schema).await;
514    }
515
516    /// Register CAR's built-in agent utility stdlib.
517    ///
518    /// This is an opt-in convenience layer for common local-file and text tools.
519    /// Existing runtimes remain unchanged until this is called.
520    pub async fn register_agent_basics(&self) {
521        for entry in crate::agent_basics::entries() {
522            self.register_tool_entry(entry).await;
523        }
524    }
525
526    /// Get all registered tool schemas (for model prompt generation).
527    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
528        self.tools.read().await.values().cloned().collect()
529    }
530
531    /// Set a cost budget that limits proposal execution.
532    pub async fn set_cost_budget(&self, budget: CostBudget) {
533        *self.cost_budget.write().await = Some(budget);
534    }
535
536    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
537    pub async fn set_capabilities(&self, caps: CapabilitySet) {
538        *self.capabilities.write().await = Some(caps);
539    }
540
541    /// Set a per-tool rate limit (token bucket).
542    ///
543    /// `max_calls` tokens are available per `interval_secs` window.
544    /// When the bucket is empty, `dispatch()` applies backpressure by
545    /// waiting until a token refills.
546    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
547        self.rate_limiter
548            .set_limit(
549                tool,
550                RateLimit {
551                    max_calls,
552                    interval_secs,
553                },
554            )
555            .await;
556    }
557
558    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
559    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
560        self.result_cache.enable_caching(tool, ttl_secs).await;
561    }
562
563    /// Execute a proposal with automatic replanning on failure.
564    ///
565    /// If a `ReplanCallback` is registered and `max_replans > 0`, the runtime
566    /// will catch abort failures, roll back state, ask the model for an
567    /// alternative proposal via the callback, and re-execute. This transforms
568    /// "execute-and-hope" into "execute-and-recover."
569    ///
570    /// If no callback is registered or `max_replans == 0`, behaves identically
571    /// to a single `execute_inner()` call (zero overhead, fully backward compatible).
572    #[instrument(
573        name = "proposal.execute",
574        skip_all,
575        fields(
576            proposal_id = %proposal.id,
577            action_count = proposal.actions.len(),
578        )
579    )]
580    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
581        // Forward to the cancel-aware variant with a never-cancelled
582        // token. Existing callers see no behaviour change.
583        let token = tokio_util::sync::CancellationToken::new();
584        self.execute_with_cancel(proposal, &token).await
585    }
586
587    /// Execute a proposal scoped to a specific session id.
588    ///
589    /// Validation walks the global policy registry plus the session's
590    /// own registry — both must pass for an action to run. Session
591    /// policies can deny what global allows; they cannot allow what
592    /// global denies (validation is conjunctive).
593    ///
594    /// Returns the same [`ProposalResult`] shape as [`Self::execute`].
595    /// Errors with an action-level rejection if the session id is
596    /// unknown — callers should check via [`Self::session_exists`] or
597    /// trust an id they minted via [`Self::open_session`].
598    pub async fn execute_with_session(
599        &self,
600        proposal: &ActionProposal,
601        session_id: &str,
602    ) -> ProposalResult {
603        let token = tokio_util::sync::CancellationToken::new();
604        self.execute_with_session_and_cancel(proposal, session_id, &token).await
605    }
606
607    /// Combined session-scoped + cancellable execute. The session id
608    /// is passed verbatim to the per-action policy check; the cancel
609    /// token behaves identically to [`Self::execute_with_cancel`].
610    pub async fn execute_with_session_and_cancel(
611        &self,
612        proposal: &ActionProposal,
613        session_id: &str,
614        cancel: &tokio_util::sync::CancellationToken,
615    ) -> ProposalResult {
616        self.execute_with_optional_session(proposal, Some(session_id), cancel).await
617    }
618
619    /// Execute a proposal with cooperative cancellation.
620    ///
621    /// The runtime checks `token.is_cancelled()` at each DAG level
622    /// boundary. When set, every action that hadn't yet started runs
623    /// is reported as `Skipped` with `error = "canceled: ..."` so
624    /// callers can distinguish "user pulled the plug" from "earlier
625    /// abort cascaded." Actions already in flight continue to
626    /// completion — tool calls dispatched to user-provided executors
627    /// can't be safely interrupted from the engine.
628    ///
629    /// The CAR A2A bridge uses this so `tasks/cancel` produces a
630    /// `ProposalResult` with clean partial state rather than relying
631    /// on `JoinHandle::abort` to interrupt mid-await (which leaves
632    /// no record of which actions actually ran).
633    ///
634    /// **FFI exposure:** this method is intentionally not surfaced
635    /// through the NAPI / PyO3 / `car-server-core` JSON-RPC bindings.
636    /// Those consumers (Node, Python, WebSocket) don't currently
637    /// expose long-running async-task surfaces that need
638    /// cancellation; the bridge is the lone consumer. When a binding
639    /// gains a long-running task surface, the path is clear: add a
640    /// per-binding token registry keyed by some caller-provided id,
641    /// expose `cancelExecution(id)` / `cancel_execution(id)` /
642    /// `proposal.cancel { id }`, and have the runtime call
643    /// `execute_with_cancel` with the matching token. Skipping that
644    /// today avoids speculative API surface that bloats bindings
645    /// without a consumer.
646    pub async fn execute_with_cancel(
647        &self,
648        proposal: &ActionProposal,
649        cancel: &tokio_util::sync::CancellationToken,
650    ) -> ProposalResult {
651        self.execute_with_optional_session(proposal, None, cancel).await
652    }
653
654    /// Internal entry point that backs both
655    /// [`Self::execute_with_cancel`] (no session) and
656    /// [`Self::execute_with_session_and_cancel`]. Holds the replan
657    /// loop and threads `session_id` into the per-action validation
658    /// path so session-scoped policies stack on top of global ones.
659    async fn execute_with_optional_session(
660        &self,
661        proposal: &ActionProposal,
662        session_id: Option<&str>,
663        cancel: &tokio_util::sync::CancellationToken,
664    ) -> ProposalResult {
665        let config = self.replan_config.read().await.clone();
666        let mut current_proposal = proposal.clone();
667        let mut attempt: u32 = 0;
668
669        loop {
670            let (result, state_before_map) = self
671                .execute_inner_with_cancel(&current_proposal, Some(cancel), session_id)
672                .await;
673
674            // Check if we aborted
675            let aborted = result
676                .results
677                .iter()
678                .any(|r| r.status == ActionStatus::Failed);
679            if !aborted || attempt >= config.max_replans {
680                if aborted && attempt > 0 {
681                    // Exhausted all replan attempts
682                    let mut log = self.log.lock().await;
683                    log.append(
684                        EventKind::ReplanExhausted,
685                        None,
686                        Some(&proposal.id),
687                        [("attempts".to_string(), Value::from(attempt))].into(),
688                    );
689                }
690
691                // Persist trajectory
692                let outcome = if !aborted {
693                    if attempt > 0 {
694                        car_memgine::TrajectoryOutcome::ReplanSuccess
695                    } else {
696                        car_memgine::TrajectoryOutcome::Success
697                    }
698                } else if attempt > 0 {
699                    car_memgine::TrajectoryOutcome::ReplanExhausted
700                } else {
701                    car_memgine::TrajectoryOutcome::Failed
702                };
703                if let Some(err) = self.persist_trajectory(
704                    proposal,
705                    &current_proposal,
706                    &result,
707                    outcome,
708                    attempt,
709                    &state_before_map,
710                ) {
711                    let mut log = self.log.lock().await;
712                    log.append(
713                        EventKind::ActionFailed,
714                        None,
715                        Some(&proposal.id),
716                        [(
717                            "trajectory_persist_error".to_string(),
718                            Value::from(err.as_str()),
719                        )]
720                        .into(),
721                    );
722                }
723
724                return result;
725            }
726
727            // Get replan callback (clone Arc, drop lock immediately)
728            let callback = {
729                let guard = self.replan_callback.lock().await;
730                guard.clone()
731            };
732            let Some(callback) = callback else {
733                // No callback registered — persist trajectory and return
734                if let Some(err) = self.persist_trajectory(
735                    proposal,
736                    &current_proposal,
737                    &result,
738                    car_memgine::TrajectoryOutcome::Failed,
739                    attempt,
740                    &state_before_map,
741                ) {
742                    let mut log = self.log.lock().await;
743                    log.append(
744                        EventKind::ActionFailed,
745                        None,
746                        Some(&proposal.id),
747                        [(
748                            "trajectory_persist_error".to_string(),
749                            Value::from(err.as_str()),
750                        )]
751                        .into(),
752                    );
753                }
754                return result;
755            };
756
757            // Build replan context
758            let failed_actions: Vec<FailedActionSummary> = result
759                .results
760                .iter()
761                .filter(|r| r.status == ActionStatus::Failed)
762                .map(|r| {
763                    let action = current_proposal
764                        .actions
765                        .iter()
766                        .find(|a| a.id == r.action_id);
767                    FailedActionSummary {
768                        action_id: r.action_id.clone(),
769                        tool: action.and_then(|a| a.tool.clone()),
770                        error: r.error.clone().unwrap_or_default(),
771                        parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
772                    }
773                })
774                .collect();
775
776            let completed_action_ids: Vec<String> = result
777                .results
778                .iter()
779                .filter(|r| r.status == ActionStatus::Succeeded)
780                .map(|r| r.action_id.clone())
781                .collect();
782
783            let ctx = ReplanContext {
784                proposal_id: proposal.id.clone(),
785                attempt: attempt + 1,
786                failed_actions,
787                completed_action_ids,
788                state_snapshot: self.state.snapshot(),
789                replans_remaining: config.max_replans.saturating_sub(attempt + 1),
790                original_source: proposal.source.clone(),
791                original_action_count: proposal.actions.len(),
792                original_context: proposal.context.clone(),
793            };
794
795            // Backoff delay between replan attempts
796            if config.delay_ms > 0 {
797                tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
798            }
799
800            // Log replan attempt
801            {
802                let mut log = self.log.lock().await;
803                log.append(
804                    EventKind::ReplanAttempted,
805                    None,
806                    Some(&proposal.id),
807                    [
808                        ("attempt".to_string(), Value::from(attempt + 1)),
809                        (
810                            "failed_count".to_string(),
811                            Value::from(ctx.failed_actions.len()),
812                        ),
813                    ]
814                    .into(),
815                );
816            }
817
818            // Call the model for a new plan
819            match callback.replan(&ctx).await {
820                Ok(new_proposal) => {
821                    // Quality gate: verify replan proposal before executing
822                    if config.verify_before_execute {
823                        let tools_guard = self.tools.read().await;
824                        let tool_names: std::collections::HashSet<String> =
825                            tools_guard.keys().cloned().collect();
826                        drop(tools_guard);
827
828                        let current_state = self.state.snapshot();
829                        let vr = car_verify::verify(
830                            &new_proposal,
831                            Some(&current_state),
832                            Some(&tool_names),
833                            100,
834                        );
835                        if !vr.valid {
836                            let error_msgs: Vec<String> = vr
837                                .issues
838                                .iter()
839                                .filter(|i| i.severity == "error")
840                                .map(|i| i.message.clone())
841                                .collect();
842                            let mut log = self.log.lock().await;
843                            log.append(
844                                EventKind::ReplanRejected,
845                                None,
846                                Some(&proposal.id),
847                                [
848                                    ("errors".to_string(), Value::from(error_msgs.join("; "))),
849                                    ("attempt".to_string(), Value::from(attempt + 1)),
850                                ]
851                                .into(),
852                            );
853                            // Don't execute a broken replan — count as failed attempt
854                            attempt += 1;
855                            continue;
856                        }
857                    }
858
859                    // Log accepted proposal
860                    {
861                        let mut log = self.log.lock().await;
862                        log.append(
863                            EventKind::ReplanProposalReceived,
864                            None,
865                            Some(&proposal.id),
866                            [
867                                ("attempt".to_string(), Value::from(attempt + 1)),
868                                (
869                                    "new_action_count".to_string(),
870                                    Value::from(new_proposal.actions.len()),
871                                ),
872                            ]
873                            .into(),
874                        );
875                    }
876                    current_proposal = new_proposal;
877                    attempt += 1;
878                }
879                Err(e) => {
880                    // Replan callback itself failed — log and return original failure
881                    let mut log = self.log.lock().await;
882                    log.append(
883                        EventKind::ReplanExhausted,
884                        None,
885                        Some(&proposal.id),
886                        [
887                            ("reason".to_string(), Value::from("callback_error")),
888                            ("error".to_string(), Value::from(e.as_str())),
889                            ("attempt".to_string(), Value::from(attempt + 1)),
890                        ]
891                        .into(),
892                    );
893                    if let Some(err) = self.persist_trajectory(
894                        proposal,
895                        &current_proposal,
896                        &result,
897                        car_memgine::TrajectoryOutcome::Failed,
898                        attempt,
899                        &state_before_map,
900                    ) {
901                        log.append(
902                            EventKind::ActionFailed,
903                            None,
904                            Some(&proposal.id),
905                            [(
906                                "trajectory_persist_error".to_string(),
907                                Value::from(err.as_str()),
908                            )]
909                            .into(),
910                        );
911                    }
912                    return result;
913                }
914            }
915        }
916    }
917
918    /// Persist a trajectory to the store if configured.
919    fn persist_trajectory(
920        &self,
921        proposal: &ActionProposal,
922        current_proposal: &ActionProposal,
923        result: &ProposalResult,
924        outcome: car_memgine::TrajectoryOutcome,
925        attempt: u32,
926        state_before_map: &HashMap<String, HashMap<String, Value>>,
927    ) -> Option<String> {
928        let store = self.trajectory_store.as_ref()?;
929
930        let trace_events: Vec<car_memgine::TraceEvent> = result
931            .results
932            .iter()
933            .map(|r| {
934                let kind = match r.status {
935                    ActionStatus::Succeeded => "action_succeeded",
936                    ActionStatus::Failed => "action_failed",
937                    ActionStatus::Rejected => "action_rejected",
938                    ActionStatus::Skipped => "action_skipped",
939                    _ => "unknown",
940                };
941                let tool = current_proposal
942                    .actions
943                    .iter()
944                    .find(|a| a.id == r.action_id)
945                    .and_then(|a| a.tool.clone());
946                let reward = match r.status {
947                    ActionStatus::Succeeded => Some(1.0),
948                    ActionStatus::Failed => Some(0.0),
949                    ActionStatus::Rejected => Some(0.0),
950                    ActionStatus::Skipped => None,
951                    _ => None,
952                };
953                car_memgine::TraceEvent {
954                    kind: kind.to_string(),
955                    action_id: Some(r.action_id.clone()),
956                    tool,
957                    data: r
958                        .error
959                        .as_ref()
960                        .map(|e| serde_json::json!({"error": e}))
961                        .unwrap_or(serde_json::json!({})),
962                    duration_ms: r.duration_ms,
963                    state_before: state_before_map.get(&r.action_id).cloned(),
964                    state_after: if !r.state_changes.is_empty() {
965                        Some(r.state_changes.clone())
966                    } else {
967                        None
968                    },
969                    reward,
970                }
971            })
972            .collect();
973
974        let trajectory = car_memgine::Trajectory {
975            proposal_id: proposal.id.clone(),
976            source: proposal.source.clone(),
977            action_count: current_proposal.actions.len(),
978            events: trace_events,
979            outcome,
980            timestamp: chrono::Utc::now(),
981            duration_ms: result.cost.total_duration_ms,
982            replan_attempts: attempt,
983        };
984
985        match store.append(&trajectory) {
986            Ok(()) => None,
987            Err(e) => Some(e.to_string()),
988        }
989    }
990
991    /// Score N candidate proposals, execute the best valid one, fall back to
992    /// next-best on failure. Combines car-planner scoring with engine execution.
993    ///
994    /// Returns the result from whichever proposal was executed (best or fallback).
995    /// If all candidates fail verification, returns an error result for the first.
996    pub async fn plan_and_execute(
997        &self,
998        candidates: &[ActionProposal],
999        planner_config: Option<car_planner::PlannerConfig>,
1000        feedback: Option<&car_planner::ToolFeedback>,
1001    ) -> ProposalResult {
1002        if candidates.is_empty() {
1003            return ProposalResult {
1004                proposal_id: "empty".to_string(),
1005                results: vec![],
1006                cost: car_ir::CostSummary::default(),
1007            };
1008        }
1009
1010        // Score all candidates
1011        let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1012        let tools_guard = self.tools.read().await;
1013        let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1014        drop(tools_guard);
1015
1016        let pre_plan_snapshot = self.state.snapshot();
1017        let pre_plan_transitions = self.state.transition_count();
1018        let ranked = planner.rank_with_feedback(
1019            candidates,
1020            Some(&pre_plan_snapshot),
1021            Some(&tool_names),
1022            feedback,
1023        );
1024
1025        // Try each valid candidate in score order
1026        let mut first_failure: Option<ProposalResult> = None;
1027        for scored in &ranked {
1028            if !scored.valid {
1029                continue;
1030            }
1031
1032            // Restore clean state before each candidate (don't rely on execute's
1033            // internal rollback — it only fires on Abort, not Skip/Retry failures)
1034            self.state
1035                .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1036
1037            let proposal = &candidates[scored.index];
1038            let result = self.execute(proposal).await;
1039
1040            if result.all_succeeded() {
1041                return result;
1042            }
1043
1044            tracing::info!(
1045                proposal_id = %proposal.id,
1046                score = scored.score,
1047                "plan_and_execute: proposal failed, trying next candidate"
1048            );
1049
1050            if first_failure.is_none() {
1051                first_failure = Some(result);
1052            }
1053        }
1054
1055        // Return the first failure result (don't re-execute — avoids duplicate side effects)
1056        first_failure.unwrap_or_else(|| ProposalResult {
1057            proposal_id: candidates[0].id.clone(),
1058            results: vec![],
1059            cost: car_ir::CostSummary::default(),
1060        })
1061    }
1062
1063    /// Execute a single proposal through the runtime loop (no replanning).
1064    /// Returns (result, state_before_map) where state_before_map has per-action snapshots.
1065    ///
1066    /// `session_id`, when `Some`, scopes per-action policy validation
1067    /// to the named session in addition to global policies. Both
1068    /// layers must pass for an action to run; the session layer cannot
1069    /// loosen what global denies.
1070    async fn execute_inner_with_cancel(
1071        &self,
1072        proposal: &ActionProposal,
1073        cancel: Option<&tokio_util::sync::CancellationToken>,
1074        session_id: Option<&str>,
1075    ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1076        // Generate trace_id for this proposal execution
1077        let trace_id = Uuid::new_v4().to_string();
1078
1079        // Begin root span for proposal execution
1080        let root_span_id = {
1081            let mut log = self.log.lock().await;
1082            log.begin_span(
1083                "proposal.execute",
1084                &trace_id,
1085                None,
1086                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1087            )
1088        };
1089
1090        // Log proposal received
1091        {
1092            let mut log = self.log.lock().await;
1093            log.append(
1094                EventKind::ProposalReceived,
1095                None,
1096                Some(&proposal.id),
1097                [
1098                    ("source".to_string(), Value::from(proposal.source.as_str())),
1099                    (
1100                        "action_count".to_string(),
1101                        Value::from(proposal.actions.len()),
1102                    ),
1103                ]
1104                .into(),
1105            );
1106        }
1107
1108        // Capability check: max_actions budget for entire proposal
1109        {
1110            let caps = self.capabilities.read().await;
1111            if let Some(ref cap) = *caps {
1112                if !cap.actions_within_budget(proposal.actions.len() as u32) {
1113                    let mut action_results = Vec::new();
1114                    for action in &proposal.actions {
1115                        action_results.push(rejected_result(
1116                            &action.id,
1117                            format!(
1118                                "capability denied: proposal has {} actions, max allowed is {:?}",
1119                                proposal.actions.len(),
1120                                cap.max_actions
1121                            ),
1122                        ));
1123                    }
1124                    return (
1125                        ProposalResult {
1126                            proposal_id: proposal.id.clone(),
1127                            results: action_results,
1128                            cost: CostSummary::default(),
1129                        },
1130                        HashMap::new(),
1131                    );
1132                }
1133            }
1134        }
1135
1136        // Snapshot for rollback
1137        let snapshot = self.state.snapshot();
1138        let transition_count = self.state.transition_count();
1139
1140        let mut results: Vec<ActionResult> = Vec::new();
1141        // Per-action state snapshots captured before execution (for TraceEvent.state_before).
1142        let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1143        let mut aborted = false;
1144        let mut budget_exceeded = false;
1145        let mut total_retries: u32 = 0;
1146
1147        // Running cost counters for budget enforcement
1148        let mut running_tool_calls: u32 = 0;
1149        let mut running_actions: u32 = 0;
1150        let mut running_duration_ms: f64 = 0.0;
1151
1152        // Snapshot the budget once
1153        let budget = self.cost_budget.read().await.clone();
1154
1155        // Build DAG
1156        let levels = build_dag(&proposal.actions);
1157
1158        let mut canceled = false;
1159        for level in &levels {
1160            // Cooperative cancellation check at the level boundary.
1161            // Actions already in flight aren't interrupted (we can't
1162            // safely cancel a tool call dispatched to a user-provided
1163            // executor), but every action that has not started running
1164            // is recorded as canceled with a clear reason.
1165            if !canceled {
1166                if let Some(token) = cancel {
1167                    if token.is_cancelled() {
1168                        canceled = true;
1169                    }
1170                }
1171            }
1172            if canceled {
1173                for &idx in level {
1174                    results.push(canceled_result(
1175                        &proposal.actions[idx].id,
1176                        "cancellation requested by caller",
1177                    ));
1178                }
1179                continue;
1180            }
1181            if aborted || budget_exceeded {
1182                let skip_reason = if budget_exceeded {
1183                    "cost budget exceeded"
1184                } else {
1185                    "skipped due to earlier abort"
1186                };
1187                for &idx in level {
1188                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1189                }
1190                continue;
1191            }
1192
1193            // Check if any action in this level has ABORT behavior
1194            let has_abort = level
1195                .iter()
1196                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1197
1198            if level.len() == 1 || has_abort {
1199                // Sequential execution
1200                for &idx in level {
1201                    if aborted || budget_exceeded {
1202                        let skip_reason = if budget_exceeded {
1203                            "cost budget exceeded"
1204                        } else {
1205                            "skipped due to abort"
1206                        };
1207                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1208                        continue;
1209                    }
1210
1211                    // Budget check before execution
1212                    if let Some(ref b) = budget {
1213                        if let Some(max) = b.max_actions {
1214                            if running_actions >= max {
1215                                budget_exceeded = true;
1216                                results.push(skipped_result(
1217                                    &proposal.actions[idx].id,
1218                                    "cost budget exceeded",
1219                                ));
1220                                continue;
1221                            }
1222                        }
1223                        if let Some(max) = b.max_tool_calls {
1224                            if proposal.actions[idx].action_type == ActionType::ToolCall
1225                                && running_tool_calls >= max
1226                            {
1227                                budget_exceeded = true;
1228                                results.push(skipped_result(
1229                                    &proposal.actions[idx].id,
1230                                    "cost budget exceeded",
1231                                ));
1232                                continue;
1233                            }
1234                        }
1235                        if let Some(max) = b.max_duration_ms {
1236                            if running_duration_ms >= max {
1237                                budget_exceeded = true;
1238                                results.push(skipped_result(
1239                                    &proposal.actions[idx].id,
1240                                    "cost budget exceeded",
1241                                ));
1242                                continue;
1243                            }
1244                        }
1245                    }
1246
1247                    state_before_map.insert(
1248                        proposal.actions[idx].id.clone(),
1249                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1250                    );
1251                    let (ar, action_retries) = self
1252                        .process_action(
1253                            &proposal.actions[idx],
1254                            &proposal.id,
1255                            &trace_id,
1256                            &root_span_id,
1257                            session_id,
1258                        )
1259                        .await;
1260                    total_retries += action_retries;
1261
1262                    // Update running counters
1263                    if ar.status == ActionStatus::Succeeded
1264                        && proposal.actions[idx].action_type == ActionType::ToolCall
1265                    {
1266                        running_tool_calls += 1;
1267                    }
1268                    if ar.status != ActionStatus::Skipped {
1269                        running_actions += 1;
1270                    }
1271                    if let Some(d) = ar.duration_ms {
1272                        running_duration_ms += d;
1273                    }
1274
1275                    if ar.status == ActionStatus::Failed
1276                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1277                    {
1278                        aborted = true;
1279                    }
1280                    results.push(ar);
1281                }
1282            } else {
1283                // Concurrent execution via futures::join_all
1284                // Snapshot only relevant keys per action (all see same pre-level state)
1285                for &idx in level {
1286                    state_before_map.insert(
1287                        proposal.actions[idx].id.clone(),
1288                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1289                    );
1290                }
1291                let futs: Vec<_> = level
1292                    .iter()
1293                    .map(|&idx| {
1294                        self.process_action(
1295                            &proposal.actions[idx],
1296                            &proposal.id,
1297                            &trace_id,
1298                            &root_span_id,
1299                            session_id,
1300                        )
1301                    })
1302                    .collect();
1303                let level_results = futures::future::join_all(futs).await;
1304
1305                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1306                    let idx = level[i];
1307                    total_retries += action_retries;
1308                    if ar.status == ActionStatus::Succeeded
1309                        && proposal.actions[idx].action_type == ActionType::ToolCall
1310                    {
1311                        running_tool_calls += 1;
1312                    }
1313                    if ar.status != ActionStatus::Skipped {
1314                        running_actions += 1;
1315                    }
1316                    if let Some(d) = ar.duration_ms {
1317                        running_duration_ms += d;
1318                    }
1319                    results.push(ar);
1320                }
1321            }
1322        }
1323
1324        // Handle rollback
1325        if aborted {
1326            self.state.restore(snapshot.clone(), transition_count);
1327
1328            let mut log = self.log.lock().await;
1329            log.append(
1330                EventKind::StateSnapshot,
1331                None,
1332                Some(&proposal.id),
1333                [(
1334                    "state".to_string(),
1335                    serde_json::to_value(&snapshot).unwrap_or_default(),
1336                )]
1337                .into(),
1338            );
1339            log.append(
1340                EventKind::StateRollback,
1341                None,
1342                Some(&proposal.id),
1343                [(
1344                    "rolled_back_to".to_string(),
1345                    Value::from("pre-proposal snapshot"),
1346                )]
1347                .into(),
1348            );
1349
1350            // Clear idempotency cache for rolled-back actions
1351            let mut cache = self.idempotency_cache.lock().await;
1352            for r in &results {
1353                if r.status == ActionStatus::Succeeded {
1354                    for action in &proposal.actions {
1355                        if action.id == r.action_id && action.idempotent {
1356                            cache.remove(&idempotency_key(action));
1357                        }
1358                    }
1359                }
1360            }
1361        }
1362
1363        // Sort results to match original action order
1364        let action_order: HashMap<String, usize> = proposal
1365            .actions
1366            .iter()
1367            .enumerate()
1368            .map(|(i, a)| (a.id.clone(), i))
1369            .collect();
1370        results.sort_by_key(|r| {
1371            action_order
1372                .get(&r.action_id)
1373                .copied()
1374                .unwrap_or(usize::MAX)
1375        });
1376
1377        // Compute cost summary from results
1378        let mut cost = CostSummary::default();
1379        for r in &results {
1380            let action = action_order
1381                .get(&r.action_id)
1382                .and_then(|&i| proposal.actions.get(i));
1383            match r.status {
1384                ActionStatus::Succeeded => {
1385                    cost.actions_executed += 1;
1386                    if let Some(a) = action {
1387                        if a.action_type == ActionType::ToolCall {
1388                            cost.tool_calls += 1;
1389                        }
1390                    }
1391                }
1392                ActionStatus::Failed | ActionStatus::Rejected => {
1393                    cost.actions_executed += 1;
1394                }
1395                ActionStatus::Skipped => {
1396                    cost.actions_skipped += 1;
1397                }
1398                _ => {}
1399            }
1400            if let Some(d) = r.duration_ms {
1401                cost.total_duration_ms += d;
1402            }
1403        }
1404
1405        // Set retries from inline counter
1406        cost.retries = total_retries;
1407
1408        // End root span — Ok if no abort, Error if aborted
1409        {
1410            let span_status = if aborted {
1411                SpanStatus::Error
1412            } else {
1413                SpanStatus::Ok
1414            };
1415            let mut log = self.log.lock().await;
1416            log.end_span(&root_span_id, span_status);
1417        }
1418
1419        let proposal_result = ProposalResult {
1420            proposal_id: proposal.id.clone(),
1421            results,
1422            cost,
1423        };
1424
1425        // Post-execution: auto-distill skills from this execution trace
1426        if self.auto_distill {
1427            if let Some(ref memgine) = self.memgine {
1428                // Convert results to TraceEvents for distillation
1429                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1430                    .results
1431                    .iter()
1432                    .map(|r| {
1433                        let kind = match r.status {
1434                            ActionStatus::Succeeded => "action_succeeded",
1435                            ActionStatus::Failed => "action_failed",
1436                            ActionStatus::Rejected => "action_rejected",
1437                            ActionStatus::Skipped => "action_skipped",
1438                            _ => "unknown",
1439                        };
1440                        // Find the matching action to get the tool name
1441                        let tool = proposal
1442                            .actions
1443                            .iter()
1444                            .find(|a| a.id == r.action_id)
1445                            .and_then(|a| a.tool.clone());
1446                        let mut data = serde_json::Map::new();
1447                        if let Some(ref e) = r.error {
1448                            data.insert("error".into(), Value::from(e.as_str()));
1449                        }
1450                        if let Some(ref o) = r.output {
1451                            data.insert("output".into(), o.clone());
1452                        }
1453                        car_memgine::TraceEvent {
1454                            kind: kind.to_string(),
1455                            action_id: Some(r.action_id.clone()),
1456                            tool,
1457                            data: Value::Object(data),
1458                            duration_ms: r.duration_ms,
1459                            reward: match r.status {
1460                                ActionStatus::Succeeded => Some(1.0),
1461                                ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1462                                _ => None,
1463                            },
1464                            ..Default::default()
1465                        }
1466                    })
1467                    .collect();
1468
1469                let mut engine = memgine.lock().await;
1470                let skills = engine.distill_skills(&trace_events).await;
1471                if !skills.is_empty() {
1472                    let count = skills.len();
1473                    engine.ingest_distilled_skills(&skills);
1474
1475                    // Log the distillation event
1476                    let mut log = self.log.lock().await;
1477                    log.append(
1478                        EventKind::SkillDistilled,
1479                        None,
1480                        Some(&proposal_result.proposal_id),
1481                        [
1482                            ("skills_count".to_string(), Value::from(count)),
1483                            (
1484                                "skill_names".to_string(),
1485                                Value::from(
1486                                    skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1487                                ),
1488                            ),
1489                        ]
1490                        .into(),
1491                    );
1492
1493                    // Check if any domains need evolution
1494                    let threshold = engine.evolution_threshold();
1495                    let domains = engine.domains_needing_evolution(threshold);
1496                    for domain in &domains {
1497                        // Collect failed events for this domain
1498                        let failed: Vec<car_memgine::TraceEvent> = trace_events
1499                            .iter()
1500                            .filter(|e| {
1501                                matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1502                            })
1503                            .cloned()
1504                            .collect();
1505                        if !failed.is_empty() {
1506                            let evolved = engine.evolve_skills(&failed, domain).await;
1507                            if !evolved.is_empty() {
1508                                log.append(
1509                                    EventKind::EvolutionTriggered,
1510                                    None,
1511                                    Some(&proposal_result.proposal_id),
1512                                    [
1513                                        ("domain".to_string(), Value::from(domain.as_str())),
1514                                        ("new_skills".to_string(), Value::from(evolved.len())),
1515                                    ]
1516                                    .into(),
1517                                );
1518                            }
1519                        }
1520                    }
1521                }
1522            }
1523        }
1524
1525        (proposal_result, state_before_map)
1526    }
1527
1528    /// Process a single action: idempotency → validate → policy → execute.
1529    /// Returns (ActionResult, retries_count).
1530    async fn process_action(
1531        &self,
1532        action: &Action,
1533        proposal_id: &str,
1534        trace_id: &str,
1535        parent_span_id: &str,
1536        session_id: Option<&str>,
1537    ) -> (ActionResult, u32) {
1538        // Derive action type name for span naming
1539        let action_type_name = serde_json::to_string(&action.action_type)
1540            .unwrap_or_default()
1541            .trim_matches('"')
1542            .to_string();
1543        let span_name = format!("action.{}", action_type_name);
1544
1545        // Begin child span for this action
1546        let action_span_id = {
1547            let mut attrs: HashMap<String, Value> = HashMap::new();
1548            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1549            if let Some(ref tool) = action.tool {
1550                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1551            }
1552            let mut log = self.log.lock().await;
1553            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1554        };
1555
1556        // Execute the action pipeline and capture result
1557        let (result, retries) = self
1558            .process_action_inner(action, proposal_id, session_id)
1559            .await;
1560
1561        // End action span based on result status
1562        let span_status = match result.status {
1563            ActionStatus::Succeeded => SpanStatus::Ok,
1564            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1565            _ => SpanStatus::Unset,
1566        };
1567        {
1568            let mut log = self.log.lock().await;
1569            log.end_span(&action_span_id, span_status);
1570        }
1571
1572        (result, retries)
1573    }
1574
1575    /// Inner action processing: idempotency -> validate -> policy -> execute.
1576    /// Returns (ActionResult, retries_count).
1577    #[instrument(
1578        name = "action.process",
1579        skip_all,
1580        fields(
1581            action_id = %action.id,
1582            action_type = ?action.action_type,
1583            tool = action.tool.as_deref().unwrap_or("none"),
1584        )
1585    )]
1586    async fn process_action_inner(
1587        &self,
1588        action: &Action,
1589        proposal_id: &str,
1590        session_id: Option<&str>,
1591    ) -> (ActionResult, u32) {
1592        // Idempotency check
1593        if action.idempotent {
1594            let key = idempotency_key(action);
1595            let cache = self.idempotency_cache.lock().await;
1596            if let Some(cached) = cache.get(&key) {
1597                let mut log = self.log.lock().await;
1598                log.append(
1599                    EventKind::ActionDeduplicated,
1600                    Some(&action.id),
1601                    Some(proposal_id),
1602                    [(
1603                        "cached_action_id".to_string(),
1604                        Value::from(cached.action_id.as_str()),
1605                    )]
1606                    .into(),
1607                );
1608                return (
1609                    ActionResult {
1610                        action_id: action.id.clone(),
1611                        status: cached.status.clone(),
1612                        output: cached.output.clone(),
1613                        error: cached.error.clone(),
1614                        state_changes: cached.state_changes.clone(),
1615                        duration_ms: Some(0.0),
1616                        timestamp: chrono::Utc::now(),
1617                    },
1618                    0,
1619                );
1620            }
1621        }
1622
1623        // Capability check
1624        {
1625            let caps = self.capabilities.read().await;
1626            if let Some(ref cap) = *caps {
1627                // Check tool capability for ToolCall actions
1628                if action.action_type == ActionType::ToolCall {
1629                    if let Some(ref tool_name) = action.tool {
1630                        if !cap.tool_allowed(tool_name) {
1631                            let mut log = self.log.lock().await;
1632                            log.append(
1633                                EventKind::ActionRejected,
1634                                Some(&action.id),
1635                                Some(proposal_id),
1636                                HashMap::new(),
1637                            );
1638                            return (
1639                                rejected_result(
1640                                    &action.id,
1641                                    format!("capability denied: tool '{}' not allowed", tool_name),
1642                                ),
1643                                0,
1644                            );
1645                        }
1646                    }
1647                }
1648
1649                // Check state key capability for StateWrite/StateRead actions
1650                if action.action_type == ActionType::StateWrite
1651                    || action.action_type == ActionType::StateRead
1652                {
1653                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1654                        if !cap.state_key_allowed(key) {
1655                            let mut log = self.log.lock().await;
1656                            log.append(
1657                                EventKind::ActionRejected,
1658                                Some(&action.id),
1659                                Some(proposal_id),
1660                                HashMap::new(),
1661                            );
1662                            return (
1663                                rejected_result(
1664                                    &action.id,
1665                                    format!("capability denied: state key '{}' not allowed", key),
1666                                ),
1667                                0,
1668                            );
1669                        }
1670                    }
1671                }
1672            }
1673        }
1674
1675        // Validate
1676        let tools = self.tools.read().await;
1677        let validation = validate_action(action, &self.state, &tools);
1678        drop(tools);
1679
1680        if !validation.valid() {
1681            let error = validation
1682                .errors
1683                .iter()
1684                .map(|e| e.reason.as_str())
1685                .collect::<Vec<_>>()
1686                .join("; ");
1687            let mut log = self.log.lock().await;
1688            log.append(
1689                EventKind::ActionRejected,
1690                Some(&action.id),
1691                Some(proposal_id),
1692                HashMap::new(),
1693            );
1694            return (rejected_result(&action.id, error), 0);
1695        }
1696
1697        // Policy check — global registry plus, when the proposal is
1698        // executed under a session, that session's registry. Both
1699        // layers must pass for the action to proceed; session
1700        // policies are additive deny rules — they can deny what
1701        // global allows but cannot allow what global denies.
1702        {
1703            let mut violations = {
1704                let policies = self.policies.read().await;
1705                policies.check(action, &self.state)
1706            };
1707            if let Some(sid) = session_id {
1708                // Snapshot the per-session engine handle out from under
1709                // the outer registry lock so the inner check holds
1710                // only the engine's own RwLock — preserves the
1711                // documented lock-ordering discipline.
1712                let session_engine = {
1713                    let sessions = self.session_policies.read().await;
1714                    sessions.get(sid).cloned()
1715                };
1716                if let Some(engine) = session_engine {
1717                    let engine = engine.read().await;
1718                    violations.extend(engine.check(action, &self.state));
1719                } else {
1720                    // Unknown session id — refuse the action rather
1721                    // than silently fall back to global-only. A
1722                    // proposal submitted under a closed session
1723                    // shouldn't run with looser rules than the caller
1724                    // intended.
1725                    let mut log = self.log.lock().await;
1726                    log.append(
1727                        EventKind::PolicyViolation,
1728                        Some(&action.id),
1729                        Some(proposal_id),
1730                        HashMap::new(),
1731                    );
1732                    return (
1733                        rejected_result(
1734                            &action.id,
1735                            format!(
1736                                "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1737                            ),
1738                        ),
1739                        0,
1740                    );
1741                }
1742            }
1743            if !violations.is_empty() {
1744                let error = violations
1745                    .iter()
1746                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1747                    .collect::<Vec<_>>()
1748                    .join("; ");
1749                let mut log = self.log.lock().await;
1750                log.append(
1751                    EventKind::PolicyViolation,
1752                    Some(&action.id),
1753                    Some(proposal_id),
1754                    HashMap::new(),
1755                );
1756                return (rejected_result(&action.id, error), 0);
1757            }
1758        }
1759
1760        // Validated
1761        {
1762            let mut log = self.log.lock().await;
1763            log.append(
1764                EventKind::ActionValidated,
1765                Some(&action.id),
1766                Some(proposal_id),
1767                HashMap::new(),
1768            );
1769        }
1770
1771        // Execute with retry
1772        let (result, retries) = self.execute_with_retry(action, proposal_id).await;
1773
1774        // Cache idempotent results
1775        if action.idempotent && result.status == ActionStatus::Succeeded {
1776            let mut cache = self.idempotency_cache.lock().await;
1777            cache.insert(idempotency_key(action), result.clone());
1778        }
1779
1780        tracing::info!(
1781            status = ?result.status,
1782            duration_ms = result.duration_ms,
1783            "action completed"
1784        );
1785
1786        (result, retries)
1787    }
1788
1789    /// Execute with retry logic and timeout.
1790    /// Returns (ActionResult, retries_count).
1791    async fn execute_with_retry(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
1792        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1793            action.max_retries + 1
1794        } else {
1795            1
1796        };
1797
1798        let mut last_error: Option<String> = None;
1799        let mut retries: u32 = 0;
1800
1801        for attempt in 0..max_attempts {
1802            if attempt > 0 {
1803                retries += 1;
1804                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1805                tokio::time::sleep(Duration::from_millis(delay)).await;
1806                let mut log = self.log.lock().await;
1807                log.append(
1808                    EventKind::ActionRetrying,
1809                    Some(&action.id),
1810                    Some(proposal_id),
1811                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
1812                );
1813            }
1814
1815            {
1816                let mut log = self.log.lock().await;
1817                log.append(
1818                    EventKind::ActionExecuting,
1819                    Some(&action.id),
1820                    Some(proposal_id),
1821                    HashMap::new(),
1822                );
1823            }
1824
1825            let start = std::time::Instant::now();
1826            let transitions_before = self.state.transition_count();
1827
1828            // Execute with optional timeout
1829            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1830                match timeout(Duration::from_millis(timeout_ms), self.dispatch(action)).await {
1831                    Ok(r) => r,
1832                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1833                }
1834            } else {
1835                self.dispatch(action).await
1836            };
1837
1838            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1839
1840            match exec_result {
1841                Ok(output) => {
1842                    // Commit post-effects
1843                    let mut state_changes: HashMap<String, Value> = HashMap::new();
1844                    for (key, value) in &action.expected_effects {
1845                        self.state.set(key, value.clone(), &action.id);
1846                        state_changes.insert(key.clone(), value.clone());
1847                    }
1848
1849                    // Capture state changes from dispatch
1850                    for t in self.state.transitions_since(transitions_before) {
1851                        if !state_changes.contains_key(&t.key) {
1852                            if let Some(v) = t.new_value {
1853                                state_changes.insert(t.key.clone(), v);
1854                            }
1855                        }
1856                    }
1857
1858                    let mut log = self.log.lock().await;
1859                    log.append(
1860                        EventKind::ActionSucceeded,
1861                        Some(&action.id),
1862                        Some(proposal_id),
1863                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1864                    );
1865
1866                    if !state_changes.is_empty() {
1867                        log.append(
1868                            EventKind::StateChanged,
1869                            Some(&action.id),
1870                            Some(proposal_id),
1871                            [(
1872                                "changes".to_string(),
1873                                serde_json::to_value(&state_changes).unwrap_or_default(),
1874                            )]
1875                            .into(),
1876                        );
1877                    }
1878
1879                    return (
1880                        ActionResult {
1881                            action_id: action.id.clone(),
1882                            status: ActionStatus::Succeeded,
1883                            output: Some(output),
1884                            error: None,
1885                            state_changes,
1886                            duration_ms: Some(duration_ms),
1887                            timestamp: chrono::Utc::now(),
1888                        },
1889                        retries,
1890                    );
1891                }
1892                Err(e) => {
1893                    last_error = Some(e.clone());
1894                    let mut log = self.log.lock().await;
1895                    log.append(
1896                        EventKind::ActionFailed,
1897                        Some(&action.id),
1898                        Some(proposal_id),
1899                        [
1900                            ("error".to_string(), Value::from(e.as_str())),
1901                            ("attempt".to_string(), Value::from(attempt + 1)),
1902                        ]
1903                        .into(),
1904                    );
1905                }
1906            }
1907        }
1908
1909        // All attempts exhausted
1910        if action.failure_behavior == FailureBehavior::Skip {
1911            return (
1912                skipped_result(
1913                    &action.id,
1914                    last_error.as_deref().unwrap_or("all attempts exhausted"),
1915                ),
1916                retries,
1917            );
1918        }
1919
1920        (
1921            ActionResult {
1922                action_id: action.id.clone(),
1923                status: ActionStatus::Failed,
1924                output: None,
1925                error: last_error,
1926                state_changes: HashMap::new(),
1927                duration_ms: None,
1928                timestamp: chrono::Utc::now(),
1929            },
1930            retries,
1931        )
1932    }
1933
1934    /// Dispatch an action to the appropriate handler.
1935    async fn dispatch(&self, action: &Action) -> Result<Value, String> {
1936        match action.action_type {
1937            ActionType::ToolCall => {
1938                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
1939                let params = Value::Object(
1940                    action
1941                        .parameters
1942                        .iter()
1943                        .map(|(k, v)| (k.clone(), v.clone()))
1944                        .collect(),
1945                );
1946
1947                // Check cross-proposal result cache.
1948                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
1949                    return Ok(cached);
1950                }
1951
1952                // Apply rate limit backpressure before executing.
1953                self.rate_limiter.acquire(tool_name).await;
1954
1955                // Try built-in inference tools first when the inference engine is available.
1956                if matches!(
1957                    tool_name,
1958                    "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
1959                ) {
1960                    if let Some(ref engine) = self.inference_engine {
1961                        // For "infer.grounded" or "infer" with memgine available,
1962                        // build context from memory and attach it to the request.
1963                        let params = {
1964                            let should_ground =
1965                                tool_name == "infer.grounded" || tool_name == "infer";
1966                            if should_ground {
1967                                if let Some(ref memgine) = self.memgine {
1968                                    if let Some(prompt) =
1969                                        params.get("prompt").and_then(|v| v.as_str())
1970                                    {
1971                                        let ctx = {
1972                                            let mut m = memgine.lock().await;
1973                                            m.build_context(prompt)
1974                                        };
1975                                        if !ctx.is_empty() {
1976                                            let mut p = params.clone();
1977                                            if let Some(obj) = p.as_object_mut() {
1978                                                obj.insert("context".to_string(), Value::from(ctx));
1979                                            }
1980                                            p
1981                                        } else {
1982                                            params
1983                                        }
1984                                    } else {
1985                                        params
1986                                    }
1987                                } else {
1988                                    params
1989                                }
1990                            } else {
1991                                params
1992                            }
1993                        };
1994
1995                        // Route "infer.grounded" to "infer" for the service layer
1996                        let effective_tool = if tool_name == "infer.grounded" {
1997                            "infer"
1998                        } else {
1999                            tool_name
2000                        };
2001                        let result =
2002                            car_inference::service::execute_tool(engine, effective_tool, &params)
2003                                .await
2004                                .map_err(|e| e.to_string());
2005
2006                        if let Ok(ref value) = result {
2007                            self.result_cache
2008                                .put(tool_name, &params, value.clone())
2009                                .await;
2010                        }
2011
2012                        return result;
2013                    }
2014                }
2015
2016                // Built-in memory consolidation tool.
2017                if tool_name == "memory.consolidate" {
2018                    if let Some(ref memgine) = self.memgine {
2019                        let report = {
2020                            let mut m = memgine.lock().await;
2021                            m.consolidate().await
2022                        };
2023                        // Log the consolidation event
2024                        {
2025                            let mut log = self.log.lock().await;
2026                            log.append(
2027                                EventKind::Consolidated,
2028                                None,
2029                                None,
2030                                [
2031                                    (
2032                                        "expired_pruned".to_string(),
2033                                        Value::from(report.expired_pruned),
2034                                    ),
2035                                    (
2036                                        "superseded_gc".to_string(),
2037                                        Value::from(report.superseded_gc),
2038                                    ),
2039                                    (
2040                                        "stale_embeddings_removed".to_string(),
2041                                        Value::from(report.stale_embeddings_removed),
2042                                    ),
2043                                    (
2044                                        "nodes_embedded".to_string(),
2045                                        Value::from(report.nodes_embedded),
2046                                    ),
2047                                    (
2048                                        "domains_evolved".to_string(),
2049                                        Value::from(report.domains_evolved.clone()),
2050                                    ),
2051                                    ("total_nodes".to_string(), Value::from(report.total_nodes)),
2052                                    ("total_edges".to_string(), Value::from(report.total_edges)),
2053                                ]
2054                                .into(),
2055                            );
2056                        }
2057                        return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2058                    } else {
2059                        return Err(
2060                            "memory.consolidate requires memgine (attach with with_learning)"
2061                                .into(),
2062                        );
2063                    }
2064                }
2065
2066                // Prefer a configured tool_executor for any tool it claims to handle.
2067                // Fall through to agent_basics only when the configured executor is absent
2068                // or explicitly returns "unknown tool" — this prevents agent_basics' built-in
2069                // read_file/write_file (which resolve paths via std::env::current_dir) from
2070                // silently overriding an executor that carries its own working_dir.
2071                let configured = {
2072                    let guard = self.tool_executor.lock().await;
2073                    guard.as_ref().cloned()
2074                };
2075
2076                if let Some(ref executor) = configured {
2077                    let result = executor.execute(tool_name, &params).await;
2078                    let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2079                    if !fall_through {
2080                        if let Ok(ref value) = result {
2081                            self.result_cache
2082                                .put(tool_name, &params, value.clone())
2083                                .await;
2084                        }
2085                        return result;
2086                    }
2087                }
2088
2089                if let Some(result) = crate::agent_basics::execute(tool_name, &params).await {
2090                    if let Ok(ref value) = result {
2091                        self.result_cache
2092                            .put(tool_name, &params, value.clone())
2093                            .await;
2094                    }
2095                    return result;
2096                }
2097
2098                Err(format!("no handler for tool '{}'", tool_name))
2099            }
2100            ActionType::StateWrite => {
2101                let key = action
2102                    .parameters
2103                    .get("key")
2104                    .and_then(|v| v.as_str())
2105                    .ok_or("state_write requires 'key' parameter")?;
2106                let value = action
2107                    .parameters
2108                    .get("value")
2109                    .cloned()
2110                    .unwrap_or(Value::Null);
2111                self.state.set(key, value, &action.id);
2112                Ok(Value::from(format!("written: {}", key)))
2113            }
2114            ActionType::StateRead => {
2115                let key = action
2116                    .parameters
2117                    .get("key")
2118                    .and_then(|v| v.as_str())
2119                    .ok_or("state_read requires 'key' parameter")?;
2120                Ok(self.state.get(key).unwrap_or(Value::Null))
2121            }
2122            ActionType::Assertion => {
2123                let key = action
2124                    .parameters
2125                    .get("key")
2126                    .and_then(|v| v.as_str())
2127                    .ok_or("assertion requires 'key' parameter")?;
2128                let expected = action
2129                    .parameters
2130                    .get("expected")
2131                    .cloned()
2132                    .unwrap_or(Value::Null);
2133                let actual = self.state.get(key).unwrap_or(Value::Null);
2134                if actual != expected {
2135                    Err(format!(
2136                        "assertion failed: state['{}'] = {:?}, expected {:?}",
2137                        key, actual, expected
2138                    ))
2139                } else {
2140                    Ok(serde_json::json!({"asserted": key, "value": actual}))
2141                }
2142            }
2143        }
2144    }
2145
2146    // --- Checkpoint and resume ---
2147
2148    /// Save a checkpoint of the current runtime state.
2149    pub async fn save_checkpoint(&self) -> Checkpoint {
2150        let state = self.state.snapshot();
2151        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2152        let log = self.log.lock().await;
2153        let events: Vec<Value> = log
2154            .events()
2155            .iter()
2156            .map(|e| serde_json::to_value(e).unwrap_or_default())
2157            .collect();
2158
2159        Checkpoint {
2160            checkpoint_id: Uuid::new_v4().to_string(),
2161            created_at: chrono::Utc::now(),
2162            state,
2163            events,
2164            tools,
2165            metadata: HashMap::new(),
2166        }
2167    }
2168
2169    /// Save checkpoint to a JSON file.
2170    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2171        let checkpoint = self.save_checkpoint().await;
2172        let json = serde_json::to_string_pretty(&checkpoint)
2173            .map_err(|e| format!("serialize error: {}", e))?;
2174        tokio::fs::write(path, json)
2175            .await
2176            .map_err(|e| format!("write error: {}", e))?;
2177        Ok(())
2178    }
2179
2180    /// Load a checkpoint from a JSON file and restore state.
2181    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2182        let json = tokio::fs::read_to_string(path)
2183            .await
2184            .map_err(|e| format!("read error: {}", e))?;
2185        let checkpoint: Checkpoint =
2186            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2187        self.restore_checkpoint(&checkpoint).await;
2188        Ok(checkpoint)
2189    }
2190
2191    /// Restore runtime state from a checkpoint.
2192    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2193        // Replace state completely — don't merge, don't create synthetic transitions
2194        self.state.replace_all(checkpoint.state.clone());
2195        // Clear idempotency cache — stale results from pre-checkpoint execution
2196        // must not bypass validation/policy on the restored state
2197        self.idempotency_cache.lock().await.clear();
2198        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
2199        let mut tools = self.tools.write().await;
2200        tools.clear();
2201        for tool_name in &checkpoint.tools {
2202            let schema = ToolSchema {
2203                name: tool_name.clone(),
2204                description: String::new(),
2205                parameters: serde_json::Value::Object(Default::default()),
2206                returns: None,
2207                idempotent: false,
2208                cache_ttl_secs: None,
2209                rate_limit: None,
2210            };
2211            tools.insert(tool_name.clone(), schema);
2212        }
2213    }
2214
2215    /// Register a subprocess tool and set up the subprocess executor.
2216    /// If no executor exists, creates a new SubprocessToolExecutor.
2217    /// If one already exists, creates a new SubprocessToolExecutor with the
2218    /// existing executor as fallback.
2219    pub async fn register_subprocess_tool(
2220        &self,
2221        name: &str,
2222        tool: crate::subprocess::SubprocessTool,
2223    ) {
2224        use crate::subprocess::SubprocessToolExecutor;
2225
2226        let schema = ToolSchema {
2227            name: name.to_string(),
2228            description: format!("Subprocess tool: {}", tool.command),
2229            parameters: serde_json::Value::Object(Default::default()),
2230            returns: None,
2231            idempotent: false,
2232            cache_ttl_secs: None,
2233            rate_limit: None,
2234        };
2235        self.register_tool_schema(schema).await;
2236
2237        let mut guard = self.tool_executor.lock().await;
2238        let mut executor = match guard.take() {
2239            Some(existing) => {
2240                let mut sub = SubprocessToolExecutor::new();
2241                sub = sub.with_fallback(existing);
2242                sub
2243            }
2244            None => SubprocessToolExecutor::new(),
2245        };
2246        executor.register(name, tool);
2247        *guard = Some(std::sync::Arc::new(executor));
2248    }
2249}
2250
2251impl Default for Runtime {
2252    fn default() -> Self {
2253        Self::new()
2254    }
2255}