Skip to main content

car_engine/
executor.rs

1//! Core execution engine — propose → validate → execute → commit.
2
3use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9    build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10    FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use uuid::Uuid;
22
/// Base delay for retry backoff, in milliseconds.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Growth factor applied to the retry backoff delay.
const RETRY_BACKOFF_FACTOR: u64 = 2;
26
/// Trait for tool execution. Implement this to provide tools to the runtime.
///
/// In-process: implement directly with function calls.
/// Daemon mode: implement by sending JSON-RPC to the client.
///
/// The runtime stores an executor as `Arc<dyn ToolExecutor>`; the `Send + Sync`
/// supertraits make that shared handle usable from async tasks. `#[async_trait]` is
/// what allows the `async fn` below in a trait used as a trait object.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Invoke the tool named `tool` with JSON `params`, returning its JSON output.
    ///
    /// # Errors
    /// Returns `Err(message)` when the tool call fails.
    // NOTE(review): how the runtime turns an `Err` into an `ActionResult` (retry vs.
    // failure) is decided outside this chunk — confirm before relying on it.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
}
35
36/// Deterministic key for idempotency deduplication.
37fn idempotency_key(action: &Action) -> String {
38    let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
39    let params = serde_json::to_string(&sorted).unwrap_or_default();
40    format!(
41        "{}:{}:{}",
42        serde_json::to_string(&action.action_type).unwrap_or_default(),
43        action.tool.as_deref().unwrap_or(""),
44        params
45    )
46}
47
48fn rejected_result(action_id: &str, error: String) -> ActionResult {
49    ActionResult {
50        action_id: action_id.to_string(),
51        status: ActionStatus::Rejected,
52        output: None,
53        error: Some(error),
54        state_changes: HashMap::new(),
55        duration_ms: None,
56        timestamp: chrono::Utc::now(),
57    }
58}
59
60fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
61    ActionResult {
62        action_id: action_id.to_string(),
63        status: ActionStatus::Skipped,
64        output: None,
65        error: Some(reason.to_string()),
66        state_changes: HashMap::new(),
67        duration_ms: None,
68        timestamp: chrono::Utc::now(),
69    }
70}
71
72/// Format a tool result for feeding back to a model.
73pub fn format_tool_result(result: &ActionResult) -> String {
74    match result.status {
75        ActionStatus::Succeeded => match &result.output {
76            Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
77            None => String::new(),
78        },
79        ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
80        ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
81        _ => format!(
82            "[{:?}] {}",
83            result.status,
84            result.error.as_deref().unwrap_or("")
85        ),
86    }
87}
88
/// Budget constraints for proposal execution.
///
/// Every limit is optional and `None` means "unlimited"; `CostBudget::default()` is a
/// budget with no limits at all. Limits are checked before an action starts, so the
/// action that crosses a limit still runs; subsequent actions are skipped.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CostBudget {
    /// Maximum number of successful `ToolCall` actions in one proposal.
    pub max_tool_calls: Option<u32>,
    /// Maximum cumulative `duration_ms` of already-executed actions, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions that actually ran (skipped actions are not counted).
    pub max_actions: Option<u32>,
}
96
/// Common Agent Runtime — deterministic execution layer.
///
/// Holds the (possibly shared) state store, tool registry, policy engine and event
/// log, plus per-runtime execution settings (executor, idempotency cache, cost budget,
/// capabilities) and optional inference / skill-learning engines.
///
/// # Lock discipline
///
/// All locks on this struct are `tokio::sync` async locks. Keep critical sections
/// short. When two locks must be held at the same time, acquire them in this order
/// (earlier before later) so no two code paths can wait on each other:
///
/// 1. `memgine` (TokioMutex, held across skill distillation/evolution)
/// 2. `capabilities` (RwLock, read-only during execution)
/// 3. `tools` (RwLock, read-only during execution)
/// 4. `policies` (RwLock, read-only during execution)
/// 5. `cost_budget` (RwLock, read-only during execution)
/// 6. `tool_executor` (TokioMutex, clone the `Arc` and drop the guard before awaiting it)
/// 7. `idempotency_cache` (TokioMutex, acquired/released per check)
/// 8. `log` (TokioMutex, acquired/released per event) — a leaf lock: do not acquire
///    any other lock, and do not await long-running work, while holding it.
///
/// StateStore uses parking_lot::Mutex (sync) — NEVER hold across .await points.
pub struct Runtime {
    /// Key/value state that actions read and write; snapshotted for rollback.
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Policy engine consulted before an action executes.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Event log receiving spans and execution events.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits (token buckets).
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool cross-proposal result cache with a TTL.
    pub result_cache: Arc<ResultCache>,
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Results of idempotent actions, keyed by `idempotency_key`.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    cost_budget: TokioRwLock<Option<CostBudget>>,
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine for skill learning (auto-distillation after execution).
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether to auto-distill skills after each proposal execution.
    auto_distill: bool,
}
126
127impl Runtime {
128    pub fn new() -> Self {
129        Self {
130            state: Arc::new(StateStore::new()),
131            tools: Arc::new(TokioRwLock::new(HashMap::new())),
132            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
133            log: Arc::new(TokioMutex::new(EventLog::new())),
134            rate_limiter: Arc::new(RateLimiter::new()),
135            result_cache: Arc::new(ResultCache::new()),
136            tool_executor: TokioMutex::new(None),
137            idempotency_cache: TokioMutex::new(HashMap::new()),
138            cost_budget: TokioRwLock::new(None),
139            capabilities: TokioRwLock::new(None),
140            inference_engine: None,
141            memgine: None,
142            auto_distill: false,
143        }
144    }
145
146    /// Create a runtime with shared state, event log, and policies.
147    /// Each runtime gets its own tool set, executor, and idempotency cache.
148    pub fn with_shared(
149        state: Arc<StateStore>,
150        log: Arc<TokioMutex<EventLog>>,
151        policies: Arc<TokioRwLock<PolicyEngine>>,
152    ) -> Self {
153        Self {
154            state,
155            tools: Arc::new(TokioRwLock::new(HashMap::new())),
156            policies,
157            log,
158            rate_limiter: Arc::new(RateLimiter::new()),
159            result_cache: Arc::new(ResultCache::new()),
160            tool_executor: TokioMutex::new(None),
161            idempotency_cache: TokioMutex::new(HashMap::new()),
162            cost_budget: TokioRwLock::new(None),
163            capabilities: TokioRwLock::new(None),
164            inference_engine: None,
165            memgine: None,
166            auto_distill: false,
167        }
168    }
169
170    /// Attach a local inference engine. Registers `infer`, `embed`, `classify`
171    /// as built-in tools with real implementations.
172    pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
173        self.inference_engine = Some(engine);
174        // Register inference tool schemas (non-async init, use try_lock)
175        if let Ok(mut tools) = self.tools.try_write() {
176            for schema in car_inference::service::all_schemas() {
177                tools.insert(schema.name.clone(), schema);
178            }
179        }
180        self
181    }
182
183    /// Attach a memgine for automatic skill learning after execution.
184    /// Enables auto-distillation of execution traces into skills.
185    pub fn with_learning(
186        mut self,
187        memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
188        auto_distill: bool,
189    ) -> Self {
190        self.memgine = Some(memgine);
191        self.auto_distill = auto_distill;
192        self
193    }
194
195    pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
196        // Use try_lock for non-async init context. Safe because we just created the mutex.
197        if let Ok(mut guard) = self.tool_executor.try_lock() {
198            *guard = Some(executor);
199        }
200        self
201    }
202
203    /// Set a tool executor for the next execute() call.
204    /// Used by NAPI bindings where executor varies per call.
205    pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
206        *self.tool_executor.lock().await = Some(executor);
207    }
208
209    pub fn with_event_log(mut self, log: EventLog) -> Self {
210        self.log = Arc::new(TokioMutex::new(log));
211        self
212    }
213
214    /// Register a tool with just a name (backward compatible).
215    pub async fn register_tool(&self, name: &str) {
216        let schema = ToolSchema {
217            name: name.to_string(),
218            description: String::new(),
219            parameters: serde_json::Value::Object(Default::default()),
220            returns: None,
221            idempotent: false,
222            cache_ttl_secs: None,
223            rate_limit: None,
224        };
225        self.register_tool_schema(schema).await;
226    }
227
228    /// Register a tool with full schema.
229    pub async fn register_tool_schema(&self, schema: ToolSchema) {
230        // Auto-configure cache if schema specifies it
231        if let Some(ttl) = schema.cache_ttl_secs {
232            self.result_cache.enable_caching(&schema.name, ttl).await;
233        }
234        // Auto-configure rate limit if schema specifies it
235        if let Some(ref rl) = schema.rate_limit {
236            self.rate_limiter
237                .set_limit(
238                    &schema.name,
239                    RateLimit {
240                        max_calls: rl.max_calls,
241                        interval_secs: rl.interval_secs,
242                    },
243                )
244                .await;
245        }
246        self.tools.write().await.insert(schema.name.clone(), schema);
247    }
248
249    /// Get all registered tool schemas (for model prompt generation).
250    pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
251        self.tools.read().await.values().cloned().collect()
252    }
253
254    /// Set a cost budget that limits proposal execution.
255    pub async fn set_cost_budget(&self, budget: CostBudget) {
256        *self.cost_budget.write().await = Some(budget);
257    }
258
259    /// Set per-agent capability permissions that restrict tools, state keys, and action count.
260    pub async fn set_capabilities(&self, caps: CapabilitySet) {
261        *self.capabilities.write().await = Some(caps);
262    }
263
264    /// Set a per-tool rate limit (token bucket).
265    ///
266    /// `max_calls` tokens are available per `interval_secs` window.
267    /// When the bucket is empty, `dispatch()` applies backpressure by
268    /// waiting until a token refills.
269    pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
270        self.rate_limiter
271            .set_limit(tool, RateLimit { max_calls, interval_secs })
272            .await;
273    }
274
275    /// Enable cross-proposal result caching for a tool with a TTL in seconds.
276    pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
277        self.result_cache.enable_caching(tool, ttl_secs).await;
278    }
279
280    /// Execute a full proposal through the runtime loop.
281    pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
282        // Generate trace_id for this proposal execution
283        let trace_id = Uuid::new_v4().to_string();
284
285        // Begin root span for proposal execution
286        let root_span_id = {
287            let mut log = self.log.lock().await;
288            log.begin_span(
289                "proposal.execute",
290                &trace_id,
291                None,
292                [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
293            )
294        };
295
296        // Log proposal received
297        {
298            let mut log = self.log.lock().await;
299            log.append(
300                EventKind::ProposalReceived,
301                None,
302                Some(&proposal.id),
303                [
304                    ("source".to_string(), Value::from(proposal.source.as_str())),
305                    (
306                        "action_count".to_string(),
307                        Value::from(proposal.actions.len()),
308                    ),
309                ]
310                .into(),
311            );
312        }
313
314        // Capability check: max_actions budget for entire proposal
315        {
316            let caps = self.capabilities.read().await;
317            if let Some(ref cap) = *caps {
318                if !cap.actions_within_budget(proposal.actions.len() as u32) {
319                    let mut action_results = Vec::new();
320                    for action in &proposal.actions {
321                        action_results.push(rejected_result(
322                            &action.id,
323                            format!(
324                                "capability denied: proposal has {} actions, max allowed is {:?}",
325                                proposal.actions.len(),
326                                cap.max_actions
327                            ),
328                        ));
329                    }
330                    return ProposalResult {
331                        proposal_id: proposal.id.clone(),
332                        results: action_results,
333                        cost: CostSummary::default(),
334                    };
335                }
336            }
337        }
338
339        // Snapshot for rollback
340        let snapshot = self.state.snapshot();
341        let transition_count = self.state.transition_count();
342
343        let mut results: Vec<ActionResult> = Vec::new();
344        let mut aborted = false;
345        let mut budget_exceeded = false;
346        let mut total_retries: u32 = 0;
347
348        // Running cost counters for budget enforcement
349        let mut running_tool_calls: u32 = 0;
350        let mut running_actions: u32 = 0;
351        let mut running_duration_ms: f64 = 0.0;
352
353        // Snapshot the budget once
354        let budget = self.cost_budget.read().await.clone();
355
356        // Build DAG
357        let levels = build_dag(&proposal.actions);
358
359        for level in &levels {
360            if aborted || budget_exceeded {
361                let skip_reason = if budget_exceeded {
362                    "cost budget exceeded"
363                } else {
364                    "skipped due to earlier abort"
365                };
366                for &idx in level {
367                    results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
368                }
369                continue;
370            }
371
372            // Check if any action in this level has ABORT behavior
373            let has_abort = level
374                .iter()
375                .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
376
377            if level.len() == 1 || has_abort {
378                // Sequential execution
379                for &idx in level {
380                    if aborted || budget_exceeded {
381                        let skip_reason = if budget_exceeded {
382                            "cost budget exceeded"
383                        } else {
384                            "skipped due to abort"
385                        };
386                        results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
387                        continue;
388                    }
389
390                    // Budget check before execution
391                    if let Some(ref b) = budget {
392                        if let Some(max) = b.max_actions {
393                            if running_actions >= max {
394                                budget_exceeded = true;
395                                results.push(skipped_result(&proposal.actions[idx].id, "cost budget exceeded"));
396                                continue;
397                            }
398                        }
399                        if let Some(max) = b.max_tool_calls {
400                            if proposal.actions[idx].action_type == ActionType::ToolCall
401                                && running_tool_calls >= max
402                            {
403                                budget_exceeded = true;
404                                results.push(skipped_result(&proposal.actions[idx].id, "cost budget exceeded"));
405                                continue;
406                            }
407                        }
408                        if let Some(max) = b.max_duration_ms {
409                            if running_duration_ms >= max {
410                                budget_exceeded = true;
411                                results.push(skipped_result(&proposal.actions[idx].id, "cost budget exceeded"));
412                                continue;
413                            }
414                        }
415                    }
416
417                    let (ar, action_retries) = self.process_action(&proposal.actions[idx], &proposal.id, &trace_id, &root_span_id).await;
418                    total_retries += action_retries;
419
420                    // Update running counters
421                    if ar.status == ActionStatus::Succeeded
422                        && proposal.actions[idx].action_type == ActionType::ToolCall
423                    {
424                        running_tool_calls += 1;
425                    }
426                    if ar.status != ActionStatus::Skipped {
427                        running_actions += 1;
428                    }
429                    if let Some(d) = ar.duration_ms {
430                        running_duration_ms += d;
431                    }
432
433                    if ar.status == ActionStatus::Failed
434                        && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
435                    {
436                        aborted = true;
437                    }
438                    results.push(ar);
439                }
440            } else {
441                // Concurrent execution via futures::join_all
442                let futs: Vec<_> = level.iter()
443                    .map(|&idx| self.process_action(&proposal.actions[idx], &proposal.id, &trace_id, &root_span_id))
444                    .collect();
445                let level_results = futures::future::join_all(futs).await;
446
447                for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
448                    let idx = level[i];
449                    total_retries += action_retries;
450                    if ar.status == ActionStatus::Succeeded
451                        && proposal.actions[idx].action_type == ActionType::ToolCall
452                    {
453                        running_tool_calls += 1;
454                    }
455                    if ar.status != ActionStatus::Skipped {
456                        running_actions += 1;
457                    }
458                    if let Some(d) = ar.duration_ms {
459                        running_duration_ms += d;
460                    }
461                    results.push(ar);
462                }
463            }
464        }
465
466        // Handle rollback
467        if aborted {
468            self.state.restore(snapshot.clone(), transition_count);
469
470            let mut log = self.log.lock().await;
471            log.append(
472                EventKind::StateSnapshot,
473                None,
474                Some(&proposal.id),
475                [("state".to_string(), serde_json::to_value(&snapshot).unwrap_or_default())].into(),
476            );
477            log.append(
478                EventKind::StateRollback,
479                None,
480                Some(&proposal.id),
481                [("rolled_back_to".to_string(), Value::from("pre-proposal snapshot"))].into(),
482            );
483
484            // Clear idempotency cache for rolled-back actions
485            let mut cache = self.idempotency_cache.lock().await;
486            for r in &results {
487                if r.status == ActionStatus::Succeeded {
488                    for action in &proposal.actions {
489                        if action.id == r.action_id && action.idempotent {
490                            cache.remove(&idempotency_key(action));
491                        }
492                    }
493                }
494            }
495        }
496
497        // Sort results to match original action order
498        let action_order: HashMap<String, usize> = proposal
499            .actions
500            .iter()
501            .enumerate()
502            .map(|(i, a)| (a.id.clone(), i))
503            .collect();
504        results.sort_by_key(|r| action_order.get(&r.action_id).copied().unwrap_or(usize::MAX));
505
506        // Compute cost summary from results
507        let mut cost = CostSummary::default();
508        for r in &results {
509            let action = action_order
510                .get(&r.action_id)
511                .and_then(|&i| proposal.actions.get(i));
512            match r.status {
513                ActionStatus::Succeeded => {
514                    cost.actions_executed += 1;
515                    if let Some(a) = action {
516                        if a.action_type == ActionType::ToolCall {
517                            cost.tool_calls += 1;
518                        }
519                    }
520                }
521                ActionStatus::Failed | ActionStatus::Rejected => {
522                    cost.actions_executed += 1;
523                }
524                ActionStatus::Skipped => {
525                    cost.actions_skipped += 1;
526                }
527                _ => {}
528            }
529            if let Some(d) = r.duration_ms {
530                cost.total_duration_ms += d;
531            }
532        }
533
534        // Set retries from inline counter
535        cost.retries = total_retries;
536
537        // End root span — Ok if no abort, Error if aborted
538        {
539            let span_status = if aborted {
540                SpanStatus::Error
541            } else {
542                SpanStatus::Ok
543            };
544            let mut log = self.log.lock().await;
545            log.end_span(&root_span_id, span_status);
546        }
547
548        let proposal_result = ProposalResult {
549            proposal_id: proposal.id.clone(),
550            results,
551            cost,
552        };
553
554        // Post-execution: auto-distill skills from this execution trace
555        if self.auto_distill {
556            if let Some(ref memgine) = self.memgine {
557                // Convert results to TraceEvents for distillation
558                let trace_events: Vec<car_memgine::TraceEvent> = proposal_result.results.iter()
559                    .map(|r| {
560                        let kind = match r.status {
561                            ActionStatus::Succeeded => "action_succeeded",
562                            ActionStatus::Failed => "action_failed",
563                            ActionStatus::Rejected => "action_rejected",
564                            ActionStatus::Skipped => "action_skipped",
565                            _ => "unknown",
566                        };
567                        // Find the matching action to get the tool name
568                        let tool = proposal.actions.iter()
569                            .find(|a| a.id == r.action_id)
570                            .and_then(|a| a.tool.clone());
571                        let mut data = serde_json::Map::new();
572                        if let Some(ref e) = r.error {
573                            data.insert("error".into(), Value::from(e.as_str()));
574                        }
575                        if let Some(ref o) = r.output {
576                            data.insert("output".into(), o.clone());
577                        }
578                        car_memgine::TraceEvent {
579                            kind: kind.to_string(),
580                            action_id: Some(r.action_id.clone()),
581                            tool,
582                            data: Value::Object(data),
583                        }
584                    })
585                    .collect();
586
587                let mut engine = memgine.lock().await;
588                let skills = engine.distill_skills(&trace_events).await;
589                if !skills.is_empty() {
590                    let count = skills.len();
591                    engine.ingest_distilled_skills(&skills);
592
593                    // Log the distillation event
594                    let mut log = self.log.lock().await;
595                    log.append(
596                        EventKind::SkillDistilled,
597                        None,
598                        Some(&proposal_result.proposal_id),
599                        [
600                            ("skills_count".to_string(), Value::from(count)),
601                            ("skill_names".to_string(), Value::from(
602                                skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>()
603                            )),
604                        ].into(),
605                    );
606
607                    // Check if any domains need evolution
608                    let threshold = 0.6; // TODO: make configurable
609                    let domains = engine.domains_needing_evolution(threshold);
610                    for domain in &domains {
611                        // Collect failed events for this domain
612                        let failed: Vec<car_memgine::TraceEvent> = trace_events.iter()
613                            .filter(|e| matches!(e.kind.as_str(), "action_failed" | "action_rejected"))
614                            .cloned()
615                            .collect();
616                        if !failed.is_empty() {
617                            let evolved = engine.evolve_skills(&failed, domain).await;
618                            if !evolved.is_empty() {
619                                log.append(
620                                    EventKind::EvolutionTriggered,
621                                    None,
622                                    Some(&proposal_result.proposal_id),
623                                    [
624                                        ("domain".to_string(), Value::from(domain.as_str())),
625                                        ("new_skills".to_string(), Value::from(evolved.len())),
626                                    ].into(),
627                                );
628                            }
629                        }
630                    }
631                }
632            }
633        }
634
635        proposal_result
636    }
637
638    /// Process a single action: idempotency → validate → policy → execute.
639    /// Returns (ActionResult, retries_count).
640    async fn process_action(
641        &self,
642        action: &Action,
643        proposal_id: &str,
644        trace_id: &str,
645        parent_span_id: &str,
646    ) -> (ActionResult, u32) {
647        // Derive action type name for span naming
648        let action_type_name = serde_json::to_string(&action.action_type)
649            .unwrap_or_default()
650            .trim_matches('"')
651            .to_string();
652        let span_name = format!("action.{}", action_type_name);
653
654        // Begin child span for this action
655        let action_span_id = {
656            let mut attrs: HashMap<String, Value> = HashMap::new();
657            attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
658            if let Some(ref tool) = action.tool {
659                attrs.insert("tool".to_string(), Value::from(tool.as_str()));
660            }
661            let mut log = self.log.lock().await;
662            log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
663        };
664
665        // Execute the action pipeline and capture result
666        let (result, retries) = self.process_action_inner(action, proposal_id).await;
667
668        // End action span based on result status
669        let span_status = match result.status {
670            ActionStatus::Succeeded => SpanStatus::Ok,
671            ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
672            _ => SpanStatus::Unset,
673        };
674        {
675            let mut log = self.log.lock().await;
676            log.end_span(&action_span_id, span_status);
677        }
678
679        (result, retries)
680    }
681
682    /// Inner action processing: idempotency -> validate -> policy -> execute.
683    /// Returns (ActionResult, retries_count).
684    async fn process_action_inner(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
685        // Idempotency check
686        if action.idempotent {
687            let key = idempotency_key(action);
688            let cache = self.idempotency_cache.lock().await;
689            if let Some(cached) = cache.get(&key) {
690                let mut log = self.log.lock().await;
691                log.append(
692                    EventKind::ActionDeduplicated,
693                    Some(&action.id),
694                    Some(proposal_id),
695                    [("cached_action_id".to_string(), Value::from(cached.action_id.as_str()))].into(),
696                );
697                return (ActionResult {
698                    action_id: action.id.clone(),
699                    status: cached.status.clone(),
700                    output: cached.output.clone(),
701                    error: cached.error.clone(),
702                    state_changes: cached.state_changes.clone(),
703                    duration_ms: Some(0.0),
704                    timestamp: chrono::Utc::now(),
705                }, 0);
706            }
707        }
708
709        // Capability check
710        {
711            let caps = self.capabilities.read().await;
712            if let Some(ref cap) = *caps {
713                // Check tool capability for ToolCall actions
714                if action.action_type == ActionType::ToolCall {
715                    if let Some(ref tool_name) = action.tool {
716                        if !cap.tool_allowed(tool_name) {
717                            let mut log = self.log.lock().await;
718                            log.append(
719                                EventKind::ActionRejected,
720                                Some(&action.id),
721                                Some(proposal_id),
722                                HashMap::new(),
723                            );
724                            return (rejected_result(
725                                &action.id,
726                                format!("capability denied: tool '{}' not allowed", tool_name),
727                            ), 0);
728                        }
729                    }
730                }
731
732                // Check state key capability for StateWrite/StateRead actions
733                if action.action_type == ActionType::StateWrite
734                    || action.action_type == ActionType::StateRead
735                {
736                    if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
737                        if !cap.state_key_allowed(key) {
738                            let mut log = self.log.lock().await;
739                            log.append(
740                                EventKind::ActionRejected,
741                                Some(&action.id),
742                                Some(proposal_id),
743                                HashMap::new(),
744                            );
745                            return (rejected_result(
746                                &action.id,
747                                format!("capability denied: state key '{}' not allowed", key),
748                            ), 0);
749                        }
750                    }
751                }
752            }
753        }
754
755        // Validate
756        let tools = self.tools.read().await;
757        let validation = validate_action(action, &self.state, &tools);
758        drop(tools);
759
760        if !validation.valid() {
761            let error = validation
762                .errors
763                .iter()
764                .map(|e| e.reason.as_str())
765                .collect::<Vec<_>>()
766                .join("; ");
767            let mut log = self.log.lock().await;
768            log.append(
769                EventKind::ActionRejected,
770                Some(&action.id),
771                Some(proposal_id),
772                HashMap::new(),
773            );
774            return (rejected_result(&action.id, error), 0);
775        }
776
777        // Policy check
778        {
779            let policies = self.policies.read().await;
780            let violations = policies.check(action, &self.state);
781            if !violations.is_empty() {
782                let error = violations
783                    .iter()
784                    .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
785                    .collect::<Vec<_>>()
786                    .join("; ");
787                let mut log = self.log.lock().await;
788                log.append(
789                    EventKind::PolicyViolation,
790                    Some(&action.id),
791                    Some(proposal_id),
792                    HashMap::new(),
793                );
794                return (rejected_result(&action.id, error), 0);
795            }
796        }
797
798        // Validated
799        {
800            let mut log = self.log.lock().await;
801            log.append(
802                EventKind::ActionValidated,
803                Some(&action.id),
804                Some(proposal_id),
805                HashMap::new(),
806            );
807        }
808
809        // Execute with retry
810        let (result, retries) = self.execute_with_retry(action, proposal_id).await;
811
812        // Cache idempotent results
813        if action.idempotent && result.status == ActionStatus::Succeeded {
814            let mut cache = self.idempotency_cache.lock().await;
815            cache.insert(idempotency_key(action), result.clone());
816        }
817
818        (result, retries)
819    }
820
821    /// Execute with retry logic and timeout.
822    /// Returns (ActionResult, retries_count).
823    async fn execute_with_retry(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
824        let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
825            action.max_retries + 1
826        } else {
827            1
828        };
829
830        let mut last_error: Option<String> = None;
831        let mut retries: u32 = 0;
832
833        for attempt in 0..max_attempts {
834            if attempt > 0 {
835                retries += 1;
836                let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
837                tokio::time::sleep(Duration::from_millis(delay)).await;
838                let mut log = self.log.lock().await;
839                log.append(
840                    EventKind::ActionRetrying,
841                    Some(&action.id),
842                    Some(proposal_id),
843                    [("attempt".to_string(), Value::from(attempt + 1))].into(),
844                );
845            }
846
847            {
848                let mut log = self.log.lock().await;
849                log.append(
850                    EventKind::ActionExecuting,
851                    Some(&action.id),
852                    Some(proposal_id),
853                    HashMap::new(),
854                );
855            }
856
857            let start = std::time::Instant::now();
858            let transitions_before = self.state.transition_count();
859
860            // Execute with optional timeout
861            let exec_result = if let Some(timeout_ms) = action.timeout_ms {
862                match timeout(
863                    Duration::from_millis(timeout_ms),
864                    self.dispatch(action),
865                )
866                .await
867                {
868                    Ok(r) => r,
869                    Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
870                }
871            } else {
872                self.dispatch(action).await
873            };
874
875            let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
876
877            match exec_result {
878                Ok(output) => {
879                    // Commit post-effects
880                    let mut state_changes: HashMap<String, Value> = HashMap::new();
881                    for (key, value) in &action.expected_effects {
882                        self.state.set(key, value.clone(), &action.id);
883                        state_changes.insert(key.clone(), value.clone());
884                    }
885
886                    // Capture state changes from dispatch
887                    for t in self.state.transitions_since(transitions_before) {
888                        if !state_changes.contains_key(&t.key) {
889                            if let Some(v) = t.new_value {
890                                state_changes.insert(t.key.clone(), v);
891                            }
892                        }
893                    }
894
895                    let mut log = self.log.lock().await;
896                    log.append(
897                        EventKind::ActionSucceeded,
898                        Some(&action.id),
899                        Some(proposal_id),
900                        [("duration_ms".to_string(), Value::from(duration_ms))].into(),
901                    );
902
903                    if !state_changes.is_empty() {
904                        log.append(
905                            EventKind::StateChanged,
906                            Some(&action.id),
907                            Some(proposal_id),
908                            [("changes".to_string(), serde_json::to_value(&state_changes).unwrap_or_default())].into(),
909                        );
910                    }
911
912                    return (ActionResult {
913                        action_id: action.id.clone(),
914                        status: ActionStatus::Succeeded,
915                        output: Some(output),
916                        error: None,
917                        state_changes,
918                        duration_ms: Some(duration_ms),
919                        timestamp: chrono::Utc::now(),
920                    }, retries);
921                }
922                Err(e) => {
923                    last_error = Some(e.clone());
924                    let mut log = self.log.lock().await;
925                    log.append(
926                        EventKind::ActionFailed,
927                        Some(&action.id),
928                        Some(proposal_id),
929                        [
930                            ("error".to_string(), Value::from(e.as_str())),
931                            ("attempt".to_string(), Value::from(attempt + 1)),
932                        ]
933                        .into(),
934                    );
935                }
936            }
937        }
938
939        // All attempts exhausted
940        if action.failure_behavior == FailureBehavior::Skip {
941            return (skipped_result(
942                &action.id,
943                last_error.as_deref().unwrap_or("all attempts exhausted"),
944            ), retries);
945        }
946
947        (ActionResult {
948            action_id: action.id.clone(),
949            status: ActionStatus::Failed,
950            output: None,
951            error: last_error,
952            state_changes: HashMap::new(),
953            duration_ms: None,
954            timestamp: chrono::Utc::now(),
955        }, retries)
956    }
957
958    /// Dispatch an action to the appropriate handler.
959    async fn dispatch(&self, action: &Action) -> Result<Value, String> {
960        match action.action_type {
961            ActionType::ToolCall => {
962                let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
963                let params = Value::Object(
964                    action
965                        .parameters
966                        .iter()
967                        .map(|(k, v)| (k.clone(), v.clone()))
968                        .collect(),
969                );
970
971                // Check cross-proposal result cache.
972                if let Some(cached) = self.result_cache.get(tool_name, &params).await {
973                    return Ok(cached);
974                }
975
976                // Apply rate limit backpressure before executing.
977                self.rate_limiter.acquire(tool_name).await;
978
979                // Try built-in inference tools first when the inference engine is available.
980                if matches!(tool_name, "infer" | "infer.grounded" | "embed" | "classify") {
981                    if let Some(ref engine) = self.inference_engine {
982                        // For "infer.grounded" or "infer" with memgine available,
983                        // build context from memory and attach it to the request.
984                        let params = {
985                            let should_ground = tool_name == "infer.grounded"
986                                || tool_name == "infer";
987                            if should_ground {
988                                if let Some(ref memgine) = self.memgine {
989                                    if let Some(prompt) = params.get("prompt").and_then(|v| v.as_str()) {
990                                        let ctx = { let mut m = memgine.lock().await; m.build_context(prompt) };
991                                        if !ctx.is_empty() {
992                                            let mut p = params.clone();
993                                            if let Some(obj) = p.as_object_mut() {
994                                                obj.insert("context".to_string(), Value::from(ctx));
995                                            }
996                                            p
997                                        } else {
998                                            params
999                                        }
1000                                    } else {
1001                                        params
1002                                    }
1003                                } else {
1004                                    params
1005                                }
1006                            } else {
1007                                params
1008                            }
1009                        };
1010
1011                        // Route "infer.grounded" to "infer" for the service layer
1012                        let effective_tool = if tool_name == "infer.grounded" { "infer" } else { tool_name };
1013                        let result = car_inference::service::execute_tool(engine, effective_tool, &params)
1014                            .await
1015                            .map_err(|e| e.to_string());
1016
1017                        if let Ok(ref value) = result {
1018                            self.result_cache
1019                                .put(tool_name, &params, value.clone())
1020                                .await;
1021                        }
1022
1023                        return result;
1024                    }
1025                }
1026
1027                let executor = {
1028                    let guard = self.tool_executor.lock().await;
1029                    guard.as_ref().ok_or("no tool executor configured")?.clone()
1030                };
1031                let result = executor.execute(tool_name, &params).await;
1032
1033                // Store successful results in the cross-proposal cache.
1034                if let Ok(ref value) = result {
1035                    self.result_cache
1036                        .put(tool_name, &params, value.clone())
1037                        .await;
1038                }
1039
1040                result
1041            }
1042            ActionType::StateWrite => {
1043                let key = action
1044                    .parameters
1045                    .get("key")
1046                    .and_then(|v| v.as_str())
1047                    .ok_or("state_write requires 'key' parameter")?;
1048                let value = action
1049                    .parameters
1050                    .get("value")
1051                    .cloned()
1052                    .unwrap_or(Value::Null);
1053                self.state.set(key, value, &action.id);
1054                Ok(Value::from(format!("written: {}", key)))
1055            }
1056            ActionType::StateRead => {
1057                let key = action
1058                    .parameters
1059                    .get("key")
1060                    .and_then(|v| v.as_str())
1061                    .ok_or("state_read requires 'key' parameter")?;
1062                Ok(self.state.get(key).unwrap_or(Value::Null))
1063            }
1064            ActionType::Assertion => {
1065                let key = action
1066                    .parameters
1067                    .get("key")
1068                    .and_then(|v| v.as_str())
1069                    .ok_or("assertion requires 'key' parameter")?;
1070                let expected = action.parameters.get("expected").cloned().unwrap_or(Value::Null);
1071                let actual = self.state.get(key).unwrap_or(Value::Null);
1072                if actual != expected {
1073                    Err(format!(
1074                        "assertion failed: state['{}'] = {:?}, expected {:?}",
1075                        key, actual, expected
1076                    ))
1077                } else {
1078                    Ok(serde_json::json!({"asserted": key, "value": actual}))
1079                }
1080            }
1081        }
1082    }
1083
1084    // --- Checkpoint and resume ---
1085
1086    /// Save a checkpoint of the current runtime state.
1087    pub async fn save_checkpoint(&self) -> Checkpoint {
1088        let state = self.state.snapshot();
1089        let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
1090        let log = self.log.lock().await;
1091        let events: Vec<Value> = log
1092            .events()
1093            .iter()
1094            .map(|e| serde_json::to_value(e).unwrap_or_default())
1095            .collect();
1096
1097        Checkpoint {
1098            checkpoint_id: Uuid::new_v4().to_string(),
1099            created_at: chrono::Utc::now(),
1100            state,
1101            events,
1102            tools,
1103            metadata: HashMap::new(),
1104        }
1105    }
1106
1107    /// Save checkpoint to a JSON file.
1108    pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
1109        let checkpoint = self.save_checkpoint().await;
1110        let json = serde_json::to_string_pretty(&checkpoint)
1111            .map_err(|e| format!("serialize error: {}", e))?;
1112        tokio::fs::write(path, json).await.map_err(|e| format!("write error: {}", e))?;
1113        Ok(())
1114    }
1115
1116    /// Load a checkpoint from a JSON file and restore state.
1117    pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
1118        let json =
1119            tokio::fs::read_to_string(path).await.map_err(|e| format!("read error: {}", e))?;
1120        let checkpoint: Checkpoint =
1121            serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
1122        self.restore_checkpoint(&checkpoint).await;
1123        Ok(checkpoint)
1124    }
1125
1126    /// Restore runtime state from a checkpoint.
1127    pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
1128        // Restore state
1129        for (key, value) in &checkpoint.state {
1130            self.state.set(key, value.clone(), "checkpoint_restore");
1131        }
1132        // Restore tools (as name-only schemas; full schemas are not persisted in checkpoint)
1133        let mut tools = self.tools.write().await;
1134        tools.clear();
1135        for tool_name in &checkpoint.tools {
1136            let schema = ToolSchema {
1137                name: tool_name.clone(),
1138                description: String::new(),
1139                parameters: serde_json::Value::Object(Default::default()),
1140                returns: None,
1141                idempotent: false,
1142                cache_ttl_secs: None,
1143                rate_limit: None,
1144            };
1145            tools.insert(tool_name.clone(), schema);
1146        }
1147    }
1148}
1149
1150impl Default for Runtime {
1151    fn default() -> Self {
1152        Self::new()
1153    }
1154}