Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Retry backoff: initial delay in milliseconds (per the `_MS` suffix).
/// NOTE(review): the retry loop that consumes these constants is outside this view.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Retry backoff: multiplier for the delay — presumably applied once per
/// successive retry (exponential backoff); confirm at the retry site.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28// ---------------------------------------------------------------------------
29// Replan types — failure recovery via model callback
30// ---------------------------------------------------------------------------
31
/// Callback trait for replanning failed proposals.
///
/// Implement this to let the runtime ask the model for an alternative plan
/// when a proposal aborts. `Runtime::execute_with_cancel` calls it with a
/// [`ReplanContext`] describing the failed run, for at most
/// `ReplanConfig::max_replans` attempts per execution.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Produce a replacement proposal for the failure described by `ctx`.
    ///
    /// When `ReplanConfig::verify_before_execute` is set, the returned
    /// proposal is checked with `car_verify::verify` and rejected without
    /// running if it is invalid. Returning `Err` ends the replan loop: the
    /// runtime logs `ReplanExhausted` with `reason = "callback_error"` and
    /// returns the result of the failed run.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Context provided to the replan callback so the model can generate an alternative.
///
/// Built by `Runtime::execute_with_cancel` after a run in which at least one
/// action ended with status `Failed`. Derives `Serialize`/`Deserialize`, so the
/// field order below is also the serialized field order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// ID of the original proposal (the same value across all replan attempts).
    pub proposal_id: String,
    /// Which replan attempt this is (1-indexed; attempt 1 = first replan after initial failure).
    pub attempt: u32,
    /// Summaries of the actions whose status was `Failed` in the last run.
    pub failed_actions: Vec<FailedActionSummary>,
    /// IDs of actions whose status was `Succeeded` in the last run.
    /// NOTE(review): these are described as rolled back before replanning; the
    /// rollback happens outside this view — confirm in `execute_inner_with_cancel`.
    pub completed_action_ids: Vec<String>,
    /// Full `StateStore::snapshot()` taken just before the callback is invoked.
    pub state_snapshot: HashMap<String, Value>,
    /// Replans still allowed after this one (`max_replans - attempt`, saturating at 0).
    pub replans_remaining: u32,
    /// `source` of the original proposal (e.g. model name or agent).
    pub original_source: String,
    /// Number of actions in the original proposal (not in the latest replan).
    pub original_action_count: usize,
    /// `context` map of the original proposal, to help generate alternatives.
    pub original_context: HashMap<String, Value>,
}
62
/// Summary of a failed action, included in [`ReplanContext::failed_actions`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// ID of the action whose result status was `Failed`.
    pub action_id: String,
    /// Tool the action invoked; `None` if it had no tool or its ID could not
    /// be found in the executed proposal.
    pub tool: Option<String>,
    /// Error message from the action's result (empty if none was recorded).
    pub error: String,
    /// The action's parameters (empty if the action could not be found).
    pub parameters: HashMap<String, Value>,
}
71
/// Tuning knobs for the replan loop in `Runtime::execute_with_cancel`.
///
/// The default turns replanning off (`max_replans == 0`) but leaves the
/// verification gate on, so enabling replanning later never runs unchecked
/// replacement plans by accident.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Upper bound on replan attempts per execution; `0` disables replanning.
    pub max_replans: u32,
    /// Pause, in milliseconds, before each replan request so a model that
    /// fails fast cannot burn through every attempt instantly. `0` = no pause.
    pub delay_ms: u64,
    /// When `true`, each replacement proposal is checked with
    /// `car_verify::verify` before it runs; proposals the verifier marks
    /// invalid are rejected (consuming an attempt) without being executed.
    pub verify_before_execute: bool,
}

