Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
// Retry backoff constants.
// NOTE(review): their use site is outside this chunk; the descriptions below
// follow the constant names and should be confirmed against the retry loop.

/// Starting retry delay in milliseconds (presumably the wait before the first retry).
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Presumably the multiplier applied to the delay on each successive retry.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28// ---------------------------------------------------------------------------
29// Replan types — failure recovery via model callback
30// ---------------------------------------------------------------------------
31
/// Callback trait for replanning failed proposals.
/// Implement this to let the runtime ask the model for an alternative plan
/// when a proposal aborts.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Produce a replacement proposal for the failed attempt described by `ctx`.
    ///
    /// Return `Err(reason)` when no alternative can be produced.
    /// NOTE(review): how the replan loop reacts to `Err` (stop vs. keep
    /// trying) is decided in the runtime's replan loop, outside this chunk.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Context provided to the replan callback so the model can generate an alternative.
///
/// Derives serde `Serialize`/`Deserialize`, so it can be handed to callbacks
/// that forward it outside the process (e.g. as JSON to a remote model).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Original proposal ID.
    pub proposal_id: String,
    /// Which replan attempt this is (1-indexed; attempt 1 = first replan after initial failure).
    pub attempt: u32,
    /// Actions that failed and caused the abort.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Action IDs that succeeded before the abort (now rolled back).
    pub completed_action_ids: Vec<String>,
    /// State snapshot after rollback.
    pub state_snapshot: HashMap<String, Value>,
    /// How many replans remain.
    pub replans_remaining: u32,
    /// Original proposal source (model name, agent, etc.).
    pub original_source: String,
    /// Total actions in the original proposal.
    pub original_action_count: usize,
    /// The original goal/context from the proposal (for generating alternatives).
    pub original_context: HashMap<String, Value>,
}
62
/// Summary of a failed action, included in ReplanContext.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the action that failed.
    pub action_id: String,
    /// Tool the action invoked; presumably `None` for non-tool actions
    /// (mirrors `Action.tool` being optional) — confirm at the construction site.
    pub tool: Option<String>,
    /// Error message reported for the failure.
    pub error: String,
    /// Parameters the failed action was called with.
    pub parameters: HashMap<String, Value>,
}
71
/// Configuration for the replan loop.
///
/// The `Default` value (`max_replans: 0`, `delay_ms: 0`,
/// `verify_before_execute: true`) leaves replanning switched off until
/// `max_replans` is raised.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts. 0 = disabled (default).
    pub max_replans: u32,
    /// Delay in milliseconds between replan attempts. Prevents burning through
    /// attempts instantly if the model returns garbage fast. 0 = no delay.
    pub delay_ms: u64,
    /// If true, replan proposals are scored via car-planner's verify() before
    /// execution. Proposals with errors are rejected without executing.
    /// Prevents the engine from running a worse plan than the one that failed.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87    fn default() -> Self {
88        Self {
89            max_replans: 0,
90            delay_ms: 0,
91            verify_before_execute: true,
92        }
93    }
94}
95
/// Trait for tool execution. Implement this to provide tools to the runtime.
///
/// In-process: implement directly with function calls.
/// Daemon mode: implement by sending JSON-RPC to the client.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Run `tool` with the JSON `params` and return its JSON output, or an
    /// error message on failure.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;

    /// Variant that also carries the originating proposal `Action.id`
    /// and its `timeout_ms` budget.
    ///
    /// `action_id`: WS-based executors (`car-server-core::WsToolExecutor`)
    /// use it so the daemon-initiated `tools.execute` request to the client
    /// carries the same id the host's process-wide handler is keyed on
    /// — without this round-trip the host can't disambiguate concurrent
    /// callbacks for the same tool (Parslee-ai/car-releases#43 follow-up).
    ///
    /// `timeout_ms`: the action's per-call budget. WS executors MUST bound
    /// their callback wait by this (falling back to a default when `None`)
    /// so the daemon→host wait and the executor's own action deadline stay
    /// coordinated — otherwise a hardcoded inner wait reaps a call the outer
    /// deadline still permits (Parslee-ai/car#259). In-process executors
    /// that don't need either can keep the default forward to [`Self::execute`].
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        _action_id: &str,
        _timeout_ms: Option<u64>,
    ) -> Result<Value, String> {
        // Default: ignore the id and budget and defer to the plain call.
        self.execute(tool, params).await
    }
}
129
130/// Deterministic key for idempotency deduplication.
131fn idempotency_key(action: &Action) -> String {
132    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
133    let params = serde_json::to_string(&sorted).unwrap_or_default();
134    format!(
135        "{}:{}:{}",
136        serde_json::to_string(&action.action_type).unwrap_or_default(),
137        action.tool.as_deref().unwrap_or(""),
138        params
139    )
140}
141
142fn rejected_result(action_id: &str, error: String) -> ActionResult {
143    ActionResult {
144        action_id: action_id.to_string(),
145        status: ActionStatus::Rejected,
146        output: None,
147        error: Some(error),
148        state_changes: HashMap::new(),
149        duration_ms: None,
150        timestamp: chrono::Utc::now(),
151    }
152}
153
154/// Capture only the state keys relevant to an action (state_dependencies + expected_effects).
155/// Returns an empty map if the action declares no relevant keys (e.g., tool calls
156/// that don't interact with state).
157fn snapshot_relevant_keys(
158    state: &car_state::StateStore,
159    action: &Action,
160) -> HashMap<String, Value> {
161    let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
162    for dep in &action.state_dependencies {
163        keys.insert(dep.as_str());
164    }
165    for key in action.expected_effects.keys() {
166        keys.insert(key.as_str());
167    }
168    // For state_write actions, capture the key being written
169    if action.action_type == ActionType::StateWrite {
170        if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
171            keys.insert(key);
172        }
173    }
174
175    if keys.is_empty() {
176        // No declared state interaction — skip snapshot (empty map)
177        return HashMap::new();
178    }
179
180    keys.iter()
181        .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
182        .collect()
183}
184
185fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
186    ActionResult {
187        action_id: action_id.to_string(),
188        status: ActionStatus::Skipped,
189        output: None,
190        error: Some(reason.to_string()),
191        state_changes: HashMap::new(),
192        duration_ms: None,
193        timestamp: chrono::Utc::now(),
194    }
195}
196
/// Prefix on the `error` field of an `ActionResult` that distinguishes
/// "the user pulled the plug" from "earlier abort cascaded." The
/// `ActionStatus` itself is `Skipped` in both cases (introducing a
/// new variant ripples through every IR consumer + FFI binding); the
/// prefix lets callers like the A2A bridge tell the cases apart
/// without string-matching a magic literal.
///
/// Callers test for cancellation with
/// `result.error.as_deref().is_some_and(|e| e.starts_with(CANCELED_PREFIX))`
/// or an equivalent check. The runtime string keeps the American spelling
/// "canceled" even where surrounding prose says "cancelled".
pub const CANCELED_PREFIX: &str = "canceled: ";
204
205/// Result for an action that didn't run because the proposal was
206/// cancelled mid-flight. Distinct from `Skipped` so callers can tell
207/// "didn't run because of an earlier abort" from "didn't run because
208/// the user pulled the plug."
209fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
210    ActionResult {
211        action_id: action_id.to_string(),
212        status: ActionStatus::Skipped,
213        output: None,
214        error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
215        state_changes: HashMap::new(),
216        duration_ms: None,
217        timestamp: chrono::Utc::now(),
218    }
219}
220
221/// Format a tool result for feeding back to a model.
222pub fn format_tool_result(result: &ActionResult) -> String {
223    match result.status {
224        ActionStatus::Succeeded => match &result.output {
225            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
226            None => String::new(),
227        },
228        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
229        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
230        _ => format!(
231            "[{:?}] {}",
232            result.status,
233            result.error.as_deref().unwrap_or("")
234        ),
235    }
236}
237
/// Budget constraints for proposal execution.
///
/// Installed with `Runtime::set_cost_budget`. NOTE(review): enforcement is
/// outside this chunk; `None` is assumed to mean "no limit" for that
/// dimension — confirm at the enforcement site.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Cap on the number of tool calls.
    pub max_tool_calls: Option<u32>,
    /// Cap on wall-clock duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Cap on the number of actions.
    pub max_actions: Option<u32>,
}
245
/// Common Agent Runtime — deterministic execution layer.
///
/// Lock ordering discipline (never hold multiple simultaneously, never hold sync locks across .await):
/// 1. capabilities (RwLock, read-only during execution)
/// 2. tools (RwLock, read-only during execution)
/// 3. policies (RwLock, read-only during execution)
/// 4. session_policies (RwLock to find the per-session engine, then read inner Arc)
/// 5. cost_budget (RwLock, read-only during execution)
/// 6. log (TokioMutex, acquired/released per event)
/// 7. tool_executor (TokioMutex, clone Arc and drop before await)
/// 8. idempotency_cache (TokioMutex, acquired/released per check)
///
/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
///
/// NOTE(review): `replan_callback`, `replan_config` and `memgine` are not in
/// the ordering above; confirm where they fit before holding any of them
/// together with another lock.
pub struct Runtime {
    /// Key/value state store. Can be shared between runtimes via
    /// [`Runtime::with_shared`].
    pub state: Arc<StateStore>,
    /// Registered tool schemas keyed by tool name (filled by
    /// [`Runtime::register_tool_schema`] and [`Runtime::with_inference`]).
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Global policy registry; can be shared via [`Runtime::with_shared`].
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Per-session policy registries. Hosts that multiplex multiple
    /// concurrent agent sessions over a single `Runtime` (IDE-style
    /// frontends with per-project rules, multi-tenant servers) call
    /// [`Runtime::open_session`] to mint an id, then
    /// [`Runtime::register_policy_in_session`] to attach session-scoped
    /// rules. Validation under that session walks the global registry
    /// AND the session's — both must pass. Sessions can deny what
    /// global allows; sessions cannot allow what global denies.
    /// Closing a session drops its registry and any closures it holds.
    /// See `docs/proposals/per-session-policy-scoping.md`.
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log. Can be shared via [`Runtime::with_shared`]; replaced
    /// wholesale by [`Runtime::with_event_log`].
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool token-bucket limits, set by [`Runtime::set_rate_limit`] or
    /// by a registered schema's `rate_limit`.
    pub rate_limiter: Arc<RateLimiter>,
    /// Cross-proposal result cache, enabled per tool by
    /// [`Runtime::enable_tool_cache`] or by a schema's `cache_ttl_secs`.
    pub result_cache: Arc<ResultCache>,
    /// Executor that runs tool calls; installed by `with_executor` /
    /// `set_executor`.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Earlier action results kept for deduplication — presumably keyed by
    /// the string from `idempotency_key`; confirm at the use site.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional execution budget, set by [`Runtime::set_cost_budget`].
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional per-agent capability restrictions, set by
    /// [`Runtime::set_capabilities`].
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Local inference engine attached by [`Runtime::with_inference`].
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine for skill learning (auto-distillation after execution).
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether to auto-distill skills after each proposal execution.
    auto_distill: bool,
    /// Optional trajectory store for persisting execution traces.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Optional replan callback for failure recovery.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Replan configuration.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Canonical tool registry (optional — new code should use this).
    pub registry: Arc<crate::registry::ToolRegistry>,
}
295
296impl Runtime {
297    pub fn new() -> Self {
298        Self {
299            state: Arc::new(StateStore::new()),
300            tools: Arc::new(TokioRwLock::new(HashMap::new())),
301            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
302            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
303            log: Arc::new(TokioMutex::new(EventLog::new())),
304            rate_limiter: Arc::new(RateLimiter::new()),
305            result_cache: Arc::new(ResultCache::new()),
306            tool_executor: TokioMutex::new(None),
307            idempotency_cache: TokioMutex::new(HashMap::new()),
308            cost_budget: TokioRwLock::new(None),
309            capabilities: TokioRwLock::new(None),
310            inference_engine: None,
311            memgine: None,
312            auto_distill: false,
313            trajectory_store: None,
314            replan_callback: TokioMutex::new(None),
315            replan_config: TokioRwLock::new(ReplanConfig::default()),
316            registry: Arc::new(crate::registry::ToolRegistry::new()),
317        }
318    }
319
320    /// Create a runtime with shared state, event log, and policies.
321    /// Each runtime gets its own tool set, executor, and idempotency cache.
322    pub fn with_shared(
323        state: Arc<StateStore>,
324        log: Arc<TokioMutex<EventLog>>,
325        policies: Arc<TokioRwLock<PolicyEngine>>,
326    ) -> Self {
327        Self {
328            state,
329            tools: Arc::new(TokioRwLock::new(HashMap::new())),
330            policies,
331            // Session-policy registries are per-runtime — sharing them
332            // across embedders that share global policies would defeat
333            // the isolation point. Hosts that genuinely want shared
334            // sessions should drive them through one shared Runtime.
335            session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
336            log,
337            rate_limiter: Arc::new(RateLimiter::new()),
338            result_cache: Arc::new(ResultCache::new()),
339            tool_executor: TokioMutex::new(None),
340            idempotency_cache: TokioMutex::new(HashMap::new()),
341            cost_budget: TokioRwLock::new(None),
342            capabilities: TokioRwLock::new(None),
343            inference_engine: None,
344            memgine: None,
345            auto_distill: false,
346            trajectory_store: None,
347            replan_callback: TokioMutex::new(None),
348            replan_config: TokioRwLock::new(ReplanConfig::default()),
349            registry: Arc::new(crate::registry::ToolRegistry::new()),
350        }
351    }
352
353    // ─── Session policy lifecycle ───────────────────────────────────
354    //
355    // Per-session policy scoping. Policies registered against a
356    // session apply to proposals executed under that session id (via
357    // [`Self::execute_with_session`] / [`Self::execute_with_session_and_cancel`]).
358    // Policies registered globally always apply, on top of any
359    // session-scoped layer. See `docs/proposals/per-session-policy-scoping.md`.
360
361    /// Mint a new session id and pre-register an empty policy engine
362    /// under it. Hosts call this once per concurrent agent context
363    /// (an IDE project window, a multi-tenant client, etc.) and pair
364    /// it with [`Self::close_session`] when the context ends.
365    ///
366    /// Returns the opaque id to pass to subsequent
367    /// [`Self::register_policy_in_session`] / [`Self::execute_with_session`]
368    /// calls. Ids are UUIDs so collisions across concurrent calls
369    /// don't matter.
370    pub async fn open_session(&self) -> String {
371        let id = Uuid::new_v4().to_string();
372        let mut sessions = self.session_policies.write().await;
373        sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
374        id
375    }
376
377    /// Drop the session and every policy scoped to it. Returns true
378    /// if a session by that id existed; false if it didn't (already
379    /// closed, never opened, etc.). Idempotent in effect — closing a
380    /// missing session is a no-op the caller is free to ignore.
381    pub async fn close_session(&self, session_id: &str) -> bool {
382        let mut sessions = self.session_policies.write().await;
383        sessions.remove(session_id).is_some()
384    }
385
386    /// Register a policy under a specific session id. The policy
387    /// applies only when a proposal is executed under that session;
388    /// proposals executed without a session (the default) only see
389    /// global policies.
390    ///
391    /// Returns `Err(...)` if the session is unknown — callers either
392    /// forgot to call [`Self::open_session`] or are using a
393    /// stale/closed id.
394    pub async fn register_policy_in_session(
395        &self,
396        session_id: &str,
397        name: &str,
398        check: car_policy::PolicyCheck,
399        description: &str,
400    ) -> Result<(), String> {
401        let engine = {
402            let sessions = self.session_policies.read().await;
403            sessions
404                .get(session_id)
405                .cloned()
406                .ok_or_else(|| format!("unknown session id '{session_id}'"))?
407        };
408        let mut engine = engine.write().await;
409        engine.register(name, check, description);
410        Ok(())
411    }
412
413    /// True if a session with this id is currently open. Mostly for
414    /// tests and FFI surface validation — production code should
415    /// trust the id it just opened.
416    pub async fn session_exists(&self, session_id: &str) -> bool {
417        self.session_policies.read().await.contains_key(session_id)
418    }
419
420    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
421    /// as built-in tools with real implementations.
422    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
423        self.inference_engine = Some(engine);
424        // Register inference tool schemas (non-async init, use try_lock)
425        if let Ok(mut tools) = self.tools.try_write() {
426            for schema in car_inference::service::all_schemas() {
427                tools.insert(schema.name.clone(), schema);
428            }
429        }
430        self
431    }
432
433    /// Attach a memgine for automatic skill learning after execution.
434    /// When `auto_distill` is true, execution traces are automatically distilled
435    /// into skills and domains are evolved when underperforming.
436    pub fn with_learning(
437        mut self,
438        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
439        auto_distill: bool,
440    ) -> Self {
441        self.memgine = Some(memgine);
442        self.auto_distill = auto_distill;
443        self
444    }
445
446    /// Attach a memgine with auto-distillation enabled (recommended default).
447    pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
448        self.with_learning(memgine, true)
449    }
450
451    /// Attach a trajectory store for persisting execution traces.
452    pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
453        self.trajectory_store = Some(store);
454        self
455    }
456
457    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
458        // Use try_lock for non-async init context. Safe because we just created the mutex.
459        if let Ok(mut guard) = self.tool_executor.try_lock() {
460            *guard = Some(executor);
461        }
462        self
463    }
464
465    /// Set a tool executor for the next execute() call.
466    /// Used by NAPI bindings where executor varies per call.
467    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
468        *self.tool_executor.lock().await = Some(executor);
469    }
470
471    pub fn with_event_log(mut self, log: EventLog) -> Self {
472        self.log = Arc::new(TokioMutex::new(log));
473        self
474    }
475
476    /// Attach a replan callback for failure recovery (builder).
477    pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
478        if let Ok(mut guard) = self.replan_callback.try_lock() {
479            *guard = Some(callback);
480        }
481        if let Ok(mut guard) = self.replan_config.try_write() {
482            *guard = config;
483        }
484        self
485    }
486
487    /// Set a replan callback at runtime.
488    pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
489        *self.replan_callback.lock().await = Some(callback);
490    }
491
492    /// Set replan configuration at runtime.
493    pub async fn set_replan_config(&self, config: ReplanConfig) {
494        *self.replan_config.write().await = config;
495    }
496
497    /// Register a tool with just a name (backward compatible).
498    pub async fn register_tool(&self, name: &str) {
499        let schema = ToolSchema {
500            name: name.to_string(),
501            description: String::new(),
502            parameters: serde_json::Value::Object(Default::default()),
503            returns: None,
504            idempotent: false,
505            cache_ttl_secs: None,
506            rate_limit: None,
507        };
508        self.register_tool_schema(schema).await;
509    }
510
511    /// Register a tool with full schema.
512    pub async fn register_tool_schema(&self, schema: ToolSchema) {
513        // Auto-configure cache if schema specifies it
514        if let Some(ttl) = schema.cache_ttl_secs {
515            self.result_cache.enable_caching(&schema.name, ttl).await;
516        }
517        // Auto-configure rate limit if schema specifies it
518        if let Some(ref rl) = schema.rate_limit {
519            self.rate_limiter
520                .set_limit(
521                    &schema.name,
522                    RateLimit {
523                        max_calls: rl.max_calls,
524                        interval_secs: rl.interval_secs,
525                    },
526                )
527                .await;
528        }
529        self.tools.write().await.insert(schema.name.clone(), schema);
530    }
531
532    /// Register a tool via the canonical registry.
533    /// This is the preferred way to register tools — it updates both the
534    /// registry and the legacy tools HashMap for backward compatibility.
535    pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
536        let schema = entry.schema.clone();
537        self.registry.register(entry).await;
538        self.register_tool_schema(schema).await;
539    }
540
541    /// Register CAR's built-in agent utility stdlib.
542    ///
543    /// This is an opt-in convenience layer for common local-file and text tools.
544    /// Existing runtimes remain unchanged until this is called.
545    pub async fn register_agent_basics(&self) {
546        for entry in crate::agent_basics::entries() {
547            self.register_tool_entry(entry).await;
548        }
549    }
550
551    /// Get all registered tool schemas (for model prompt generation).
552    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
553        self.tools.read().await.values().cloned().collect()
554    }
555
556    /// Set a cost budget that limits proposal execution.
557    pub async fn set_cost_budget(&self, budget: CostBudget) {
558        *self.cost_budget.write().await = Some(budget);
559    }
560
561    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
562    pub async fn set_capabilities(&self, caps: CapabilitySet) {
563        *self.capabilities.write().await = Some(caps);
564    }
565
566    /// Set a per-tool rate limit (token bucket).
567    ///
568    /// `max_calls` tokens are available per `interval_secs` window.
569    /// When the bucket is empty, `dispatch()` applies backpressure by
570    /// waiting until a token refills.
571    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
572        self.rate_limiter
573            .set_limit(
574                tool,
575                RateLimit {
576                    max_calls,
577                    interval_secs,
578                },
579            )
580            .await;
581    }
582
583    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
584    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
585        self.result_cache.enable_caching(tool, ttl_secs).await;
586    }
587
588    /// Execute a proposal with automatic replanning on failure.
589    ///
590    /// If a `ReplanCallback` is registered and `max_replans > 0`, the runtime
591    /// will catch abort failures, roll back state, ask the model for an
592    /// alternative proposal via the callback, and re-execute. This transforms
593    /// "execute-and-hope" into "execute-and-recover."
594    ///
595    /// If no callback is registered or `max_replans == 0`, behaves identically
596    /// to a single `execute_inner()` call (zero overhead, fully backward compatible).
597    #[instrument(
598        name = "proposal.execute",
599        skip_all,
600        fields(
601            proposal_id = %proposal.id,
602            action_count = proposal.actions.len(),
603        )
604    )]
605    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
606        // Forward to the cancel-aware variant with a never-cancelled
607        // token. Existing callers see no behaviour change.
608        let token = tokio_util::sync::CancellationToken::new();
609        self.execute_with_cancel(proposal, &token).await
610    }
611
612    /// Execute a proposal scoped to a specific session id.
613    ///
614    /// Validation walks the global policy registry plus the session's
615    /// own registry — both must pass for an action to run. Session
616    /// policies can deny what global allows; they cannot allow what
617    /// global denies (validation is conjunctive).
618    ///
619    /// Returns the same [`ProposalResult`] shape as [`Self::execute`].
620    /// Errors with an action-level rejection if the session id is
621    /// unknown — callers should check via [`Self::session_exists`] or
622    /// trust an id they minted via [`Self::open_session`].
623    pub async fn execute_with_session(
624        &self,
625        proposal: &ActionProposal,
626        session_id: &str,
627    ) -> ProposalResult {
628        let token = tokio_util::sync::CancellationToken::new();
629        self.execute_with_session_and_cancel(proposal, session_id, &token)
630            .await
631    }
632
633    /// Combined session-scoped + cancellable execute. The session id
634    /// is passed verbatim to the per-action policy check; the cancel
635    /// token behaves identically to [`Self::execute_with_cancel`].
636    pub async fn execute_with_session_and_cancel(
637        &self,
638        proposal: &ActionProposal,
639        session_id: &str,
640        cancel: &tokio_util::sync::CancellationToken,
641    ) -> ProposalResult {
642        self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
643            .await
644    }
645
646    /// Execute a proposal with cooperative cancellation.
647    ///
648    /// The runtime checks `token.is_cancelled()` at each DAG level
649    /// boundary. When set, every action that hadn't yet started runs
650    /// is reported as `Skipped` with `error = "canceled: ..."` so
651    /// callers can distinguish "user pulled the plug" from "earlier
652    /// abort cascaded." Actions already in flight continue to
653    /// completion — tool calls dispatched to user-provided executors
654    /// can't be safely interrupted from the engine.
655    ///
656    /// The CAR A2A bridge uses this so `tasks/cancel` produces a
657    /// `ProposalResult` with clean partial state rather than relying
658    /// on `JoinHandle::abort` to interrupt mid-await (which leaves
659    /// no record of which actions actually ran).
660    ///
661    /// **FFI exposure:** this method is intentionally not surfaced
662    /// through the NAPI / PyO3 / `car-server-core` JSON-RPC bindings.
663    /// Those consumers (Node, Python, WebSocket) don't currently
664    /// expose long-running async-task surfaces that need
665    /// cancellation; the bridge is the lone consumer. When a binding
666    /// gains a long-running task surface, the path is clear: add a
667    /// per-binding token registry keyed by some caller-provided id,
668    /// expose `cancelExecution(id)` / `cancel_execution(id)` /
669    /// `proposal.cancel { id }`, and have the runtime call
670    /// `execute_with_cancel` with the matching token. Skipping that
671    /// today avoids speculative API surface that bloats bindings
672    /// without a consumer.
673    pub async fn execute_with_cancel(
674        &self,
675        proposal: &ActionProposal,
676        cancel: &tokio_util::sync::CancellationToken,
677    ) -> ProposalResult {
678        self.execute_with_optional_session(proposal, None, None, cancel)
679            .await
680    }
681
682    /// Execute a proposal with an attached [`RuntimeScope`]
683    /// (Parslee-ai/car#187 phase 3).
684    ///
685    /// Same contract as [`Self::execute`] plus a per-execution
686    /// identity surface — typically built by the car-a2a dispatcher
687    /// from the verified `Identity` and cooperative `a2a_caller`
688    /// metadata on the inbound `ActionProposal`. The scope is
689    /// recorded on the event log so downstream audit / log analysis
690    /// can see which caller / tenant issued each action.
691    ///
692    /// **What this enforces today**: scope is captured + logged.
693    /// Memgine queries and state-store ops still hit global
694    /// namespaces — those follow-ups are tracked under #187.
695    /// Tool / policy code that needs per-tenant behaviour right now
696    /// should keep reading `proposal.context["a2a_caller_verified"]`
697    /// directly (the phase 1 / 2 surface).
698    pub async fn execute_scoped(
699        &self,
700        proposal: &ActionProposal,
701        scope: &crate::scope::RuntimeScope,
702    ) -> ProposalResult {
703        let token = tokio_util::sync::CancellationToken::new();
704        self.execute_scoped_with_cancel(proposal, scope, &token)
705            .await
706    }
707
708    /// Combined scoped + cancellable execute. Mirrors the shape of
709    /// [`Self::execute_with_session_and_cancel`] for symmetry — both
710    /// add a side-channel (session id / scope) on top of the
711    /// cancellable form.
712    pub async fn execute_scoped_with_cancel(
713        &self,
714        proposal: &ActionProposal,
715        scope: &crate::scope::RuntimeScope,
716        cancel: &tokio_util::sync::CancellationToken,
717    ) -> ProposalResult {
718        self.execute_with_optional_session(proposal, None, Some(scope), cancel)
719            .await
720    }
721
722    /// Internal entry point that backs both
723    /// [`Self::execute_with_cancel`] (no session) and
724    /// [`Self::execute_with_session_and_cancel`]. Holds the replan
725    /// loop and threads `session_id` into the per-action validation
726    /// path so session-scoped policies stack on top of global ones.
727    async fn execute_with_optional_session(
728        &self,
729        proposal: &ActionProposal,
730        session_id: Option<&str>,
731        scope: Option<&crate::scope::RuntimeScope>,
732        cancel: &tokio_util::sync::CancellationToken,
733    ) -> ProposalResult {
734        let config = self.replan_config.read().await.clone();
735        let mut current_proposal = proposal.clone();
736        let mut attempt: u32 = 0;
737
738        // Phase 3 foundation (Parslee-ai/car#187): record the scope
739        // on the event log so audit / log analysis can correlate
740        // actions to the caller / tenant that triggered them. Only
741        // logged when at least one identity field is set — keeps
742        // the existing in-process call sites free of noise.
743        if let Some(s) = scope {
744            if !s.is_unscoped() {
745                let mut props: HashMap<String, Value> = HashMap::new();
746                if let Some(cid) = &s.caller_id {
747                    props.insert("caller_id".to_string(), Value::from(cid.as_str()));
748                }
749                if let Some(tid) = &s.tenant_id {
750                    props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
751                }
752                if !s.claims.is_empty() {
753                    if let Ok(claims_json) = serde_json::to_value(&s.claims) {
754                        props.insert("claims".to_string(), claims_json);
755                    }
756                }
757                let mut log = self.log.lock().await;
758                log.append(EventKind::SessionScope, None, Some(&proposal.id), props);
759            }
760        }
761
762        loop {
763            let (result, state_before_map) = self
764                .execute_inner_with_cancel(&current_proposal, Some(cancel), session_id, scope)
765                .await;
766
767            // Check if we aborted
768            let aborted = result
769                .results
770                .iter()
771                .any(|r| r.status == ActionStatus::Failed);
772            if !aborted || attempt >= config.max_replans {
773                if aborted && attempt > 0 {
774                    // Exhausted all replan attempts
775                    let mut log = self.log.lock().await;
776                    log.append(
777                        EventKind::ReplanExhausted,
778                        None,
779                        Some(&proposal.id),
780                        [("attempts".to_string(), Value::from(attempt))].into(),
781                    );
782                }
783
784                // Persist trajectory
785                let outcome = if !aborted {
786                    if attempt > 0 {
787                        car_memgine::TrajectoryOutcome::ReplanSuccess
788                    } else {
789                        car_memgine::TrajectoryOutcome::Success
790                    }
791                } else if attempt > 0 {
792                    car_memgine::TrajectoryOutcome::ReplanExhausted
793                } else {
794                    car_memgine::TrajectoryOutcome::Failed
795                };
796                if let Some(err) = self.persist_trajectory(
797                    proposal,
798                    &current_proposal,
799                    &result,
800                    outcome,
801                    attempt,
802                    &state_before_map,
803                ) {
804                    let mut log = self.log.lock().await;
805                    log.append(
806                        EventKind::ActionFailed,
807                        None,
808                        Some(&proposal.id),
809                        [(
810                            "trajectory_persist_error".to_string(),
811                            Value::from(err.as_str()),
812                        )]
813                        .into(),
814                    );
815                }
816
817                return result;
818            }
819
820            // Get replan callback (clone Arc, drop lock immediately)
821            let callback = {
822                let guard = self.replan_callback.lock().await;
823                guard.clone()
824            };
825            let Some(callback) = callback else {
826                // No callback registered — persist trajectory and return
827                if let Some(err) = self.persist_trajectory(
828                    proposal,
829                    &current_proposal,
830                    &result,
831                    car_memgine::TrajectoryOutcome::Failed,
832                    attempt,
833                    &state_before_map,
834                ) {
835                    let mut log = self.log.lock().await;
836                    log.append(
837                        EventKind::ActionFailed,
838                        None,
839                        Some(&proposal.id),
840                        [(
841                            "trajectory_persist_error".to_string(),
842                            Value::from(err.as_str()),
843                        )]
844                        .into(),
845                    );
846                }
847                return result;
848            };
849
850            // Build replan context
851            let failed_actions: Vec<FailedActionSummary> = result
852                .results
853                .iter()
854                .filter(|r| r.status == ActionStatus::Failed)
855                .map(|r| {
856                    let action = current_proposal
857                        .actions
858                        .iter()
859                        .find(|a| a.id == r.action_id);
860                    FailedActionSummary {
861                        action_id: r.action_id.clone(),
862                        tool: action.and_then(|a| a.tool.clone()),
863                        error: r.error.clone().unwrap_or_default(),
864                        parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
865                    }
866                })
867                .collect();
868
869            let completed_action_ids: Vec<String> = result
870                .results
871                .iter()
872                .filter(|r| r.status == ActionStatus::Succeeded)
873                .map(|r| r.action_id.clone())
874                .collect();
875
876            let ctx = ReplanContext {
877                proposal_id: proposal.id.clone(),
878                attempt: attempt + 1,
879                failed_actions,
880                completed_action_ids,
881                state_snapshot: self.state.snapshot(),
882                replans_remaining: config.max_replans.saturating_sub(attempt + 1),
883                original_source: proposal.source.clone(),
884                original_action_count: proposal.actions.len(),
885                original_context: proposal.context.clone(),
886            };
887
888            // Backoff delay between replan attempts
889            if config.delay_ms > 0 {
890                tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
891            }
892
893            // Log replan attempt
894            {
895                let mut log = self.log.lock().await;
896                log.append(
897                    EventKind::ReplanAttempted,
898                    None,
899                    Some(&proposal.id),
900                    [
901                        ("attempt".to_string(), Value::from(attempt + 1)),
902                        (
903                            "failed_count".to_string(),
904                            Value::from(ctx.failed_actions.len()),
905                        ),
906                    ]
907                    .into(),
908                );
909            }
910
911            // Call the model for a new plan
912            match callback.replan(&ctx).await {
913                Ok(new_proposal) => {
914                    // Quality gate: verify replan proposal before executing
915                    if config.verify_before_execute {
916                        let current_state = self.state.snapshot();
917                        // Verify against the full registered schemas
918                        // (not just names) so a replan with a bad
919                        // parameter type / missing required field is
920                        // rejected here, per register_tool_schema's
921                        // contract (car-releases#56). The read guard is
922                        // held across the synchronous verify call.
923                        let tools_guard = self.tools.read().await;
924                        let vr = car_verify::verify_with_schemas(
925                            &new_proposal,
926                            Some(&current_state),
927                            Some(&tools_guard),
928                            100,
929                        );
930                        drop(tools_guard);
931                        if !vr.valid {
932                            let error_msgs: Vec<String> = vr
933                                .issues
934                                .iter()
935                                .filter(|i| i.severity == "error")
936                                .map(|i| i.message.clone())
937                                .collect();
938                            let mut log = self.log.lock().await;
939                            log.append(
940                                EventKind::ReplanRejected,
941                                None,
942                                Some(&proposal.id),
943                                [
944                                    ("errors".to_string(), Value::from(error_msgs.join("; "))),
945                                    ("attempt".to_string(), Value::from(attempt + 1)),
946                                ]
947                                .into(),
948                            );
949                            // Don't execute a broken replan — count as failed attempt
950                            attempt += 1;
951                            continue;
952                        }
953                    }
954
955                    // Log accepted proposal
956                    {
957                        let mut log = self.log.lock().await;
958                        log.append(
959                            EventKind::ReplanProposalReceived,
960                            None,
961                            Some(&proposal.id),
962                            [
963                                ("attempt".to_string(), Value::from(attempt + 1)),
964                                (
965                                    "new_action_count".to_string(),
966                                    Value::from(new_proposal.actions.len()),
967                                ),
968                            ]
969                            .into(),
970                        );
971                    }
972                    current_proposal = new_proposal;
973                    attempt += 1;
974                }
975                Err(e) => {
976                    // Replan callback itself failed — log and return original failure
977                    let mut log = self.log.lock().await;
978                    log.append(
979                        EventKind::ReplanExhausted,
980                        None,
981                        Some(&proposal.id),
982                        [
983                            ("reason".to_string(), Value::from("callback_error")),
984                            ("error".to_string(), Value::from(e.as_str())),
985                            ("attempt".to_string(), Value::from(attempt + 1)),
986                        ]
987                        .into(),
988                    );
989                    if let Some(err) = self.persist_trajectory(
990                        proposal,
991                        &current_proposal,
992                        &result,
993                        car_memgine::TrajectoryOutcome::Failed,
994                        attempt,
995                        &state_before_map,
996                    ) {
997                        log.append(
998                            EventKind::ActionFailed,
999                            None,
1000                            Some(&proposal.id),
1001                            [(
1002                                "trajectory_persist_error".to_string(),
1003                                Value::from(err.as_str()),
1004                            )]
1005                            .into(),
1006                        );
1007                    }
1008                    return result;
1009                }
1010            }
1011        }
1012    }
1013
1014    /// Persist a trajectory to the store if configured.
1015    fn persist_trajectory(
1016        &self,
1017        proposal: &ActionProposal,
1018        current_proposal: &ActionProposal,
1019        result: &ProposalResult,
1020        outcome: car_memgine::TrajectoryOutcome,
1021        attempt: u32,
1022        state_before_map: &HashMap<String, HashMap<String, Value>>,
1023    ) -> Option<String> {
1024        let store = self.trajectory_store.as_ref()?;
1025
1026        let trace_events: Vec<car_memgine::TraceEvent> = result
1027            .results
1028            .iter()
1029            .map(|r| {
1030                let kind = match r.status {
1031                    ActionStatus::Succeeded => "action_succeeded",
1032                    ActionStatus::Failed => "action_failed",
1033                    ActionStatus::Rejected => "action_rejected",
1034                    ActionStatus::Skipped => "action_skipped",
1035                    _ => "unknown",
1036                };
1037                let tool = current_proposal
1038                    .actions
1039                    .iter()
1040                    .find(|a| a.id == r.action_id)
1041                    .and_then(|a| a.tool.clone());
1042                let reward = match r.status {
1043                    ActionStatus::Succeeded => Some(1.0),
1044                    ActionStatus::Failed => Some(0.0),
1045                    ActionStatus::Rejected => Some(0.0),
1046                    ActionStatus::Skipped => None,
1047                    _ => None,
1048                };
1049                car_memgine::TraceEvent {
1050                    kind: kind.to_string(),
1051                    action_id: Some(r.action_id.clone()),
1052                    tool,
1053                    data: r
1054                        .error
1055                        .as_ref()
1056                        .map(|e| serde_json::json!({"error": e}))
1057                        .unwrap_or(serde_json::json!({})),
1058                    duration_ms: r.duration_ms,
1059                    state_before: state_before_map.get(&r.action_id).cloned(),
1060                    state_after: if !r.state_changes.is_empty() {
1061                        Some(r.state_changes.clone())
1062                    } else {
1063                        None
1064                    },
1065                    reward,
1066                }
1067            })
1068            .collect();
1069
1070        let trajectory = car_memgine::Trajectory {
1071            proposal_id: proposal.id.clone(),
1072            source: proposal.source.clone(),
1073            action_count: current_proposal.actions.len(),
1074            events: trace_events,
1075            outcome,
1076            timestamp: chrono::Utc::now(),
1077            duration_ms: result.cost.total_duration_ms,
1078            replan_attempts: attempt,
1079        };
1080
1081        match store.append(&trajectory) {
1082            Ok(()) => None,
1083            Err(e) => Some(e.to_string()),
1084        }
1085    }
1086
1087    /// Score N candidate proposals, execute the best valid one, fall back to
1088    /// next-best on failure. Combines car-planner scoring with engine execution.
1089    ///
1090    /// Returns the result from whichever proposal was executed (best or fallback).
1091    /// If all candidates fail verification, returns an error result for the first.
1092    pub async fn plan_and_execute(
1093        &self,
1094        candidates: &[ActionProposal],
1095        planner_config: Option<car_planner::PlannerConfig>,
1096        feedback: Option<&car_planner::ToolFeedback>,
1097    ) -> ProposalResult {
1098        if candidates.is_empty() {
1099            return ProposalResult {
1100                proposal_id: "empty".to_string(),
1101                results: vec![],
1102                cost: car_ir::CostSummary::default(),
1103            };
1104        }
1105
1106        // Score all candidates
1107        let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1108        let tools_guard = self.tools.read().await;
1109        let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1110        drop(tools_guard);
1111
1112        let pre_plan_snapshot = self.state.snapshot();
1113        let pre_plan_transitions = self.state.transition_count();
1114        let ranked = planner.rank_with_feedback(
1115            candidates,
1116            Some(&pre_plan_snapshot),
1117            Some(&tool_names),
1118            feedback,
1119        );
1120
1121        // Try each valid candidate in score order
1122        let mut first_failure: Option<ProposalResult> = None;
1123        for scored in &ranked {
1124            if !scored.valid {
1125                continue;
1126            }
1127
1128            // Restore clean state before each candidate (don't rely on execute's
1129            // internal rollback — it only fires on Abort, not Skip/Retry failures)
1130            self.state
1131                .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1132
1133            let proposal = &candidates[scored.index];
1134            let result = self.execute(proposal).await;
1135
1136            if result.all_succeeded() {
1137                return result;
1138            }
1139
1140            tracing::info!(
1141                proposal_id = %proposal.id,
1142                score = scored.score,
1143                "plan_and_execute: proposal failed, trying next candidate"
1144            );
1145
1146            if first_failure.is_none() {
1147                first_failure = Some(result);
1148            }
1149        }
1150
1151        // Return the first failure result (don't re-execute — avoids duplicate side effects)
1152        first_failure.unwrap_or_else(|| ProposalResult {
1153            proposal_id: candidates[0].id.clone(),
1154            results: vec![],
1155            cost: car_ir::CostSummary::default(),
1156        })
1157    }
1158
1159    /// Execute a single proposal through the runtime loop (no replanning).
1160    /// Returns (result, state_before_map) where state_before_map has per-action snapshots.
1161    ///
1162    /// `session_id`, when `Some`, scopes per-action policy validation
1163    /// to the named session in addition to global policies. Both
1164    /// layers must pass for an action to run; the session layer cannot
1165    /// loosen what global denies.
1166    async fn execute_inner_with_cancel(
1167        &self,
1168        proposal: &ActionProposal,
1169        cancel: Option<&tokio_util::sync::CancellationToken>,
1170        session_id: Option<&str>,
1171        scope: Option<&crate::scope::RuntimeScope>,
1172    ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1173        // Generate trace_id for this proposal execution
1174        let trace_id = Uuid::new_v4().to_string();
1175
1176        // Begin root span for proposal execution
1177        let root_span_id = {
1178            let mut log = self.log.lock().await;
1179            log.begin_span(
1180                "proposal.execute",
1181                &trace_id,
1182                None,
1183                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1184            )
1185        };
1186
1187        // Log proposal received
1188        {
1189            let mut log = self.log.lock().await;
1190            log.append(
1191                EventKind::ProposalReceived,
1192                None,
1193                Some(&proposal.id),
1194                [
1195                    ("source".to_string(), Value::from(proposal.source.as_str())),
1196                    (
1197                        "action_count".to_string(),
1198                        Value::from(proposal.actions.len()),
1199                    ),
1200                ]
1201                .into(),
1202            );
1203        }
1204
1205        // Capability check: max_actions budget for entire proposal
1206        {
1207            let caps = self.capabilities.read().await;
1208            if let Some(ref cap) = *caps {
1209                if !cap.actions_within_budget(proposal.actions.len() as u32) {
1210                    let mut action_results = Vec::new();
1211                    for action in &proposal.actions {
1212                        action_results.push(rejected_result(
1213                            &action.id,
1214                            format!(
1215                                "capability denied: proposal has {} actions, max allowed is {:?}",
1216                                proposal.actions.len(),
1217                                cap.max_actions
1218                            ),
1219                        ));
1220                    }
1221                    return (
1222                        ProposalResult {
1223                            proposal_id: proposal.id.clone(),
1224                            results: action_results,
1225                            cost: CostSummary::default(),
1226                        },
1227                        HashMap::new(),
1228                    );
1229                }
1230            }
1231        }
1232
1233        // Snapshot for rollback
1234        let snapshot = self.state.snapshot();
1235        let transition_count = self.state.transition_count();
1236
1237        let mut results: Vec<ActionResult> = Vec::new();
1238        // Per-action state snapshots captured before execution (for TraceEvent.state_before).
1239        let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1240        let mut aborted = false;
1241        let mut budget_exceeded = false;
1242        let mut total_retries: u32 = 0;
1243
1244        // Running cost counters for budget enforcement
1245        let mut running_tool_calls: u32 = 0;
1246        let mut running_actions: u32 = 0;
1247        let mut running_duration_ms: f64 = 0.0;
1248
1249        // Snapshot the budget once
1250        let budget = self.cost_budget.read().await.clone();
1251
1252        // Build DAG
1253        let levels = build_dag(&proposal.actions);
1254
1255        let mut canceled = false;
1256        for level in &levels {
1257            // Cooperative cancellation check at the level boundary.
1258            // Actions already in flight aren't interrupted (we can't
1259            // safely cancel a tool call dispatched to a user-provided
1260            // executor), but every action that hadn't started runs
1261            // is recorded as canceled with a clear reason.
1262            if !canceled {
1263                if let Some(token) = cancel {
1264                    if token.is_cancelled() {
1265                        canceled = true;
1266                    }
1267                }
1268            }
1269            if canceled {
1270                for &idx in level {
1271                    results.push(canceled_result(
1272                        &proposal.actions[idx].id,
1273                        "cancellation requested by caller",
1274                    ));
1275                }
1276                continue;
1277            }
1278            if aborted || budget_exceeded {
1279                let skip_reason = if budget_exceeded {
1280                    "cost budget exceeded"
1281                } else {
1282                    "skipped due to earlier abort"
1283                };
1284                for &idx in level {
1285                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1286                }
1287                continue;
1288            }
1289
1290            // Check if any action in this level has ABORT behavior
1291            let has_abort = level
1292                .iter()
1293                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1294
1295            if level.len() == 1 || has_abort {
1296                // Sequential execution
1297                for &idx in level {
1298                    if aborted || budget_exceeded {
1299                        let skip_reason = if budget_exceeded {
1300                            "cost budget exceeded"
1301                        } else {
1302                            "skipped due to abort"
1303                        };
1304                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1305                        continue;
1306                    }
1307
1308                    // Budget check before execution
1309                    if let Some(ref b) = budget {
1310                        if let Some(max) = b.max_actions {
1311                            if running_actions >= max {
1312                                budget_exceeded = true;
1313                                results.push(skipped_result(
1314                                    &proposal.actions[idx].id,
1315                                    "cost budget exceeded",
1316                                ));
1317                                continue;
1318                            }
1319                        }
1320                        if let Some(max) = b.max_tool_calls {
1321                            if proposal.actions[idx].action_type == ActionType::ToolCall
1322                                && running_tool_calls >= max
1323                            {
1324                                budget_exceeded = true;
1325                                results.push(skipped_result(
1326                                    &proposal.actions[idx].id,
1327                                    "cost budget exceeded",
1328                                ));
1329                                continue;
1330                            }
1331                        }
1332                        if let Some(max) = b.max_duration_ms {
1333                            if running_duration_ms >= max {
1334                                budget_exceeded = true;
1335                                results.push(skipped_result(
1336                                    &proposal.actions[idx].id,
1337                                    "cost budget exceeded",
1338                                ));
1339                                continue;
1340                            }
1341                        }
1342                    }
1343
1344                    state_before_map.insert(
1345                        proposal.actions[idx].id.clone(),
1346                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1347                    );
1348                    let (ar, action_retries) = self
1349                        .process_action(
1350                            &proposal.actions[idx],
1351                            &proposal.id,
1352                            &trace_id,
1353                            &root_span_id,
1354                            session_id,
1355                            scope,
1356                        )
1357                        .await;
1358                    total_retries += action_retries;
1359
1360                    // Update running counters
1361                    if ar.status == ActionStatus::Succeeded
1362                        && proposal.actions[idx].action_type == ActionType::ToolCall
1363                    {
1364                        running_tool_calls += 1;
1365                    }
1366                    if ar.status != ActionStatus::Skipped {
1367                        running_actions += 1;
1368                    }
1369                    if let Some(d) = ar.duration_ms {
1370                        running_duration_ms += d;
1371                    }
1372
1373                    if ar.status == ActionStatus::Failed
1374                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1375                    {
1376                        aborted = true;
1377                    }
1378                    results.push(ar);
1379                }
1380            } else {
1381                // Concurrent execution via futures::join_all
1382                // Snapshot only relevant keys per action (all see same pre-level state)
1383                for &idx in level {
1384                    state_before_map.insert(
1385                        proposal.actions[idx].id.clone(),
1386                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1387                    );
1388                }
1389                let futs: Vec<_> = level
1390                    .iter()
1391                    .map(|&idx| {
1392                        self.process_action(
1393                            &proposal.actions[idx],
1394                            &proposal.id,
1395                            &trace_id,
1396                            &root_span_id,
1397                            session_id,
1398                            scope,
1399                        )
1400                    })
1401                    .collect();
1402                let level_results = futures::future::join_all(futs).await;
1403
1404                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1405                    let idx = level[i];
1406                    total_retries += action_retries;
1407                    if ar.status == ActionStatus::Succeeded
1408                        && proposal.actions[idx].action_type == ActionType::ToolCall
1409                    {
1410                        running_tool_calls += 1;
1411                    }
1412                    if ar.status != ActionStatus::Skipped {
1413                        running_actions += 1;
1414                    }
1415                    if let Some(d) = ar.duration_ms {
1416                        running_duration_ms += d;
1417                    }
1418                    results.push(ar);
1419                }
1420            }
1421        }
1422
1423        // Handle rollback
1424        if aborted {
1425            self.state.restore(snapshot.clone(), transition_count);
1426
1427            let mut log = self.log.lock().await;
1428            log.append(
1429                EventKind::StateSnapshot,
1430                None,
1431                Some(&proposal.id),
1432                [(
1433                    "state".to_string(),
1434                    serde_json::to_value(&snapshot).unwrap_or_default(),
1435                )]
1436                .into(),
1437            );
1438            log.append(
1439                EventKind::StateRollback,
1440                None,
1441                Some(&proposal.id),
1442                [(
1443                    "rolled_back_to".to_string(),
1444                    Value::from("pre-proposal snapshot"),
1445                )]
1446                .into(),
1447            );
1448
1449            // Clear idempotency cache for rolled-back actions
1450            let mut cache = self.idempotency_cache.lock().await;
1451            for r in &results {
1452                if r.status == ActionStatus::Succeeded {
1453                    for action in &proposal.actions {
1454                        if action.id == r.action_id && action.idempotent {
1455                            cache.remove(&idempotency_key(action));
1456                        }
1457                    }
1458                }
1459            }
1460        }
1461
1462        // Sort results to match original action order
1463        let action_order: HashMap<String, usize> = proposal
1464            .actions
1465            .iter()
1466            .enumerate()
1467            .map(|(i, a)| (a.id.clone(), i))
1468            .collect();
1469        results.sort_by_key(|r| {
1470            action_order
1471                .get(&r.action_id)
1472                .copied()
1473                .unwrap_or(usize::MAX)
1474        });
1475
1476        // Compute cost summary from results
1477        let mut cost = CostSummary::default();
1478        for r in &results {
1479            let action = action_order
1480                .get(&r.action_id)
1481                .and_then(|&i| proposal.actions.get(i));
1482            match r.status {
1483                ActionStatus::Succeeded => {
1484                    cost.actions_executed += 1;
1485                    if let Some(a) = action {
1486                        if a.action_type == ActionType::ToolCall {
1487                            cost.tool_calls += 1;
1488                        }
1489                    }
1490                }
1491                ActionStatus::Failed | ActionStatus::Rejected => {
1492                    cost.actions_executed += 1;
1493                }
1494                ActionStatus::Skipped => {
1495                    cost.actions_skipped += 1;
1496                }
1497                _ => {}
1498            }
1499            if let Some(d) = r.duration_ms {
1500                cost.total_duration_ms += d;
1501            }
1502        }
1503
1504        // Set retries from inline counter
1505        cost.retries = total_retries;
1506
1507        // End root span — Ok if no abort, Error if aborted
1508        {
1509            let span_status = if aborted {
1510                SpanStatus::Error
1511            } else {
1512                SpanStatus::Ok
1513            };
1514            let mut log = self.log.lock().await;
1515            log.end_span(&root_span_id, span_status);
1516        }
1517
1518        let proposal_result = ProposalResult {
1519            proposal_id: proposal.id.clone(),
1520            results,
1521            cost,
1522        };
1523
1524        // Post-execution: auto-distill skills from this execution trace
1525        if self.auto_distill {
1526            if let Some(ref memgine) = self.memgine {
1527                // Convert results to TraceEvents for distillation
1528                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1529                    .results
1530                    .iter()
1531                    .map(|r| {
1532                        let kind = match r.status {
1533                            ActionStatus::Succeeded => "action_succeeded",
1534                            ActionStatus::Failed => "action_failed",
1535                            ActionStatus::Rejected => "action_rejected",
1536                            ActionStatus::Skipped => "action_skipped",
1537                            _ => "unknown",
1538                        };
1539                        // Find the matching action to get the tool name
1540                        let tool = proposal
1541                            .actions
1542                            .iter()
1543                            .find(|a| a.id == r.action_id)
1544                            .and_then(|a| a.tool.clone());
1545                        let mut data = serde_json::Map::new();
1546                        if let Some(ref e) = r.error {
1547                            data.insert("error".into(), Value::from(e.as_str()));
1548                        }
1549                        if let Some(ref o) = r.output {
1550                            data.insert("output".into(), o.clone());
1551                        }
1552                        car_memgine::TraceEvent {
1553                            kind: kind.to_string(),
1554                            action_id: Some(r.action_id.clone()),
1555                            tool,
1556                            data: Value::Object(data),
1557                            duration_ms: r.duration_ms,
1558                            reward: match r.status {
1559                                ActionStatus::Succeeded => Some(1.0),
1560                                ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1561                                _ => None,
1562                            },
1563                            ..Default::default()
1564                        }
1565                    })
1566                    .collect();
1567
1568                let mut engine = memgine.lock().await;
1569                let skills = engine.distill_skills(&trace_events).await;
1570                if !skills.is_empty() {
1571                    let count = skills.len();
1572                    // Validation-gated ingest (SkillOpt-inspired). Distilled
1573                    // skills are NOT trusted straight into the active pool; each
1574                    // enters as a PROVISIONAL candidate on trial and must prove
1575                    // itself (or beat its incumbent) before the promotion gate in
1576                    // consolidate() makes it Active. Tenant is threaded through so
1577                    // candidates are stamped for the calling tenant's namespace.
1578                    let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1579                    let provisional = engine.ingest_provisional_candidates(&skills, tenant);
1580
1581                    // Log the distillation event
1582                    let mut log = self.log.lock().await;
1583                    log.append(
1584                        EventKind::SkillDistilled,
1585                        None,
1586                        Some(&proposal_result.proposal_id),
1587                        [
1588                            ("skills_count".to_string(), Value::from(count)),
1589                            (
1590                                "provisional_ingested".to_string(),
1591                                Value::from(provisional),
1592                            ),
1593                            (
1594                                "skill_names".to_string(),
1595                                Value::from(
1596                                    skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1597                                ),
1598                            ),
1599                        ]
1600                        .into(),
1601                    );
1602
1603                    // Check if any domains need evolution
1604                    let threshold = engine.evolution_threshold();
1605                    let domains = engine.domains_needing_evolution(threshold);
1606                    for domain in &domains {
1607                        // Collect failed events for this domain
1608                        let failed: Vec<car_memgine::TraceEvent> = trace_events
1609                            .iter()
1610                            .filter(|e| {
1611                                matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1612                            })
1613                            .cloned()
1614                            .collect();
1615                        if !failed.is_empty() {
1616                            let evolved = engine.evolve_skills(&failed, domain).await;
1617                            if !evolved.is_empty() {
1618                                log.append(
1619                                    EventKind::EvolutionTriggered,
1620                                    None,
1621                                    Some(&proposal_result.proposal_id),
1622                                    [
1623                                        ("domain".to_string(), Value::from(domain.as_str())),
1624                                        ("new_skills".to_string(), Value::from(evolved.len())),
1625                                    ]
1626                                    .into(),
1627                                );
1628                            }
1629                        }
1630                    }
1631                }
1632            }
1633        }
1634
1635        (proposal_result, state_before_map)
1636    }
1637
1638    /// Process a single action: idempotency → validate → policy → execute.
1639    /// Returns (ActionResult, retries_count).
1640    async fn process_action(
1641        &self,
1642        action: &Action,
1643        proposal_id: &str,
1644        trace_id: &str,
1645        parent_span_id: &str,
1646        session_id: Option<&str>,
1647        scope: Option<&crate::scope::RuntimeScope>,
1648    ) -> (ActionResult, u32) {
1649        // Derive action type name for span naming
1650        let action_type_name = serde_json::to_string(&action.action_type)
1651            .unwrap_or_default()
1652            .trim_matches('"')
1653            .to_string();
1654        let span_name = format!("action.{}", action_type_name);
1655
1656        // Begin child span for this action
1657        let action_span_id = {
1658            let mut attrs: HashMap<String, Value> = HashMap::new();
1659            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1660            if let Some(ref tool) = action.tool {
1661                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1662            }
1663            let mut log = self.log.lock().await;
1664            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1665        };
1666
1667        // Execute the action pipeline and capture result
1668        let (result, retries) = self
1669            .process_action_inner(action, proposal_id, session_id, scope)
1670            .await;
1671
1672        // End action span based on result status
1673        let span_status = match result.status {
1674            ActionStatus::Succeeded => SpanStatus::Ok,
1675            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1676            _ => SpanStatus::Unset,
1677        };
1678        {
1679            let mut log = self.log.lock().await;
1680            log.end_span(&action_span_id, span_status);
1681        }
1682
1683        (result, retries)
1684    }
1685
1686    /// Inner action processing: idempotency -> validate -> policy -> execute.
1687    /// Returns (ActionResult, retries_count).
1688    #[instrument(
1689        name = "action.process",
1690        skip_all,
1691        fields(
1692            action_id = %action.id,
1693            action_type = ?action.action_type,
1694            tool = action.tool.as_deref().unwrap_or("none"),
1695        )
1696    )]
1697    async fn process_action_inner(
1698        &self,
1699        action: &Action,
1700        proposal_id: &str,
1701        session_id: Option<&str>,
1702        scope: Option<&crate::scope::RuntimeScope>,
1703    ) -> (ActionResult, u32) {
1704        // Idempotency check
1705        if action.idempotent {
1706            let key = idempotency_key(action);
1707            let cache = self.idempotency_cache.lock().await;
1708            if let Some(cached) = cache.get(&key) {
1709                let mut log = self.log.lock().await;
1710                log.append(
1711                    EventKind::ActionDeduplicated,
1712                    Some(&action.id),
1713                    Some(proposal_id),
1714                    [(
1715                        "cached_action_id".to_string(),
1716                        Value::from(cached.action_id.as_str()),
1717                    )]
1718                    .into(),
1719                );
1720                return (
1721                    ActionResult {
1722                        action_id: action.id.clone(),
1723                        status: cached.status.clone(),
1724                        output: cached.output.clone(),
1725                        error: cached.error.clone(),
1726                        state_changes: cached.state_changes.clone(),
1727                        duration_ms: Some(0.0),
1728                        timestamp: chrono::Utc::now(),
1729                    },
1730                    0,
1731                );
1732            }
1733        }
1734
1735        // Capability check
1736        {
1737            let caps = self.capabilities.read().await;
1738            if let Some(ref cap) = *caps {
1739                // Check tool capability for ToolCall actions
1740                if action.action_type == ActionType::ToolCall {
1741                    if let Some(ref tool_name) = action.tool {
1742                        if !cap.tool_allowed(tool_name) {
1743                            let mut log = self.log.lock().await;
1744                            log.append(
1745                                EventKind::ActionRejected,
1746                                Some(&action.id),
1747                                Some(proposal_id),
1748                                HashMap::new(),
1749                            );
1750                            return (
1751                                rejected_result(
1752                                    &action.id,
1753                                    format!("capability denied: tool '{}' not allowed", tool_name),
1754                                ),
1755                                0,
1756                            );
1757                        }
1758                    }
1759                }
1760
1761                // Check state key capability for StateWrite/StateRead actions
1762                if action.action_type == ActionType::StateWrite
1763                    || action.action_type == ActionType::StateRead
1764                {
1765                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1766                        if !cap.state_key_allowed(key) {
1767                            let mut log = self.log.lock().await;
1768                            log.append(
1769                                EventKind::ActionRejected,
1770                                Some(&action.id),
1771                                Some(proposal_id),
1772                                HashMap::new(),
1773                            );
1774                            return (
1775                                rejected_result(
1776                                    &action.id,
1777                                    format!("capability denied: state key '{}' not allowed", key),
1778                                ),
1779                                0,
1780                            );
1781                        }
1782                    }
1783                }
1784            }
1785        }
1786
1787        // Validate
1788        let tools = self.tools.read().await;
1789        let validation = validate_action(action, &self.state, &tools);
1790        drop(tools);
1791
1792        if !validation.valid() {
1793            let error = validation
1794                .errors
1795                .iter()
1796                .map(|e| e.reason.as_str())
1797                .collect::<Vec<_>>()
1798                .join("; ");
1799            let mut log = self.log.lock().await;
1800            log.append(
1801                EventKind::ActionRejected,
1802                Some(&action.id),
1803                Some(proposal_id),
1804                HashMap::new(),
1805            );
1806            return (rejected_result(&action.id, error), 0);
1807        }
1808
1809        // Policy check — global registry plus, when the proposal is
1810        // executed under a session, that session's registry. Both
1811        // layers must pass for the action to proceed; session
1812        // policies are additive deny rules — they can deny what
1813        // global allows but cannot allow what global denies.
1814        {
1815            let mut violations = {
1816                let policies = self.policies.read().await;
1817                policies.check(action, &self.state)
1818            };
1819            if let Some(sid) = session_id {
1820                // Snapshot the per-session engine handle out from under
1821                // the outer registry lock so the inner check holds
1822                // only the engine's own RwLock — preserves the
1823                // documented lock-ordering discipline.
1824                let session_engine = {
1825                    let sessions = self.session_policies.read().await;
1826                    sessions.get(sid).cloned()
1827                };
1828                if let Some(engine) = session_engine {
1829                    let engine = engine.read().await;
1830                    violations.extend(engine.check(action, &self.state));
1831                } else {
1832                    // Unknown session id — refuse the action rather
1833                    // than silently fall back to global-only. A
1834                    // proposal submitted under a closed session
1835                    // shouldn't run with looser rules than the caller
1836                    // intended.
1837                    let mut log = self.log.lock().await;
1838                    log.append(
1839                        EventKind::PolicyViolation,
1840                        Some(&action.id),
1841                        Some(proposal_id),
1842                        HashMap::new(),
1843                    );
1844                    return (
1845                        rejected_result(
1846                            &action.id,
1847                            format!(
1848                                "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1849                            ),
1850                        ),
1851                        0,
1852                    );
1853                }
1854            }
1855            if !violations.is_empty() {
1856                let error = violations
1857                    .iter()
1858                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1859                    .collect::<Vec<_>>()
1860                    .join("; ");
1861                let mut log = self.log.lock().await;
1862                log.append(
1863                    EventKind::PolicyViolation,
1864                    Some(&action.id),
1865                    Some(proposal_id),
1866                    HashMap::new(),
1867                );
1868                return (rejected_result(&action.id, error), 0);
1869            }
1870        }
1871
1872        // Validated
1873        {
1874            let mut log = self.log.lock().await;
1875            log.append(
1876                EventKind::ActionValidated,
1877                Some(&action.id),
1878                Some(proposal_id),
1879                HashMap::new(),
1880            );
1881        }
1882
1883        // Execute with retry
1884        let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1885
1886        // Cache idempotent results
1887        if action.idempotent && result.status == ActionStatus::Succeeded {
1888            let mut cache = self.idempotency_cache.lock().await;
1889            cache.insert(idempotency_key(action), result.clone());
1890        }
1891
1892        tracing::info!(
1893            status = ?result.status,
1894            duration_ms = result.duration_ms,
1895            "action completed"
1896        );
1897
1898        (result, retries)
1899    }
1900
1901    /// Execute with retry logic and timeout.
1902    /// Returns (ActionResult, retries_count).
1903    async fn execute_with_retry(
1904        &self,
1905        action: &Action,
1906        proposal_id: &str,
1907        scope: Option<&crate::scope::RuntimeScope>,
1908    ) -> (ActionResult, u32) {
1909        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1910            action.max_retries + 1
1911        } else {
1912            1
1913        };
1914
1915        let mut last_error: Option<String> = None;
1916        let mut retries: u32 = 0;
1917
1918        for attempt in 0..max_attempts {
1919            if attempt > 0 {
1920                retries += 1;
1921                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1922                tokio::time::sleep(Duration::from_millis(delay)).await;
1923                let mut log = self.log.lock().await;
1924                log.append(
1925                    EventKind::ActionRetrying,
1926                    Some(&action.id),
1927                    Some(proposal_id),
1928                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
1929                );
1930            }
1931
1932            {
1933                let mut log = self.log.lock().await;
1934                log.append(
1935                    EventKind::ActionExecuting,
1936                    Some(&action.id),
1937                    Some(proposal_id),
1938                    HashMap::new(),
1939                );
1940            }
1941
1942            let start = std::time::Instant::now();
1943            let transitions_before = self.state.transition_count();
1944
1945            // Execute with optional timeout
1946            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1947                match timeout(
1948                    Duration::from_millis(timeout_ms),
1949                    self.dispatch(action, scope),
1950                )
1951                .await
1952                {
1953                    Ok(r) => r,
1954                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1955                }
1956            } else {
1957                self.dispatch(action, scope).await
1958            };
1959
1960            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1961
1962            match exec_result {
1963                Ok(output) => {
1964                    // Commit post-effects. Tenant-scoped when the
1965                    // proposal carries a scope (Parslee-ai/car#187
1966                    // phase 3) — distinct tenants write to disjoint
1967                    // namespaces so concurrent multi-tenant proposals
1968                    // don't trample each other's keys.
1969                    let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1970                    let scoped_state = self.state.scoped(tenant_for_effects);
1971                    let mut state_changes: HashMap<String, Value> = HashMap::new();
1972                    for (key, value) in &action.expected_effects {
1973                        scoped_state.set(key, value.clone(), &action.id);
1974                        state_changes.insert(key.clone(), value.clone());
1975                    }
1976
1977                    // Capture state changes from dispatch
1978                    for t in self.state.transitions_since(transitions_before) {
1979                        if !state_changes.contains_key(&t.key) {
1980                            if let Some(v) = t.new_value {
1981                                state_changes.insert(t.key.clone(), v);
1982                            }
1983                        }
1984                    }
1985
1986                    let mut log = self.log.lock().await;
1987                    log.append(
1988                        EventKind::ActionSucceeded,
1989                        Some(&action.id),
1990                        Some(proposal_id),
1991                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1992                    );
1993
1994                    if !state_changes.is_empty() {
1995                        log.append(
1996                            EventKind::StateChanged,
1997                            Some(&action.id),
1998                            Some(proposal_id),
1999                            [(
2000                                "changes".to_string(),
2001                                serde_json::to_value(&state_changes).unwrap_or_default(),
2002                            )]
2003                            .into(),
2004                        );
2005                    }
2006
2007                    return (
2008                        ActionResult {
2009                            action_id: action.id.clone(),
2010                            status: ActionStatus::Succeeded,
2011                            output: Some(output),
2012                            error: None,
2013                            state_changes,
2014                            duration_ms: Some(duration_ms),
2015                            timestamp: chrono::Utc::now(),
2016                        },
2017                        retries,
2018                    );
2019                }
2020                Err(e) => {
2021                    last_error = Some(e.clone());
2022                    let mut log = self.log.lock().await;
2023                    log.append(
2024                        EventKind::ActionFailed,
2025                        Some(&action.id),
2026                        Some(proposal_id),
2027                        [
2028                            ("error".to_string(), Value::from(e.as_str())),
2029                            ("attempt".to_string(), Value::from(attempt + 1)),
2030                        ]
2031                        .into(),
2032                    );
2033                }
2034            }
2035        }
2036
2037        // All attempts exhausted
2038        if action.failure_behavior == FailureBehavior::Skip {
2039            return (
2040                skipped_result(
2041                    &action.id,
2042                    last_error.as_deref().unwrap_or("all attempts exhausted"),
2043                ),
2044                retries,
2045            );
2046        }
2047
2048        (
2049            ActionResult {
2050                action_id: action.id.clone(),
2051                status: ActionStatus::Failed,
2052                output: None,
2053                error: last_error,
2054                state_changes: HashMap::new(),
2055                duration_ms: None,
2056                timestamp: chrono::Utc::now(),
2057            },
2058            retries,
2059        )
2060    }
2061
2062    /// Dispatch an action to the appropriate handler.
2063    ///
2064    /// `scope` is the per-execution caller / tenant surface
2065    /// (Parslee-ai/car#187 phase 3). When the scope carries a
2066    /// tenant id, state R/W operations (`StateWrite`, `StateRead`,
2067    /// `Assertion`) route through `StateStore::scoped(tenant_id)`
2068    /// so distinct tenants can't see each other's keys. Unscoped
2069    /// proposals get the legacy flat-namespace behaviour
2070    /// automatically.
2071    async fn dispatch(
2072        &self,
2073        action: &Action,
2074        scope: Option<&crate::scope::RuntimeScope>,
2075    ) -> Result<Value, String> {
2076        match action.action_type {
2077            ActionType::ToolCall => {
2078                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2079                let params = Value::Object(
2080                    action
2081                        .parameters
2082                        .iter()
2083                        .map(|(k, v)| (k.clone(), v.clone()))
2084                        .collect(),
2085                );
2086
2087                // Check cross-proposal result cache.
2088                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
2089                    return Ok(cached);
2090                }
2091
2092                // Apply rate limit backpressure before executing.
2093                self.rate_limiter.acquire(tool_name).await;
2094
2095                // Try built-in inference tools first when the inference engine is available.
2096                if matches!(
2097                    tool_name,
2098                    "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2099                ) {
2100                    if let Some(ref engine) = self.inference_engine {
2101                        // For "infer.grounded" or "infer" with memgine available,
2102                        // build context from memory and attach it to the request.
2103                        let params = {
2104                            let should_ground =
2105                                tool_name == "infer.grounded" || tool_name == "infer";
2106                            if should_ground {
2107                                if let Some(ref memgine) = self.memgine {
2108                                    if let Some(prompt) =
2109                                        params.get("prompt").and_then(|v| v.as_str())
2110                                    {
2111                                        let ctx = {
2112                                            let mut m = memgine.lock().await;
2113                                            m.build_context(prompt)
2114                                        };
2115                                        if !ctx.is_empty() {
2116                                            let mut p = params.clone();
2117                                            if let Some(obj) = p.as_object_mut() {
2118                                                obj.insert("context".to_string(), Value::from(ctx));
2119                                            }
2120                                            p
2121                                        } else {
2122                                            params
2123                                        }
2124                                    } else {
2125                                        params
2126                                    }
2127                                } else {
2128                                    params
2129                                }
2130                            } else {
2131                                params
2132                            }
2133                        };
2134
2135                        // Route "infer.grounded" to "infer" for the service layer
2136                        let effective_tool = if tool_name == "infer.grounded" {
2137                            "infer"
2138                        } else {
2139                            tool_name
2140                        };
2141                        let result =
2142                            car_inference::service::execute_tool(engine, effective_tool, &params)
2143                                .await
2144                                .map_err(|e| e.to_string());
2145
2146                        if let Ok(ref value) = result {
2147                            self.result_cache
2148                                .put(tool_name, &params, value.clone())
2149                                .await;
2150                        }
2151
2152                        return result;
2153                    }
2154                }
2155
2156                // Built-in memory consolidation tool.
2157                if tool_name == "memory.consolidate" {
2158                    if let Some(ref memgine) = self.memgine {
2159                        let report = {
2160                            let mut m = memgine.lock().await;
2161                            m.consolidate().await
2162                        };
2163                        // Log the consolidation event
2164                        {
2165                            let mut log = self.log.lock().await;
2166                            log.append(
2167                                EventKind::Consolidated,
2168                                None,
2169                                None,
2170                                [
2171                                    (
2172                                        "expired_pruned".to_string(),
2173                                        Value::from(report.expired_pruned),
2174                                    ),
2175                                    (
2176                                        "superseded_gc".to_string(),
2177                                        Value::from(report.superseded_gc),
2178                                    ),
2179                                    (
2180                                        "stale_embeddings_removed".to_string(),
2181                                        Value::from(report.stale_embeddings_removed),
2182                                    ),
2183                                    (
2184                                        "nodes_embedded".to_string(),
2185                                        Value::from(report.nodes_embedded),
2186                                    ),
2187                                    (
2188                                        "domains_evolved".to_string(),
2189                                        Value::from(report.domains_evolved.clone()),
2190                                    ),
2191                                    ("total_nodes".to_string(), Value::from(report.total_nodes)),
2192                                    ("total_edges".to_string(), Value::from(report.total_edges)),
2193                                ]
2194                                .into(),
2195                            );
2196                            // Per-candidate gate telemetry (SkillOpt-inspired):
2197                            // one event per promotion/rejection so the skill
2198                            // bank's evolution is auditable.
2199                            for key in &report.candidates_promoted {
2200                                log.append(
2201                                    EventKind::CandidatePromoted,
2202                                    None,
2203                                    None,
2204                                    [("candidate".to_string(), Value::from(key.as_str()))].into(),
2205                                );
2206                            }
2207                            for key in &report.candidates_rejected {
2208                                log.append(
2209                                    EventKind::CandidateRejected,
2210                                    None,
2211                                    None,
2212                                    [("candidate".to_string(), Value::from(key.as_str()))].into(),
2213                                );
2214                            }
2215                        }
2216                        return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2217                    } else {
2218                        return Err(
2219                            "memory.consolidate requires memgine (attach with with_learning)"
2220                                .into(),
2221                        );
2222                    }
2223                }
2224
2225                // Prefer a configured tool_executor for any tool it claims to handle.
2226                // Fall through to agent_basics only when the configured executor is absent
2227                // or explicitly returns "unknown tool" — this prevents agent_basics' built-in
2228                // read_file/write_file (which resolve paths via std::env::current_dir) from
2229                // silently overriding an executor that carries its own working_dir.
2230                let configured = {
2231                    let guard = self.tool_executor.lock().await;
2232                    guard.as_ref().cloned()
2233                };
2234
2235                if let Some(ref executor) = configured {
2236                    let result = executor
2237                        .execute_with_action(tool_name, &params, &action.id, action.timeout_ms)
2238                        .await;
2239                    let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2240                    if !fall_through {
2241                        if let Ok(ref value) = result {
2242                            self.result_cache
2243                                .put(tool_name, &params, value.clone())
2244                                .await;
2245                        }
2246                        return result;
2247                    }
2248                }
2249
2250                if let Some(result) = crate::agent_basics::execute(tool_name, &params).await {
2251                    if let Ok(ref value) = result {
2252                        self.result_cache
2253                            .put(tool_name, &params, value.clone())
2254                            .await;
2255                    }
2256                    return result;
2257                }
2258
2259                Err(format!("no handler for tool '{}'", tool_name))
2260            }
2261            ActionType::StateWrite => {
2262                let key = action
2263                    .parameters
2264                    .get("key")
2265                    .and_then(|v| v.as_str())
2266                    .ok_or("state_write requires 'key' parameter")?;
2267                let value = action
2268                    .parameters
2269                    .get("value")
2270                    .cloned()
2271                    .unwrap_or(Value::Null);
2272                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2273                self.state.scoped(tenant).set(key, value, &action.id);
2274                Ok(Value::from(format!("written: {}", key)))
2275            }
2276            ActionType::StateRead => {
2277                let key = action
2278                    .parameters
2279                    .get("key")
2280                    .and_then(|v| v.as_str())
2281                    .ok_or("state_read requires 'key' parameter")?;
2282                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2283                Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2284            }
2285            ActionType::Assertion => {
2286                let key = action
2287                    .parameters
2288                    .get("key")
2289                    .and_then(|v| v.as_str())
2290                    .ok_or("assertion requires 'key' parameter")?;
2291                let expected = action
2292                    .parameters
2293                    .get("expected")
2294                    .cloned()
2295                    .unwrap_or(Value::Null);
2296                let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2297                let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2298                if actual != expected {
2299                    Err(format!(
2300                        "assertion failed: state['{}'] = {:?}, expected {:?}",
2301                        key, actual, expected
2302                    ))
2303                } else {
2304                    Ok(serde_json::json!({"asserted": key, "value": actual}))
2305                }
2306            }
2307        }
2308    }
2309
2310    // --- Checkpoint and resume ---
2311
2312    /// Save a checkpoint of the current runtime state.
2313    pub async fn save_checkpoint(&self) -> Checkpoint {
2314        let state = self.state.snapshot();
2315        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2316        let log = self.log.lock().await;
2317        let events: Vec<Value> = log
2318            .events()
2319            .iter()
2320            .map(|e| serde_json::to_value(e).unwrap_or_default())
2321            .collect();
2322
2323        Checkpoint {
2324            checkpoint_id: Uuid::new_v4().to_string(),
2325            created_at: chrono::Utc::now(),
2326            state,
2327            events,
2328            tools,
2329            metadata: HashMap::new(),
2330        }
2331    }
2332
2333    /// Save checkpoint to a JSON file.
2334    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2335        let checkpoint = self.save_checkpoint().await;
2336        let json = serde_json::to_string_pretty(&checkpoint)
2337            .map_err(|e| format!("serialize error: {}", e))?;
2338        tokio::fs::write(path, json)
2339            .await
2340            .map_err(|e| format!("write error: {}", e))?;
2341        Ok(())
2342    }
2343
2344    /// Load a checkpoint from a JSON file and restore state.
2345    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2346        let json = tokio::fs::read_to_string(path)
2347            .await
2348            .map_err(|e| format!("read error: {}", e))?;
2349        let checkpoint: Checkpoint =
2350            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2351        self.restore_checkpoint(&checkpoint).await;
2352        Ok(checkpoint)
2353    }
2354
2355    /// Restore runtime state from a checkpoint.
2356    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2357        // Replace state completely — don't merge, don't create synthetic transitions
2358        self.state.replace_all(checkpoint.state.clone());
2359        // Clear idempotency cache — stale results from pre-checkpoint execution
2360        // must not bypass validation/policy on the restored state
2361        self.idempotency_cache.lock().await.clear();
2362        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
2363        let mut tools = self.tools.write().await;
2364        tools.clear();
2365        for tool_name in &checkpoint.tools {
2366            let schema = ToolSchema {
2367                name: tool_name.clone(),
2368                description: String::new(),
2369                parameters: serde_json::Value::Object(Default::default()),
2370                returns: None,
2371                idempotent: false,
2372                cache_ttl_secs: None,
2373                rate_limit: None,
2374            };
2375            tools.insert(tool_name.clone(), schema);
2376        }
2377    }
2378
2379    /// Register a subprocess tool and set up the subprocess executor.
2380    /// If no executor exists, creates a new SubprocessToolExecutor.
2381    /// If one already exists, creates a new SubprocessToolExecutor with the
2382    /// existing executor as fallback.
2383    pub async fn register_subprocess_tool(
2384        &self,
2385        name: &str,
2386        tool: crate::subprocess::SubprocessTool,
2387    ) {
2388        use crate::subprocess::SubprocessToolExecutor;
2389
2390        let schema = ToolSchema {
2391            name: name.to_string(),
2392            description: format!("Subprocess tool: {}", tool.command),
2393            parameters: serde_json::Value::Object(Default::default()),
2394            returns: None,
2395            idempotent: false,
2396            cache_ttl_secs: None,
2397            rate_limit: None,
2398        };
2399        self.register_tool_schema(schema).await;
2400
2401        let mut guard = self.tool_executor.lock().await;
2402        let mut executor = match guard.take() {
2403            Some(existing) => {
2404                let mut sub = SubprocessToolExecutor::new();
2405                sub = sub.with_fallback(existing);
2406                sub
2407            }
2408            None => SubprocessToolExecutor::new(),
2409        };
2410        executor.register(name, tool);
2411        *guard = Some(std::sync::Arc::new(executor));
2412    }
2413}
2414
2415impl Default for Runtime {
2416    fn default() -> Self {
2417        Self::new()
2418    }
2419}