impl Default for ReplanConfig {
    fn default() -> Self {
        ReplanConfig {
            verify_before_execute: true,
            delay_ms: 0,
            max_replans: 0,
        }
    }
}
95
/// Trait for tool execution. Implement this to provide tools to the runtime.
///
/// In-process: implement directly with function calls.
/// Daemon mode: implement by sending JSON-RPC to the client.
///
/// Installed on a `Runtime` via `with_executor` (builder) or `set_executor`.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Run `tool` with JSON `params`, returning the tool's JSON output or an
    /// error message.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
}
104
105/// Deterministic key for idempotency deduplication.
106fn idempotency_key(action: &Action) -> String {
107    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
108    let params = serde_json::to_string(&sorted).unwrap_or_default();
109    format!(
110        "{}:{}:{}",
111        serde_json::to_string(&action.action_type).unwrap_or_default(),
112        action.tool.as_deref().unwrap_or(""),
113        params
114    )
115}
116
117fn rejected_result(action_id: &str, error: String) -> ActionResult {
118    ActionResult {
119        action_id: action_id.to_string(),
120        status: ActionStatus::Rejected,
121        output: None,
122        error: Some(error),
123        state_changes: HashMap::new(),
124        duration_ms: None,
125        timestamp: chrono::Utc::now(),
126    }
127}
128
129/// Capture only the state keys relevant to an action (state_dependencies + expected_effects).
130/// Returns an empty map if the action declares no relevant keys (e.g., tool calls
131/// that don't interact with state).
132fn snapshot_relevant_keys(
133    state: &car_state::StateStore,
134    action: &Action,
135) -> HashMap<String, Value> {
136    let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
137    for dep in &action.state_dependencies {
138        keys.insert(dep.as_str());
139    }
140    for key in action.expected_effects.keys() {
141        keys.insert(key.as_str());
142    }
143    // For state_write actions, capture the key being written
144    if action.action_type == ActionType::StateWrite {
145        if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
146            keys.insert(key);
147        }
148    }
149
150    if keys.is_empty() {
151        // No declared state interaction — skip snapshot (empty map)
152        return HashMap::new();
153    }
154
155    keys.iter()
156        .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
157        .collect()
158}
159
160fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
161    ActionResult {
162        action_id: action_id.to_string(),
163        status: ActionStatus::Skipped,
164        output: None,
165        error: Some(reason.to_string()),
166        state_changes: HashMap::new(),
167        duration_ms: None,
168        timestamp: chrono::Utc::now(),
169    }
170}
171
/// Prefix on the `error` field of an `ActionResult` that distinguishes
/// "the user pulled the plug" from "an earlier abort cascaded." The
/// `ActionStatus` itself is `Skipped` in both cases (introducing a
/// new variant ripples through every IR consumer + FFI binding); the
/// prefix lets callers like the A2A bridge tell the cases apart with a
/// `starts_with(CANCELED_PREFIX)` check instead of string-matching a
/// magic literal.
pub const CANCELED_PREFIX: &str = "canceled: ";
179
180/// Result for an action that didn't run because the proposal was
181/// cancelled mid-flight. Distinct from `Skipped` so callers can tell
182/// "didn't run because of an earlier abort" from "didn't run because
183/// the user pulled the plug."
184fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
185    ActionResult {
186        action_id: action_id.to_string(),
187        status: ActionStatus::Skipped,
188        output: None,
189        error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
190        state_changes: HashMap::new(),
191        duration_ms: None,
192        timestamp: chrono::Utc::now(),
193    }
194}
195
196/// Format a tool result for feeding back to a model.
197pub fn format_tool_result(result: &ActionResult) -> String {
198    match result.status {
199        ActionStatus::Succeeded => match &result.output {
200            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
201            None => String::new(),
202        },
203        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
204        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
205        _ => format!(
206            "[{:?}] {}",
207            result.status,
208            result.error.as_deref().unwrap_or("")
209        ),
210    }
211}
212
/// Budget constraints for proposal execution.
///
/// Each limit is optional; `None` presumably means "no limit". The code that
/// enforces these limits is outside this view — confirm there.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Maximum number of tool calls allowed.
    pub max_tool_calls: Option<u32>,
    /// Maximum execution duration, in milliseconds (per the `_ms` suffix).
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions allowed. NOTE(review): confirm whether this
    /// counts actions in the proposal or actions actually executed.
    pub max_actions: Option<u32>,
}
220
/// Common Agent Runtime — deterministic execution layer.
///
/// Lock ordering discipline (never hold multiple simultaneously, never hold sync locks across .await):
/// 1. capabilities (RwLock, read-only during execution)
/// 2. tools (RwLock, read-only during execution)
/// 3. policies (RwLock, read-only during execution)
/// 4. cost_budget (RwLock, read-only during execution)
/// 5. log (TokioMutex, acquired/released per event)
/// 6. tool_executor (TokioMutex, clone Arc and drop before await)
/// 7. idempotency_cache (TokioMutex, acquired/released per check)
/// 8. replan_callback (TokioMutex, clone Arc and drop before await)
/// 9. replan_config (TokioRwLock, cloned once at the start of `execute_with_cancel`)
///
/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
pub struct Runtime {
    /// Key/value state store; shared with other runtimes when built via `with_shared`.
    pub state: Arc<StateStore>,
    /// Legacy tool map: schemas keyed by tool name (kept in sync by `register_tool_entry`).
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Policy engine; shared with other runtimes when built via `with_shared`.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Event log for execution and replan events; shareable via `with_shared`.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool token-bucket rate limits (see `set_rate_limit`).
    pub rate_limiter: Arc<RateLimiter>,
    /// Cross-proposal result cache for tools with caching enabled (see `enable_tool_cache`).
    pub result_cache: Arc<ResultCache>,
    /// Executor that performs tool calls; set by `with_executor` / `set_executor`.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Cached action results for deduplication — presumably keyed by `idempotency_key`.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional execution budget set by `set_cost_budget`.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional per-agent permissions set by `set_capabilities`.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Optional local inference engine attached by `with_inference`.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine for skill learning (auto-distillation after execution).
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether to auto-distill skills after each proposal execution.
    auto_distill: bool,
    /// Optional trajectory store for persisting execution traces.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Optional replan callback for failure recovery.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Replan configuration.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Canonical tool registry (preferred for new code; `register_tool_entry`
    /// also mirrors each entry into `tools`).
    pub registry: Arc<crate::registry::ToolRegistry>,
}
258
259impl Runtime {
260    pub fn new() -> Self {
261        Self {
262            state: Arc::new(StateStore::new()),
263            tools: Arc::new(TokioRwLock::new(HashMap::new())),
264            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
265            log: Arc::new(TokioMutex::new(EventLog::new())),
266            rate_limiter: Arc::new(RateLimiter::new()),
267            result_cache: Arc::new(ResultCache::new()),
268            tool_executor: TokioMutex::new(None),
269            idempotency_cache: TokioMutex::new(HashMap::new()),
270            cost_budget: TokioRwLock::new(None),
271            capabilities: TokioRwLock::new(None),
272            inference_engine: None,
273            memgine: None,
274            auto_distill: false,
275            trajectory_store: None,
276            replan_callback: TokioMutex::new(None),
277            replan_config: TokioRwLock::new(ReplanConfig::default()),
278            registry: Arc::new(crate::registry::ToolRegistry::new()),
279        }
280    }
281
282    /// Create a runtime with shared state, event log, and policies.
283    /// Each runtime gets its own tool set, executor, and idempotency cache.
284    pub fn with_shared(
285        state: Arc<StateStore>,
286        log: Arc<TokioMutex<EventLog>>,
287        policies: Arc<TokioRwLock<PolicyEngine>>,
288    ) -> Self {
289        Self {
290            state,
291            tools: Arc::new(TokioRwLock::new(HashMap::new())),
292            policies,
293            log,
294            rate_limiter: Arc::new(RateLimiter::new()),
295            result_cache: Arc::new(ResultCache::new()),
296            tool_executor: TokioMutex::new(None),
297            idempotency_cache: TokioMutex::new(HashMap::new()),
298            cost_budget: TokioRwLock::new(None),
299            capabilities: TokioRwLock::new(None),
300            inference_engine: None,
301            memgine: None,
302            auto_distill: false,
303            trajectory_store: None,
304            replan_callback: TokioMutex::new(None),
305            replan_config: TokioRwLock::new(ReplanConfig::default()),
306            registry: Arc::new(crate::registry::ToolRegistry::new()),
307        }
308    }
309
310    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
311    /// as built-in tools with real implementations.
312    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
313        self.inference_engine = Some(engine);
314        // Register inference tool schemas (non-async init, use try_lock)
315        if let Ok(mut tools) = self.tools.try_write() {
316            for schema in car_inference::service::all_schemas() {
317                tools.insert(schema.name.clone(), schema);
318            }
319        }
320        self
321    }
322
323    /// Attach a memgine for automatic skill learning after execution.
324    /// When `auto_distill` is true, execution traces are automatically distilled
325    /// into skills and domains are evolved when underperforming.
326    pub fn with_learning(
327        mut self,
328        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
329        auto_distill: bool,
330    ) -> Self {
331        self.memgine = Some(memgine);
332        self.auto_distill = auto_distill;
333        self
334    }
335
336    /// Attach a memgine with auto-distillation enabled (recommended default).
337    pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
338        self.with_learning(memgine, true)
339    }
340
341    /// Attach a trajectory store for persisting execution traces.
342    pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
343        self.trajectory_store = Some(store);
344        self
345    }
346
347    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
348        // Use try_lock for non-async init context. Safe because we just created the mutex.
349        if let Ok(mut guard) = self.tool_executor.try_lock() {
350            *guard = Some(executor);
351        }
352        self
353    }
354
355    /// Set a tool executor for the next execute() call.
356    /// Used by NAPI bindings where executor varies per call.
357    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
358        *self.tool_executor.lock().await = Some(executor);
359    }
360
361    pub fn with_event_log(mut self, log: EventLog) -> Self {
362        self.log = Arc::new(TokioMutex::new(log));
363        self
364    }
365
366    /// Attach a replan callback for failure recovery (builder).
367    pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
368        if let Ok(mut guard) = self.replan_callback.try_lock() {
369            *guard = Some(callback);
370        }
371        if let Ok(mut guard) = self.replan_config.try_write() {
372            *guard = config;
373        }
374        self
375    }
376
377    /// Set a replan callback at runtime.
378    pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
379        *self.replan_callback.lock().await = Some(callback);
380    }
381
382    /// Set replan configuration at runtime.
383    pub async fn set_replan_config(&self, config: ReplanConfig) {
384        *self.replan_config.write().await = config;
385    }
386
387    /// Register a tool with just a name (backward compatible).
388    pub async fn register_tool(&self, name: &str) {
389        let schema = ToolSchema {
390            name: name.to_string(),
391            description: String::new(),
392            parameters: serde_json::Value::Object(Default::default()),
393            returns: None,
394            idempotent: false,
395            cache_ttl_secs: None,
396            rate_limit: None,
397        };
398        self.register_tool_schema(schema).await;
399    }
400
401    /// Register a tool with full schema.
402    pub async fn register_tool_schema(&self, schema: ToolSchema) {
403        // Auto-configure cache if schema specifies it
404        if let Some(ttl) = schema.cache_ttl_secs {
405            self.result_cache.enable_caching(&schema.name, ttl).await;
406        }
407        // Auto-configure rate limit if schema specifies it
408        if let Some(ref rl) = schema.rate_limit {
409            self.rate_limiter
410                .set_limit(
411                    &schema.name,
412                    RateLimit {
413                        max_calls: rl.max_calls,
414                        interval_secs: rl.interval_secs,
415                    },
416                )
417                .await;
418        }
419        self.tools.write().await.insert(schema.name.clone(), schema);
420    }
421
422    /// Register a tool via the canonical registry.
423    /// This is the preferred way to register tools — it updates both the
424    /// registry and the legacy tools HashMap for backward compatibility.
425    pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
426        let schema = entry.schema.clone();
427        self.registry.register(entry).await;
428        self.register_tool_schema(schema).await;
429    }
430
431    /// Register CAR's built-in agent utility stdlib.
432    ///
433    /// This is an opt-in convenience layer for common local-file and text tools.
434    /// Existing runtimes remain unchanged until this is called.
435    pub async fn register_agent_basics(&self) {
436        for entry in crate::agent_basics::entries() {
437            self.register_tool_entry(entry).await;
438        }
439    }
440
441    /// Get all registered tool schemas (for model prompt generation).
442    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
443        self.tools.read().await.values().cloned().collect()
444    }
445
446    /// Set a cost budget that limits proposal execution.
447    pub async fn set_cost_budget(&self, budget: CostBudget) {
448        *self.cost_budget.write().await = Some(budget);
449    }
450
451    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
452    pub async fn set_capabilities(&self, caps: CapabilitySet) {
453        *self.capabilities.write().await = Some(caps);
454    }
455
456    /// Set a per-tool rate limit (token bucket).
457    ///
458    /// `max_calls` tokens are available per `interval_secs` window.
459    /// When the bucket is empty, `dispatch()` applies backpressure by
460    /// waiting until a token refills.
461    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
462        self.rate_limiter
463            .set_limit(
464                tool,
465                RateLimit {
466                    max_calls,
467                    interval_secs,
468                },
469            )
470            .await;
471    }
472
473    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
474    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
475        self.result_cache.enable_caching(tool, ttl_secs).await;
476    }
477
478    /// Execute a proposal with automatic replanning on failure.
479    ///
480    /// If a `ReplanCallback` is registered and `max_replans > 0`, the runtime
481    /// will catch abort failures, roll back state, ask the model for an
482    /// alternative proposal via the callback, and re-execute. This transforms
483    /// "execute-and-hope" into "execute-and-recover."
484    ///
485    /// If no callback is registered or `max_replans == 0`, behaves identically
486    /// to a single `execute_inner()` call (zero overhead, fully backward compatible).
487    #[instrument(
488        name = "proposal.execute",
489        skip_all,
490        fields(
491            proposal_id = %proposal.id,
492            action_count = proposal.actions.len(),
493        )
494    )]
495    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
496        // Forward to the cancel-aware variant with a never-cancelled
497        // token. Existing callers see no behaviour change.
498        let token = tokio_util::sync::CancellationToken::new();
499        self.execute_with_cancel(proposal, &token).await
500    }
501
502    /// Execute a proposal with cooperative cancellation.
503    ///
504    /// The runtime checks `token.is_cancelled()` at each DAG level
505    /// boundary. When set, every action that hadn't yet started runs
506    /// is reported as `Skipped` with `error = "canceled: ..."` so
507    /// callers can distinguish "user pulled the plug" from "earlier
508    /// abort cascaded." Actions already in flight continue to
509    /// completion — tool calls dispatched to user-provided executors
510    /// can't be safely interrupted from the engine.
511    ///
512    /// The CAR A2A bridge uses this so `tasks/cancel` produces a
513    /// `ProposalResult` with clean partial state rather than relying
514    /// on `JoinHandle::abort` to interrupt mid-await (which leaves
515    /// no record of which actions actually ran).
516    ///
517    /// **FFI exposure:** this method is intentionally not surfaced
518    /// through the NAPI / PyO3 / `car-server-core` JSON-RPC bindings.
519    /// Those consumers (Node, Python, WebSocket) don't currently
520    /// expose long-running async-task surfaces that need
521    /// cancellation; the bridge is the lone consumer. When a binding
522    /// gains a long-running task surface, the path is clear: add a
523    /// per-binding token registry keyed by some caller-provided id,
524    /// expose `cancelExecution(id)` / `cancel_execution(id)` /
525    /// `proposal.cancel { id }`, and have the runtime call
526    /// `execute_with_cancel` with the matching token. Skipping that
527    /// today avoids speculative API surface that bloats bindings
528    /// without a consumer.
529    pub async fn execute_with_cancel(
530        &self,
531        proposal: &ActionProposal,
532        cancel: &tokio_util::sync::CancellationToken,
533    ) -> ProposalResult {
534        let config = self.replan_config.read().await.clone();
535        let mut current_proposal = proposal.clone();
536        let mut attempt: u32 = 0;
537
538        loop {
539            let (result, state_before_map) =
540                self.execute_inner_with_cancel(&current_proposal, Some(cancel)).await;
541
542            // Check if we aborted
543            let aborted = result
544                .results
545                .iter()
546                .any(|r| r.status == ActionStatus::Failed);
547            if !aborted || attempt >= config.max_replans {
548                if aborted && attempt > 0 {
549                    // Exhausted all replan attempts
550                    let mut log = self.log.lock().await;
551                    log.append(
552                        EventKind::ReplanExhausted,
553                        None,
554                        Some(&proposal.id),
555                        [("attempts".to_string(), Value::from(attempt))].into(),
556                    );
557                }
558
559                // Persist trajectory
560                let outcome = if !aborted {
561                    if attempt > 0 {
562                        car_memgine::TrajectoryOutcome::ReplanSuccess
563                    } else {
564                        car_memgine::TrajectoryOutcome::Success
565                    }
566                } else if attempt > 0 {
567                    car_memgine::TrajectoryOutcome::ReplanExhausted
568                } else {
569                    car_memgine::TrajectoryOutcome::Failed
570                };
571                if let Some(err) = self.persist_trajectory(
572                    proposal,
573                    &current_proposal,
574                    &result,
575                    outcome,
576                    attempt,
577                    &state_before_map,
578                ) {
579                    let mut log = self.log.lock().await;
580                    log.append(
581                        EventKind::ActionFailed,
582                        None,
583                        Some(&proposal.id),
584                        [(
585                            "trajectory_persist_error".to_string(),
586                            Value::from(err.as_str()),
587                        )]
588                        .into(),
589                    );
590                }
591
592                return result;
593            }
594
595            // Get replan callback (clone Arc, drop lock immediately)
596            let callback = {
597                let guard = self.replan_callback.lock().await;
598                guard.clone()
599            };
600            let Some(callback) = callback else {
601                // No callback registered — persist trajectory and return
602                if let Some(err) = self.persist_trajectory(
603                    proposal,
604                    &current_proposal,
605                    &result,
606                    car_memgine::TrajectoryOutcome::Failed,
607                    attempt,
608                    &state_before_map,
609                ) {
610                    let mut log = self.log.lock().await;
611                    log.append(
612                        EventKind::ActionFailed,
613                        None,
614                        Some(&proposal.id),
615                        [(
616                            "trajectory_persist_error".to_string(),
617                            Value::from(err.as_str()),
618                        )]
619                        .into(),
620                    );
621                }
622                return result;
623            };
624
625            // Build replan context
626            let failed_actions: Vec<FailedActionSummary> = result
627                .results
628                .iter()
629                .filter(|r| r.status == ActionStatus::Failed)
630                .map(|r| {
631                    let action = current_proposal
632                        .actions
633                        .iter()
634                        .find(|a| a.id == r.action_id);
635                    FailedActionSummary {
636                        action_id: r.action_id.clone(),
637                        tool: action.and_then(|a| a.tool.clone()),
638                        error: r.error.clone().unwrap_or_default(),
639                        parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
640                    }
641                })
642                .collect();
643
644            let completed_action_ids: Vec<String> = result
645                .results
646                .iter()
647                .filter(|r| r.status == ActionStatus::Succeeded)
648                .map(|r| r.action_id.clone())
649                .collect();
650
651            let ctx = ReplanContext {
652                proposal_id: proposal.id.clone(),
653                attempt: attempt + 1,
654                failed_actions,
655                completed_action_ids,
656                state_snapshot: self.state.snapshot(),
657                replans_remaining: config.max_replans.saturating_sub(attempt + 1),
658                original_source: proposal.source.clone(),
659                original_action_count: proposal.actions.len(),
660                original_context: proposal.context.clone(),
661            };
662
663            // Backoff delay between replan attempts
664            if config.delay_ms > 0 {
665                tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
666            }
667
668            // Log replan attempt
669            {
670                let mut log = self.log.lock().await;
671                log.append(
672                    EventKind::ReplanAttempted,
673                    None,
674                    Some(&proposal.id),
675                    [
676                        ("attempt".to_string(), Value::from(attempt + 1)),
677                        (
678                            "failed_count".to_string(),
679                            Value::from(ctx.failed_actions.len()),
680                        ),
681                    ]
682                    .into(),
683                );
684            }
685
686            // Call the model for a new plan
687            match callback.replan(&ctx).await {
688                Ok(new_proposal) => {
689                    // Quality gate: verify replan proposal before executing
690                    if config.verify_before_execute {
691                        let tools_guard = self.tools.read().await;
692                        let tool_names: std::collections::HashSet<String> =
693                            tools_guard.keys().cloned().collect();
694                        drop(tools_guard);
695
696                        let current_state = self.state.snapshot();
697                        let vr = car_verify::verify(
698                            &new_proposal,
699                            Some(&current_state),
700                            Some(&tool_names),
701                            100,
702                        );
703                        if !vr.valid {
704                            let error_msgs: Vec<String> = vr
705                                .issues
706                                .iter()
707                                .filter(|i| i.severity == "error")
708                                .map(|i| i.message.clone())
709                                .collect();
710                            let mut log = self.log.lock().await;
711                            log.append(
712                                EventKind::ReplanRejected,
713                                None,
714                                Some(&proposal.id),
715                                [
716                                    ("errors".to_string(), Value::from(error_msgs.join("; "))),
717                                    ("attempt".to_string(), Value::from(attempt + 1)),
718                                ]
719                                .into(),
720                            );
721                            // Don't execute a broken replan — count as failed attempt
722                            attempt += 1;
723                            continue;
724                        }
725                    }
726
727                    // Log accepted proposal
728                    {
729                        let mut log = self.log.lock().await;
730                        log.append(
731                            EventKind::ReplanProposalReceived,
732                            None,
733                            Some(&proposal.id),
734                            [
735                                ("attempt".to_string(), Value::from(attempt + 1)),
736                                (
737                                    "new_action_count".to_string(),
738                                    Value::from(new_proposal.actions.len()),
739                                ),
740                            ]
741                            .into(),
742                        );
743                    }
744                    current_proposal = new_proposal;
745                    attempt += 1;
746                }
747                Err(e) => {
748                    // Replan callback itself failed — log and return original failure
749                    let mut log = self.log.lock().await;
750                    log.append(
751                        EventKind::ReplanExhausted,
752                        None,
753                        Some(&proposal.id),
754                        [
755                            ("reason".to_string(), Value::from("callback_error")),
756                            ("error".to_string(), Value::from(e.as_str())),
757                            ("attempt".to_string(), Value::from(attempt + 1)),
758                        ]
759                        .into(),
760                    );
761                    if let Some(err) = self.persist_trajectory(
762                        proposal,
763                        &current_proposal,
764                        &result,
765                        car_memgine::TrajectoryOutcome::Failed,
766                        attempt,
767                        &state_before_map,
768                    ) {
769                        log.append(
770                            EventKind::ActionFailed,
771                            None,
772                            Some(&proposal.id),
773                            [(
774                                "trajectory_persist_error".to_string(),
775                                Value::from(err.as_str()),
776                            )]
777                            .into(),
778                        );
779                    }
780                    return result;
781                }
782            }
783        }
784    }
785
786    /// Persist a trajectory to the store if configured.
787    fn persist_trajectory(
788        &self,
789        proposal: &ActionProposal,
790        current_proposal: &ActionProposal,
791        result: &ProposalResult,
792        outcome: car_memgine::TrajectoryOutcome,
793        attempt: u32,
794        state_before_map: &HashMap<String, HashMap<String, Value>>,
795    ) -> Option<String> {
796        let store = self.trajectory_store.as_ref()?;
797
798        let trace_events: Vec<car_memgine::TraceEvent> = result
799            .results
800            .iter()
801            .map(|r| {
802                let kind = match r.status {
803                    ActionStatus::Succeeded => "action_succeeded",
804                    ActionStatus::Failed => "action_failed",
805                    ActionStatus::Rejected => "action_rejected",
806                    ActionStatus::Skipped => "action_skipped",
807                    _ => "unknown",
808                };
809                let tool = current_proposal
810                    .actions
811                    .iter()
812                    .find(|a| a.id == r.action_id)
813                    .and_then(|a| a.tool.clone());
814                let reward = match r.status {
815                    ActionStatus::Succeeded => Some(1.0),
816                    ActionStatus::Failed => Some(0.0),
817                    ActionStatus::Rejected => Some(0.0),
818                    ActionStatus::Skipped => None,
819                    _ => None,
820                };
821                car_memgine::TraceEvent {
822                    kind: kind.to_string(),
823                    action_id: Some(r.action_id.clone()),
824                    tool,
825                    data: r
826                        .error
827                        .as_ref()
828                        .map(|e| serde_json::json!({"error": e}))
829                        .unwrap_or(serde_json::json!({})),
830                    duration_ms: r.duration_ms,
831                    state_before: state_before_map.get(&r.action_id).cloned(),
832                    state_after: if !r.state_changes.is_empty() {
833                        Some(r.state_changes.clone())
834                    } else {
835                        None
836                    },
837                    reward,
838                }
839            })
840            .collect();
841
842        let trajectory = car_memgine::Trajectory {
843            proposal_id: proposal.id.clone(),
844            source: proposal.source.clone(),
845            action_count: current_proposal.actions.len(),
846            events: trace_events,
847            outcome,
848            timestamp: chrono::Utc::now(),
849            duration_ms: result.cost.total_duration_ms,
850            replan_attempts: attempt,
851        };
852
853        match store.append(&trajectory) {
854            Ok(()) => None,
855            Err(e) => Some(e.to_string()),
856        }
857    }
858
859    /// Score N candidate proposals, execute the best valid one, fall back to
860    /// next-best on failure. Combines car-planner scoring with engine execution.
861    ///
862    /// Returns the result from whichever proposal was executed (best or fallback).
863    /// If all candidates fail verification, returns an error result for the first.
864    pub async fn plan_and_execute(
865        &self,
866        candidates: &[ActionProposal],
867        planner_config: Option<car_planner::PlannerConfig>,
868        feedback: Option<&car_planner::ToolFeedback>,
869    ) -> ProposalResult {
870        if candidates.is_empty() {
871            return ProposalResult {
872                proposal_id: "empty".to_string(),
873                results: vec![],
874                cost: car_ir::CostSummary::default(),
875            };
876        }
877
878        // Score all candidates
879        let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
880        let tools_guard = self.tools.read().await;
881        let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
882        drop(tools_guard);
883
884        let pre_plan_snapshot = self.state.snapshot();
885        let pre_plan_transitions = self.state.transition_count();
886        let ranked = planner.rank_with_feedback(
887            candidates,
888            Some(&pre_plan_snapshot),
889            Some(&tool_names),
890            feedback,
891        );
892
893        // Try each valid candidate in score order
894        let mut first_failure: Option<ProposalResult> = None;
895        for scored in &ranked {
896            if !scored.valid {
897                continue;
898            }
899
900            // Restore clean state before each candidate (don't rely on execute's
901            // internal rollback — it only fires on Abort, not Skip/Retry failures)
902            self.state
903                .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
904
905            let proposal = &candidates[scored.index];
906            let result = self.execute(proposal).await;
907
908            if result.all_succeeded() {
909                return result;
910            }
911
912            tracing::info!(
913                proposal_id = %proposal.id,
914                score = scored.score,
915                "plan_and_execute: proposal failed, trying next candidate"
916            );
917
918            if first_failure.is_none() {
919                first_failure = Some(result);
920            }
921        }
922
923        // Return the first failure result (don't re-execute — avoids duplicate side effects)
924        first_failure.unwrap_or_else(|| ProposalResult {
925            proposal_id: candidates[0].id.clone(),
926            results: vec![],
927            cost: car_ir::CostSummary::default(),
928        })
929    }
930
931    /// Execute a single proposal through the runtime loop (no replanning).
932    /// Returns (result, state_before_map) where state_before_map has per-action snapshots.
933    async fn execute_inner_with_cancel(
934        &self,
935        proposal: &ActionProposal,
936        cancel: Option<&tokio_util::sync::CancellationToken>,
937    ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
938        // Generate trace_id for this proposal execution
939        let trace_id = Uuid::new_v4().to_string();
940
941        // Begin root span for proposal execution
942        let root_span_id = {
943            let mut log = self.log.lock().await;
944            log.begin_span(
945                "proposal.execute",
946                &trace_id,
947                None,
948                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
949            )
950        };
951
952        // Log proposal received
953        {
954            let mut log = self.log.lock().await;
955            log.append(
956                EventKind::ProposalReceived,
957                None,
958                Some(&proposal.id),
959                [
960                    ("source".to_string(), Value::from(proposal.source.as_str())),
961                    (
962                        "action_count".to_string(),
963                        Value::from(proposal.actions.len()),
964                    ),
965                ]
966                .into(),
967            );
968        }
969
970        // Capability check: max_actions budget for entire proposal
971        {
972            let caps = self.capabilities.read().await;
973            if let Some(ref cap) = *caps {
974                if !cap.actions_within_budget(proposal.actions.len() as u32) {
975                    let mut action_results = Vec::new();
976                    for action in &proposal.actions {
977                        action_results.push(rejected_result(
978                            &action.id,
979                            format!(
980                                "capability denied: proposal has {} actions, max allowed is {:?}",
981                                proposal.actions.len(),
982                                cap.max_actions
983                            ),
984                        ));
985                    }
986                    return (
987                        ProposalResult {
988                            proposal_id: proposal.id.clone(),
989                            results: action_results,
990                            cost: CostSummary::default(),
991                        },
992                        HashMap::new(),
993                    );
994                }
995            }
996        }
997
998        // Snapshot for rollback
999        let snapshot = self.state.snapshot();
1000        let transition_count = self.state.transition_count();
1001
1002        let mut results: Vec<ActionResult> = Vec::new();
1003        // Per-action state snapshots captured before execution (for TraceEvent.state_before).
1004        let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1005        let mut aborted = false;
1006        let mut budget_exceeded = false;
1007        let mut total_retries: u32 = 0;
1008
1009        // Running cost counters for budget enforcement
1010        let mut running_tool_calls: u32 = 0;
1011        let mut running_actions: u32 = 0;
1012        let mut running_duration_ms: f64 = 0.0;
1013
1014        // Snapshot the budget once
1015        let budget = self.cost_budget.read().await.clone();
1016
1017        // Build DAG
1018        let levels = build_dag(&proposal.actions);
1019
1020        let mut canceled = false;
1021        for level in &levels {
1022            // Cooperative cancellation check at the level boundary.
1023            // Actions already in flight aren't interrupted (we can't
1024            // safely cancel a tool call dispatched to a user-provided
1025            // executor), but every action that hadn't started runs
1026            // is recorded as canceled with a clear reason.
1027            if !canceled {
1028                if let Some(token) = cancel {
1029                    if token.is_cancelled() {
1030                        canceled = true;
1031                    }
1032                }
1033            }
1034            if canceled {
1035                for &idx in level {
1036                    results.push(canceled_result(
1037                        &proposal.actions[idx].id,
1038                        "cancellation requested by caller",
1039                    ));
1040                }
1041                continue;
1042            }
1043            if aborted || budget_exceeded {
1044                let skip_reason = if budget_exceeded {
1045                    "cost budget exceeded"
1046                } else {
1047                    "skipped due to earlier abort"
1048                };
1049                for &idx in level {
1050                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1051                }
1052                continue;
1053            }
1054
1055            // Check if any action in this level has ABORT behavior
1056            let has_abort = level
1057                .iter()
1058                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1059
1060            if level.len() == 1 || has_abort {
1061                // Sequential execution
1062                for &idx in level {
1063                    if aborted || budget_exceeded {
1064                        let skip_reason = if budget_exceeded {
1065                            "cost budget exceeded"
1066                        } else {
1067                            "skipped due to abort"
1068                        };
1069                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1070                        continue;
1071                    }
1072
1073                    // Budget check before execution
1074                    if let Some(ref b) = budget {
1075                        if let Some(max) = b.max_actions {
1076                            if running_actions >= max {
1077                                budget_exceeded = true;
1078                                results.push(skipped_result(
1079                                    &proposal.actions[idx].id,
1080                                    "cost budget exceeded",
1081                                ));
1082                                continue;
1083                            }
1084                        }
1085                        if let Some(max) = b.max_tool_calls {
1086                            if proposal.actions[idx].action_type == ActionType::ToolCall
1087                                && running_tool_calls >= max
1088                            {
1089                                budget_exceeded = true;
1090                                results.push(skipped_result(
1091                                    &proposal.actions[idx].id,
1092                                    "cost budget exceeded",
1093                                ));
1094                                continue;
1095                            }
1096                        }
1097                        if let Some(max) = b.max_duration_ms {
1098                            if running_duration_ms >= max {
1099                                budget_exceeded = true;
1100                                results.push(skipped_result(
1101                                    &proposal.actions[idx].id,
1102                                    "cost budget exceeded",
1103                                ));
1104                                continue;
1105                            }
1106                        }
1107                    }
1108
1109                    state_before_map.insert(
1110                        proposal.actions[idx].id.clone(),
1111                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1112                    );
1113                    let (ar, action_retries) = self
1114                        .process_action(
1115                            &proposal.actions[idx],
1116                            &proposal.id,
1117                            &trace_id,
1118                            &root_span_id,
1119                        )
1120                        .await;
1121                    total_retries += action_retries;
1122
1123                    // Update running counters
1124                    if ar.status == ActionStatus::Succeeded
1125                        && proposal.actions[idx].action_type == ActionType::ToolCall
1126                    {
1127                        running_tool_calls += 1;
1128                    }
1129                    if ar.status != ActionStatus::Skipped {
1130                        running_actions += 1;
1131                    }
1132                    if let Some(d) = ar.duration_ms {
1133                        running_duration_ms += d;
1134                    }
1135
1136                    if ar.status == ActionStatus::Failed
1137                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1138                    {
1139                        aborted = true;
1140                    }
1141                    results.push(ar);
1142                }
1143            } else {
1144                // Concurrent execution via futures::join_all
1145                // Snapshot only relevant keys per action (all see same pre-level state)
1146                for &idx in level {
1147                    state_before_map.insert(
1148                        proposal.actions[idx].id.clone(),
1149                        snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1150                    );
1151                }
1152                let futs: Vec<_> = level
1153                    .iter()
1154                    .map(|&idx| {
1155                        self.process_action(
1156                            &proposal.actions[idx],
1157                            &proposal.id,
1158                            &trace_id,
1159                            &root_span_id,
1160                        )
1161                    })
1162                    .collect();
1163                let level_results = futures::future::join_all(futs).await;
1164
1165                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1166                    let idx = level[i];
1167                    total_retries += action_retries;
1168                    if ar.status == ActionStatus::Succeeded
1169                        && proposal.actions[idx].action_type == ActionType::ToolCall
1170                    {
1171                        running_tool_calls += 1;
1172                    }
1173                    if ar.status != ActionStatus::Skipped {
1174                        running_actions += 1;
1175                    }
1176                    if let Some(d) = ar.duration_ms {
1177                        running_duration_ms += d;
1178                    }
1179                    results.push(ar);
1180                }
1181            }
1182        }
1183
1184        // Handle rollback
1185        if aborted {
1186            self.state.restore(snapshot.clone(), transition_count);
1187
1188            let mut log = self.log.lock().await;
1189            log.append(
1190                EventKind::StateSnapshot,
1191                None,
1192                Some(&proposal.id),
1193                [(
1194                    "state".to_string(),
1195                    serde_json::to_value(&snapshot).unwrap_or_default(),
1196                )]
1197                .into(),
1198            );
1199            log.append(
1200                EventKind::StateRollback,
1201                None,
1202                Some(&proposal.id),
1203                [(
1204                    "rolled_back_to".to_string(),
1205                    Value::from("pre-proposal snapshot"),
1206                )]
1207                .into(),
1208            );
1209
1210            // Clear idempotency cache for rolled-back actions
1211            let mut cache = self.idempotency_cache.lock().await;
1212            for r in &results {
1213                if r.status == ActionStatus::Succeeded {
1214                    for action in &proposal.actions {
1215                        if action.id == r.action_id && action.idempotent {
1216                            cache.remove(&idempotency_key(action));
1217                        }
1218                    }
1219                }
1220            }
1221        }
1222
1223        // Sort results to match original action order
1224        let action_order: HashMap<String, usize> = proposal
1225            .actions
1226            .iter()
1227            .enumerate()
1228            .map(|(i, a)| (a.id.clone(), i))
1229            .collect();
1230        results.sort_by_key(|r| {
1231            action_order
1232                .get(&r.action_id)
1233                .copied()
1234                .unwrap_or(usize::MAX)
1235        });
1236
1237        // Compute cost summary from results
1238        let mut cost = CostSummary::default();
1239        for r in &results {
1240            let action = action_order
1241                .get(&r.action_id)
1242                .and_then(|&i| proposal.actions.get(i));
1243            match r.status {
1244                ActionStatus::Succeeded => {
1245                    cost.actions_executed += 1;
1246                    if let Some(a) = action {
1247                        if a.action_type == ActionType::ToolCall {
1248                            cost.tool_calls += 1;
1249                        }
1250                    }
1251                }
1252                ActionStatus::Failed | ActionStatus::Rejected => {
1253                    cost.actions_executed += 1;
1254                }
1255                ActionStatus::Skipped => {
1256                    cost.actions_skipped += 1;
1257                }
1258                _ => {}
1259            }
1260            if let Some(d) = r.duration_ms {
1261                cost.total_duration_ms += d;
1262            }
1263        }
1264
1265        // Set retries from inline counter
1266        cost.retries = total_retries;
1267
1268        // End root span — Ok if no abort, Error if aborted
1269        {
1270            let span_status = if aborted {
1271                SpanStatus::Error
1272            } else {
1273                SpanStatus::Ok
1274            };
1275            let mut log = self.log.lock().await;
1276            log.end_span(&root_span_id, span_status);
1277        }
1278
1279        let proposal_result = ProposalResult {
1280            proposal_id: proposal.id.clone(),
1281            results,
1282            cost,
1283        };
1284
1285        // Post-execution: auto-distill skills from this execution trace
1286        if self.auto_distill {
1287            if let Some(ref memgine) = self.memgine {
1288                // Convert results to TraceEvents for distillation
1289                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1290                    .results
1291                    .iter()
1292                    .map(|r| {
1293                        let kind = match r.status {
1294                            ActionStatus::Succeeded => "action_succeeded",
1295                            ActionStatus::Failed => "action_failed",
1296                            ActionStatus::Rejected => "action_rejected",
1297                            ActionStatus::Skipped => "action_skipped",
1298                            _ => "unknown",
1299                        };
1300                        // Find the matching action to get the tool name
1301                        let tool = proposal
1302                            .actions
1303                            .iter()
1304                            .find(|a| a.id == r.action_id)
1305                            .and_then(|a| a.tool.clone());
1306                        let mut data = serde_json::Map::new();
1307                        if let Some(ref e) = r.error {
1308                            data.insert("error".into(), Value::from(e.as_str()));
1309                        }
1310                        if let Some(ref o) = r.output {
1311                            data.insert("output".into(), o.clone());
1312                        }
1313                        car_memgine::TraceEvent {
1314                            kind: kind.to_string(),
1315                            action_id: Some(r.action_id.clone()),
1316                            tool,
1317                            data: Value::Object(data),
1318                            duration_ms: r.duration_ms,
1319                            reward: match r.status {
1320                                ActionStatus::Succeeded => Some(1.0),
1321                                ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1322                                _ => None,
1323                            },
1324                            ..Default::default()
1325                        }
1326                    })
1327                    .collect();
1328
1329                let mut engine = memgine.lock().await;
1330                let skills = engine.distill_skills(&trace_events).await;
1331                if !skills.is_empty() {
1332                    let count = skills.len();
1333                    engine.ingest_distilled_skills(&skills);
1334
1335                    // Log the distillation event
1336                    let mut log = self.log.lock().await;
1337                    log.append(
1338                        EventKind::SkillDistilled,
1339                        None,
1340                        Some(&proposal_result.proposal_id),
1341                        [
1342                            ("skills_count".to_string(), Value::from(count)),
1343                            (
1344                                "skill_names".to_string(),
1345                                Value::from(
1346                                    skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1347                                ),
1348                            ),
1349                        ]
1350                        .into(),
1351                    );
1352
1353                    // Check if any domains need evolution
1354                    let threshold = engine.evolution_threshold();
1355                    let domains = engine.domains_needing_evolution(threshold);
1356                    for domain in &domains {
1357                        // Collect failed events for this domain
1358                        let failed: Vec<car_memgine::TraceEvent> = trace_events
1359                            .iter()
1360                            .filter(|e| {
1361                                matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1362                            })
1363                            .cloned()
1364                            .collect();
1365                        if !failed.is_empty() {
1366                            let evolved = engine.evolve_skills(&failed, domain).await;
1367                            if !evolved.is_empty() {
1368                                log.append(
1369                                    EventKind::EvolutionTriggered,
1370                                    None,
1371                                    Some(&proposal_result.proposal_id),
1372                                    [
1373                                        ("domain".to_string(), Value::from(domain.as_str())),
1374                                        ("new_skills".to_string(), Value::from(evolved.len())),
1375                                    ]
1376                                    .into(),
1377                                );
1378                            }
1379                        }
1380                    }
1381                }
1382            }
1383        }
1384
1385        (proposal_result, state_before_map)
1386    }
1387
1388    /// Process a single action: idempotency → validate → policy → execute.
1389    /// Returns (ActionResult, retries_count).
1390    async fn process_action(
1391        &self,
1392        action: &Action,
1393        proposal_id: &str,
1394        trace_id: &str,
1395        parent_span_id: &str,
1396    ) -> (ActionResult, u32) {
1397        // Derive action type name for span naming
1398        let action_type_name = serde_json::to_string(&action.action_type)
1399            .unwrap_or_default()
1400            .trim_matches('"')
1401            .to_string();
1402        let span_name = format!("action.{}", action_type_name);
1403
1404        // Begin child span for this action
1405        let action_span_id = {
1406            let mut attrs: HashMap<String, Value> = HashMap::new();
1407            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1408            if let Some(ref tool) = action.tool {
1409                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1410            }
1411            let mut log = self.log.lock().await;
1412            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1413        };
1414
1415        // Execute the action pipeline and capture result
1416        let (result, retries) = self.process_action_inner(action, proposal_id).await;
1417
1418        // End action span based on result status
1419        let span_status = match result.status {
1420            ActionStatus::Succeeded => SpanStatus::Ok,
1421            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1422            _ => SpanStatus::Unset,
1423        };
1424        {
1425            let mut log = self.log.lock().await;
1426            log.end_span(&action_span_id, span_status);
1427        }
1428
1429        (result, retries)
1430    }
1431
1432    /// Inner action processing: idempotency -> validate -> policy -> execute.
1433    /// Returns (ActionResult, retries_count).
1434    #[instrument(
1435        name = "action.process",
1436        skip_all,
1437        fields(
1438            action_id = %action.id,
1439            action_type = ?action.action_type,
1440            tool = action.tool.as_deref().unwrap_or("none"),
1441        )
1442    )]
1443    async fn process_action_inner(
1444        &self,
1445        action: &Action,
1446        proposal_id: &str,
1447    ) -> (ActionResult, u32) {
1448        // Idempotency check
1449        if action.idempotent {
1450            let key = idempotency_key(action);
1451            let cache = self.idempotency_cache.lock().await;
1452            if let Some(cached) = cache.get(&key) {
1453                let mut log = self.log.lock().await;
1454                log.append(
1455                    EventKind::ActionDeduplicated,
1456                    Some(&action.id),
1457                    Some(proposal_id),
1458                    [(
1459                        "cached_action_id".to_string(),
1460                        Value::from(cached.action_id.as_str()),
1461                    )]
1462                    .into(),
1463                );
1464                return (
1465                    ActionResult {
1466                        action_id: action.id.clone(),
1467                        status: cached.status.clone(),
1468                        output: cached.output.clone(),
1469                        error: cached.error.clone(),
1470                        state_changes: cached.state_changes.clone(),
1471                        duration_ms: Some(0.0),
1472                        timestamp: chrono::Utc::now(),
1473                    },
1474                    0,
1475                );
1476            }
1477        }
1478
1479        // Capability check
1480        {
1481            let caps = self.capabilities.read().await;
1482            if let Some(ref cap) = *caps {
1483                // Check tool capability for ToolCall actions
1484                if action.action_type == ActionType::ToolCall {
1485                    if let Some(ref tool_name) = action.tool {
1486                        if !cap.tool_allowed(tool_name) {
1487                            let mut log = self.log.lock().await;
1488                            log.append(
1489                                EventKind::ActionRejected,
1490                                Some(&action.id),
1491                                Some(proposal_id),
1492                                HashMap::new(),
1493                            );
1494                            return (
1495                                rejected_result(
1496                                    &action.id,
1497                                    format!("capability denied: tool '{}' not allowed", tool_name),
1498                                ),
1499                                0,
1500                            );
1501                        }
1502                    }
1503                }
1504
1505                // Check state key capability for StateWrite/StateRead actions
1506                if action.action_type == ActionType::StateWrite
1507                    || action.action_type == ActionType::StateRead
1508                {
1509                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1510                        if !cap.state_key_allowed(key) {
1511                            let mut log = self.log.lock().await;
1512                            log.append(
1513                                EventKind::ActionRejected,
1514                                Some(&action.id),
1515                                Some(proposal_id),
1516                                HashMap::new(),
1517                            );
1518                            return (
1519                                rejected_result(
1520                                    &action.id,
1521                                    format!("capability denied: state key '{}' not allowed", key),
1522                                ),
1523                                0,
1524                            );
1525                        }
1526                    }
1527                }
1528            }
1529        }
1530
1531        // Validate
1532        let tools = self.tools.read().await;
1533        let validation = validate_action(action, &self.state, &tools);
1534        drop(tools);
1535
1536        if !validation.valid() {
1537            let error = validation
1538                .errors
1539                .iter()
1540                .map(|e| e.reason.as_str())
1541                .collect::<Vec<_>>()
1542                .join("; ");
1543            let mut log = self.log.lock().await;
1544            log.append(
1545                EventKind::ActionRejected,
1546                Some(&action.id),
1547                Some(proposal_id),
1548                HashMap::new(),
1549            );
1550            return (rejected_result(&action.id, error), 0);
1551        }
1552
1553        // Policy check
1554        {
1555            let policies = self.policies.read().await;
1556            let violations = policies.check(action, &self.state);
1557            if !violations.is_empty() {
1558                let error = violations
1559                    .iter()
1560                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1561                    .collect::<Vec<_>>()
1562                    .join("; ");
1563                let mut log = self.log.lock().await;
1564                log.append(
1565                    EventKind::PolicyViolation,
1566                    Some(&action.id),
1567                    Some(proposal_id),
1568                    HashMap::new(),
1569                );
1570                return (rejected_result(&action.id, error), 0);
1571            }
1572        }
1573
1574        // Validated
1575        {
1576            let mut log = self.log.lock().await;
1577            log.append(
1578                EventKind::ActionValidated,
1579                Some(&action.id),
1580                Some(proposal_id),
1581                HashMap::new(),
1582            );
1583        }
1584
1585        // Execute with retry
1586        let (result, retries) = self.execute_with_retry(action, proposal_id).await;
1587
1588        // Cache idempotent results
1589        if action.idempotent && result.status == ActionStatus::Succeeded {
1590            let mut cache = self.idempotency_cache.lock().await;
1591            cache.insert(idempotency_key(action), result.clone());
1592        }
1593
1594        tracing::info!(
1595            status = ?result.status,
1596            duration_ms = result.duration_ms,
1597            "action completed"
1598        );
1599
1600        (result, retries)
1601    }
1602
1603    /// Execute with retry logic and timeout.
1604    /// Returns (ActionResult, retries_count).
1605    async fn execute_with_retry(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
1606        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1607            action.max_retries + 1
1608        } else {
1609            1
1610        };
1611
1612        let mut last_error: Option<String> = None;
1613        let mut retries: u32 = 0;
1614
1615        for attempt in 0..max_attempts {
1616            if attempt > 0 {
1617                retries += 1;
1618                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1619                tokio::time::sleep(Duration::from_millis(delay)).await;
1620                let mut log = self.log.lock().await;
1621                log.append(
1622                    EventKind::ActionRetrying,
1623                    Some(&action.id),
1624                    Some(proposal_id),
1625                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
1626                );
1627            }
1628
1629            {
1630                let mut log = self.log.lock().await;
1631                log.append(
1632                    EventKind::ActionExecuting,
1633                    Some(&action.id),
1634                    Some(proposal_id),
1635                    HashMap::new(),
1636                );
1637            }
1638
1639            let start = std::time::Instant::now();
1640            let transitions_before = self.state.transition_count();
1641
1642            // Execute with optional timeout
1643            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1644                match timeout(Duration::from_millis(timeout_ms), self.dispatch(action)).await {
1645                    Ok(r) => r,
1646                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1647                }
1648            } else {
1649                self.dispatch(action).await
1650            };
1651
1652            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1653
1654            match exec_result {
1655                Ok(output) => {
1656                    // Commit post-effects
1657                    let mut state_changes: HashMap<String, Value> = HashMap::new();
1658                    for (key, value) in &action.expected_effects {
1659                        self.state.set(key, value.clone(), &action.id);
1660                        state_changes.insert(key.clone(), value.clone());
1661                    }
1662
1663                    // Capture state changes from dispatch
1664                    for t in self.state.transitions_since(transitions_before) {
1665                        if !state_changes.contains_key(&t.key) {
1666                            if let Some(v) = t.new_value {
1667                                state_changes.insert(t.key.clone(), v);
1668                            }
1669                        }
1670                    }
1671
1672                    let mut log = self.log.lock().await;
1673                    log.append(
1674                        EventKind::ActionSucceeded,
1675                        Some(&action.id),
1676                        Some(proposal_id),
1677                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1678                    );
1679
1680                    if !state_changes.is_empty() {
1681                        log.append(
1682                            EventKind::StateChanged,
1683                            Some(&action.id),
1684                            Some(proposal_id),
1685                            [(
1686                                "changes".to_string(),
1687                                serde_json::to_value(&state_changes).unwrap_or_default(),
1688                            )]
1689                            .into(),
1690                        );
1691                    }
1692
1693                    return (
1694                        ActionResult {
1695                            action_id: action.id.clone(),
1696                            status: ActionStatus::Succeeded,
1697                            output: Some(output),
1698                            error: None,
1699                            state_changes,
1700                            duration_ms: Some(duration_ms),
1701                            timestamp: chrono::Utc::now(),
1702                        },
1703                        retries,
1704                    );
1705                }
1706                Err(e) => {
1707                    last_error = Some(e.clone());
1708                    let mut log = self.log.lock().await;
1709                    log.append(
1710                        EventKind::ActionFailed,
1711                        Some(&action.id),
1712                        Some(proposal_id),
1713                        [
1714                            ("error".to_string(), Value::from(e.as_str())),
1715                            ("attempt".to_string(), Value::from(attempt + 1)),
1716                        ]
1717                        .into(),
1718                    );
1719                }
1720            }
1721        }
1722
1723        // All attempts exhausted
1724        if action.failure_behavior == FailureBehavior::Skip {
1725            return (
1726                skipped_result(
1727                    &action.id,
1728                    last_error.as_deref().unwrap_or("all attempts exhausted"),
1729                ),
1730                retries,
1731            );
1732        }
1733
1734        (
1735            ActionResult {
1736                action_id: action.id.clone(),
1737                status: ActionStatus::Failed,
1738                output: None,
1739                error: last_error,
1740                state_changes: HashMap::new(),
1741                duration_ms: None,
1742                timestamp: chrono::Utc::now(),
1743            },
1744            retries,
1745        )
1746    }
1747
1748    /// Dispatch an action to the appropriate handler.
1749    async fn dispatch(&self, action: &Action) -> Result<Value, String> {
1750        match action.action_type {
1751            ActionType::ToolCall => {
1752                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
1753                let params = Value::Object(
1754                    action
1755                        .parameters
1756                        .iter()
1757                        .map(|(k, v)| (k.clone(), v.clone()))
1758                        .collect(),
1759                );
1760
1761                // Check cross-proposal result cache.
1762                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
1763                    return Ok(cached);
1764                }
1765
1766                // Apply rate limit backpressure before executing.
1767                self.rate_limiter.acquire(tool_name).await;
1768
1769                // Try built-in inference tools first when the inference engine is available.
1770                if matches!(
1771                    tool_name,
1772                    "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
1773                ) {
1774                    if let Some(ref engine) = self.inference_engine {
1775                        // For "infer.grounded" or "infer" with memgine available,
1776                        // build context from memory and attach it to the request.
1777                        let params = {
1778                            let should_ground =
1779                                tool_name == "infer.grounded" || tool_name == "infer";
1780                            if should_ground {
1781                                if let Some(ref memgine) = self.memgine {
1782                                    if let Some(prompt) =
1783                                        params.get("prompt").and_then(|v| v.as_str())
1784                                    {
1785                                        let ctx = {
1786                                            let mut m = memgine.lock().await;
1787                                            m.build_context(prompt)
1788                                        };
1789                                        if !ctx.is_empty() {
1790                                            let mut p = params.clone();
1791                                            if let Some(obj) = p.as_object_mut() {
1792                                                obj.insert("context".to_string(), Value::from(ctx));
1793                                            }
1794                                            p
1795                                        } else {
1796                                            params
1797                                        }
1798                                    } else {
1799                                        params
1800                                    }
1801                                } else {
1802                                    params
1803                                }
1804                            } else {
1805                                params
1806                            }
1807                        };
1808
1809                        // Route "infer.grounded" to "infer" for the service layer
1810                        let effective_tool = if tool_name == "infer.grounded" {
1811                            "infer"
1812                        } else {
1813                            tool_name
1814                        };
1815                        let result =
1816                            car_inference::service::execute_tool(engine, effective_tool, &params)
1817                                .await
1818                                .map_err(|e| e.to_string());
1819
1820                        if let Ok(ref value) = result {
1821                            self.result_cache
1822                                .put(tool_name, &params, value.clone())
1823                                .await;
1824                        }
1825
1826                        return result;
1827                    }
1828                }
1829
1830                // Built-in memory consolidation tool.
1831                if tool_name == "memory.consolidate" {
1832                    if let Some(ref memgine) = self.memgine {
1833                        let report = {
1834                            let mut m = memgine.lock().await;
1835                            m.consolidate().await
1836                        };
1837                        // Log the consolidation event
1838                        {
1839                            let mut log = self.log.lock().await;
1840                            log.append(
1841                                EventKind::Consolidated,
1842                                None,
1843                                None,
1844                                [
1845                                    (
1846                                        "expired_pruned".to_string(),
1847                                        Value::from(report.expired_pruned),
1848                                    ),
1849                                    (
1850                                        "superseded_gc".to_string(),
1851                                        Value::from(report.superseded_gc),
1852                                    ),
1853                                    (
1854                                        "stale_embeddings_removed".to_string(),
1855                                        Value::from(report.stale_embeddings_removed),
1856                                    ),
1857                                    (
1858                                        "nodes_embedded".to_string(),
1859                                        Value::from(report.nodes_embedded),
1860                                    ),
1861                                    (
1862                                        "domains_evolved".to_string(),
1863                                        Value::from(report.domains_evolved.clone()),
1864                                    ),
1865                                    ("total_nodes".to_string(), Value::from(report.total_nodes)),
1866                                    ("total_edges".to_string(), Value::from(report.total_edges)),
1867                                ]
1868                                .into(),
1869                            );
1870                        }
1871                        return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
1872                    } else {
1873                        return Err(
1874                            "memory.consolidate requires memgine (attach with with_learning)"
1875                                .into(),
1876                        );
1877                    }
1878                }
1879
1880                // Prefer a configured tool_executor for any tool it claims to handle.
1881                // Fall through to agent_basics only when the configured executor is absent
1882                // or explicitly returns "unknown tool" — this prevents agent_basics' built-in
1883                // read_file/write_file (which resolve paths via std::env::current_dir) from
1884                // silently overriding an executor that carries its own working_dir.
1885                let configured = {
1886                    let guard = self.tool_executor.lock().await;
1887                    guard.as_ref().cloned()
1888                };
1889
1890                if let Some(ref executor) = configured {
1891                    let result = executor.execute(tool_name, &params).await;
1892                    let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
1893                    if !fall_through {
1894                        if let Ok(ref value) = result {
1895                            self.result_cache
1896                                .put(tool_name, &params, value.clone())
1897                                .await;
1898                        }
1899                        return result;
1900                    }
1901                }
1902
1903                if let Some(result) = crate::agent_basics::execute(tool_name, &params).await {
1904                    if let Ok(ref value) = result {
1905                        self.result_cache
1906                            .put(tool_name, &params, value.clone())
1907                            .await;
1908                    }
1909                    return result;
1910                }
1911
1912                Err(format!("no handler for tool '{}'", tool_name))
1913            }
1914            ActionType::StateWrite => {
1915                let key = action
1916                    .parameters
1917                    .get("key")
1918                    .and_then(|v| v.as_str())
1919                    .ok_or("state_write requires 'key' parameter")?;
1920                let value = action
1921                    .parameters
1922                    .get("value")
1923                    .cloned()
1924                    .unwrap_or(Value::Null);
1925                self.state.set(key, value, &action.id);
1926                Ok(Value::from(format!("written: {}", key)))
1927            }
1928            ActionType::StateRead => {
1929                let key = action
1930                    .parameters
1931                    .get("key")
1932                    .and_then(|v| v.as_str())
1933                    .ok_or("state_read requires 'key' parameter")?;
1934                Ok(self.state.get(key).unwrap_or(Value::Null))
1935            }
1936            ActionType::Assertion => {
1937                let key = action
1938                    .parameters
1939                    .get("key")
1940                    .and_then(|v| v.as_str())
1941                    .ok_or("assertion requires 'key' parameter")?;
1942                let expected = action
1943                    .parameters
1944                    .get("expected")
1945                    .cloned()
1946                    .unwrap_or(Value::Null);
1947                let actual = self.state.get(key).unwrap_or(Value::Null);
1948                if actual != expected {
1949                    Err(format!(
1950                        "assertion failed: state['{}'] = {:?}, expected {:?}",
1951                        key, actual, expected
1952                    ))
1953                } else {
1954                    Ok(serde_json::json!({"asserted": key, "value": actual}))
1955                }
1956            }
1957        }
1958    }
1959
1960    // --- Checkpoint and resume ---
1961
1962    /// Save a checkpoint of the current runtime state.
1963    pub async fn save_checkpoint(&self) -> Checkpoint {
1964        let state = self.state.snapshot();
1965        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
1966        let log = self.log.lock().await;
1967        let events: Vec<Value> = log
1968            .events()
1969            .iter()
1970            .map(|e| serde_json::to_value(e).unwrap_or_default())
1971            .collect();
1972
1973        Checkpoint {
1974            checkpoint_id: Uuid::new_v4().to_string(),
1975            created_at: chrono::Utc::now(),
1976            state,
1977            events,
1978            tools,
1979            metadata: HashMap::new(),
1980        }
1981    }
1982
1983    /// Save checkpoint to a JSON file.
1984    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
1985        let checkpoint = self.save_checkpoint().await;
1986        let json = serde_json::to_string_pretty(&checkpoint)
1987            .map_err(|e| format!("serialize error: {}", e))?;
1988        tokio::fs::write(path, json)
1989            .await
1990            .map_err(|e| format!("write error: {}", e))?;
1991        Ok(())
1992    }
1993
1994    /// Load a checkpoint from a JSON file and restore state.
1995    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
1996        let json = tokio::fs::read_to_string(path)
1997            .await
1998            .map_err(|e| format!("read error: {}", e))?;
1999        let checkpoint: Checkpoint =
2000            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2001        self.restore_checkpoint(&checkpoint).await;
2002        Ok(checkpoint)
2003    }
2004
2005    /// Restore runtime state from a checkpoint.
2006    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2007        // Replace state completely — don't merge, don't create synthetic transitions
2008        self.state.replace_all(checkpoint.state.clone());
2009        // Clear idempotency cache — stale results from pre-checkpoint execution
2010        // must not bypass validation/policy on the restored state
2011        self.idempotency_cache.lock().await.clear();
2012        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
2013        let mut tools = self.tools.write().await;
2014        tools.clear();
2015        for tool_name in &checkpoint.tools {
2016            let schema = ToolSchema {
2017                name: tool_name.clone(),
2018                description: String::new(),
2019                parameters: serde_json::Value::Object(Default::default()),
2020                returns: None,
2021                idempotent: false,
2022                cache_ttl_secs: None,
2023                rate_limit: None,
2024            };
2025            tools.insert(tool_name.clone(), schema);
2026        }
2027    }
2028
2029    /// Register a subprocess tool and set up the subprocess executor.
2030    /// If no executor exists, creates a new SubprocessToolExecutor.
2031    /// If one already exists, creates a new SubprocessToolExecutor with the
2032    /// existing executor as fallback.
2033    pub async fn register_subprocess_tool(
2034        &self,
2035        name: &str,
2036        tool: crate::subprocess::SubprocessTool,
2037    ) {
2038        use crate::subprocess::SubprocessToolExecutor;
2039
2040        let schema = ToolSchema {
2041            name: name.to_string(),
2042            description: format!("Subprocess tool: {}", tool.command),
2043            parameters: serde_json::Value::Object(Default::default()),
2044            returns: None,
2045            idempotent: false,
2046            cache_ttl_secs: None,
2047            rate_limit: None,
2048        };
2049        self.register_tool_schema(schema).await;
2050
2051        let mut guard = self.tool_executor.lock().await;
2052        let mut executor = match guard.take() {
2053            Some(existing) => {
2054                let mut sub = SubprocessToolExecutor::new();
2055                sub = sub.with_fallback(existing);
2056                sub
2057            }
2058            None => SubprocessToolExecutor::new(),
2059        };
2060        executor.register(name, tool);
2061        *guard = Some(std::sync::Arc::new(executor));
2062    }
2063}
2064
2065impl Default for Runtime {
2066    fn default() -> Self {
2067        Self::new()
2068    }
2069}