1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use uuid::Uuid;
22
/// Delay before the first retry of a failed action, in milliseconds.
///
/// The delay before retry `k` (1-based) is
/// `RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR^(k - 1)`.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier applied to the retry delay for each further retry.
const RETRY_BACKOFF_FACTOR: u64 = 2;
26
27#[async_trait::async_trait]
32pub trait ToolExecutor: Send + Sync {
33 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
34}
35
36fn idempotency_key(action: &Action) -> String {
38 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
39 let params = serde_json::to_string(&sorted).unwrap_or_default();
40 format!(
41 "{}:{}:{}",
42 serde_json::to_string(&action.action_type).unwrap_or_default(),
43 action.tool.as_deref().unwrap_or(""),
44 params
45 )
46}
47
48fn rejected_result(action_id: &str, error: String) -> ActionResult {
49 ActionResult {
50 action_id: action_id.to_string(),
51 status: ActionStatus::Rejected,
52 output: None,
53 error: Some(error),
54 state_changes: HashMap::new(),
55 duration_ms: None,
56 timestamp: chrono::Utc::now(),
57 }
58}
59
60fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
61 ActionResult {
62 action_id: action_id.to_string(),
63 status: ActionStatus::Skipped,
64 output: None,
65 error: Some(reason.to_string()),
66 state_changes: HashMap::new(),
67 duration_ms: None,
68 timestamp: chrono::Utc::now(),
69 }
70}
71
72pub fn format_tool_result(result: &ActionResult) -> String {
74 match result.status {
75 ActionStatus::Succeeded => match &result.output {
76 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
77 None => String::new(),
78 },
79 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
80 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
81 _ => format!(
82 "[{:?}] {}",
83 result.status,
84 result.error.as_deref().unwrap_or("")
85 ),
86 }
87}
88
/// Optional ceilings on what a single proposal execution may consume.
///
/// Each limit is independent; `None` means "no limit". `CostBudget::default()`
/// is therefore an unlimited budget. `Runtime::execute` compares these limits
/// against its running totals before each action it processes sequentially.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CostBudget {
    /// Maximum number of successful tool-call actions before further tool
    /// calls are skipped.
    pub max_tool_calls: Option<u32>,
    /// Maximum accumulated action duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions that produced a non-skipped result.
    pub max_actions: Option<u32>,
}
96
97pub struct Runtime {
110 pub state: Arc<StateStore>,
111 pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
112 pub policies: Arc<TokioRwLock<PolicyEngine>>,
113 pub log: Arc<TokioMutex<EventLog>>,
114 pub rate_limiter: Arc<RateLimiter>,
115 pub result_cache: Arc<ResultCache>,
116 tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
117 idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
118 cost_budget: TokioRwLock<Option<CostBudget>>,
119 capabilities: TokioRwLock<Option<CapabilitySet>>,
120 inference_engine: Option<Arc<car_inference::InferenceEngine>>,
121 memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
123 auto_distill: bool,
125}
126
127impl Runtime {
128 pub fn new() -> Self {
129 Self {
130 state: Arc::new(StateStore::new()),
131 tools: Arc::new(TokioRwLock::new(HashMap::new())),
132 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
133 log: Arc::new(TokioMutex::new(EventLog::new())),
134 rate_limiter: Arc::new(RateLimiter::new()),
135 result_cache: Arc::new(ResultCache::new()),
136 tool_executor: TokioMutex::new(None),
137 idempotency_cache: TokioMutex::new(HashMap::new()),
138 cost_budget: TokioRwLock::new(None),
139 capabilities: TokioRwLock::new(None),
140 inference_engine: None,
141 memgine: None,
142 auto_distill: false,
143 }
144 }
145
146 pub fn with_shared(
149 state: Arc<StateStore>,
150 log: Arc<TokioMutex<EventLog>>,
151 policies: Arc<TokioRwLock<PolicyEngine>>,
152 ) -> Self {
153 Self {
154 state,
155 tools: Arc::new(TokioRwLock::new(HashMap::new())),
156 policies,
157 log,
158 rate_limiter: Arc::new(RateLimiter::new()),
159 result_cache: Arc::new(ResultCache::new()),
160 tool_executor: TokioMutex::new(None),
161 idempotency_cache: TokioMutex::new(HashMap::new()),
162 cost_budget: TokioRwLock::new(None),
163 capabilities: TokioRwLock::new(None),
164 inference_engine: None,
165 memgine: None,
166 auto_distill: false,
167 }
168 }
169
170 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
173 self.inference_engine = Some(engine);
174 if let Ok(mut tools) = self.tools.try_write() {
176 for schema in car_inference::service::all_schemas() {
177 tools.insert(schema.name.clone(), schema);
178 }
179 }
180 self
181 }
182
183 pub fn with_learning(
186 mut self,
187 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
188 auto_distill: bool,
189 ) -> Self {
190 self.memgine = Some(memgine);
191 self.auto_distill = auto_distill;
192 self
193 }
194
195 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
196 if let Ok(mut guard) = self.tool_executor.try_lock() {
198 *guard = Some(executor);
199 }
200 self
201 }
202
203 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
206 *self.tool_executor.lock().await = Some(executor);
207 }
208
209 pub fn with_event_log(mut self, log: EventLog) -> Self {
210 self.log = Arc::new(TokioMutex::new(log));
211 self
212 }
213
214 pub async fn register_tool(&self, name: &str) {
216 let schema = ToolSchema {
217 name: name.to_string(),
218 description: String::new(),
219 parameters: serde_json::Value::Object(Default::default()),
220 returns: None,
221 idempotent: false,
222 cache_ttl_secs: None,
223 rate_limit: None,
224 };
225 self.register_tool_schema(schema).await;
226 }
227
228 pub async fn register_tool_schema(&self, schema: ToolSchema) {
230 if let Some(ttl) = schema.cache_ttl_secs {
232 self.result_cache.enable_caching(&schema.name, ttl).await;
233 }
234 if let Some(ref rl) = schema.rate_limit {
236 self.rate_limiter
237 .set_limit(
238 &schema.name,
239 RateLimit {
240 max_calls: rl.max_calls,
241 interval_secs: rl.interval_secs,
242 },
243 )
244 .await;
245 }
246 self.tools.write().await.insert(schema.name.clone(), schema);
247 }
248
249 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
251 self.tools.read().await.values().cloned().collect()
252 }
253
254 pub async fn set_cost_budget(&self, budget: CostBudget) {
256 *self.cost_budget.write().await = Some(budget);
257 }
258
259 pub async fn set_capabilities(&self, caps: CapabilitySet) {
261 *self.capabilities.write().await = Some(caps);
262 }
263
264 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
270 self.rate_limiter
271 .set_limit(tool, RateLimit { max_calls, interval_secs })
272 .await;
273 }
274
275 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
277 self.result_cache.enable_caching(tool, ttl_secs).await;
278 }
279
280 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
282 let trace_id = Uuid::new_v4().to_string();
284
285 let root_span_id = {
287 let mut log = self.log.lock().await;
288 log.begin_span(
289 "proposal.execute",
290 &trace_id,
291 None,
292 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
293 )
294 };
295
296 {
298 let mut log = self.log.lock().await;
299 log.append(
300 EventKind::ProposalReceived,
301 None,
302 Some(&proposal.id),
303 [
304 ("source".to_string(), Value::from(proposal.source.as_str())),
305 (
306 "action_count".to_string(),
307 Value::from(proposal.actions.len()),
308 ),
309 ]
310 .into(),
311 );
312 }
313
314 {
316 let caps = self.capabilities.read().await;
317 if let Some(ref cap) = *caps {
318 if !cap.actions_within_budget(proposal.actions.len() as u32) {
319 let mut action_results = Vec::new();
320 for action in &proposal.actions {
321 action_results.push(rejected_result(
322 &action.id,
323 format!(
324 "capability denied: proposal has {} actions, max allowed is {:?}",
325 proposal.actions.len(),
326 cap.max_actions
327 ),
328 ));
329 }
330 return ProposalResult {
331 proposal_id: proposal.id.clone(),
332 results: action_results,
333 cost: CostSummary::default(),
334 };
335 }
336 }
337 }
338
339 let snapshot = self.state.snapshot();
341 let transition_count = self.state.transition_count();
342
343 let mut results: Vec<ActionResult> = Vec::new();
344 let mut aborted = false;
345 let mut budget_exceeded = false;
346 let mut total_retries: u32 = 0;
347
348 let mut running_tool_calls: u32 = 0;
350 let mut running_actions: u32 = 0;
351 let mut running_duration_ms: f64 = 0.0;
352
353 let budget = self.cost_budget.read().await.clone();
355
356 let levels = build_dag(&proposal.actions);
358
359 for level in &levels {
360 if aborted || budget_exceeded {
361 let skip_reason = if budget_exceeded {
362 "cost budget exceeded"
363 } else {
364 "skipped due to earlier abort"
365 };
366 for &idx in level {
367 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
368 }
369 continue;
370 }
371
372 let has_abort = level
374 .iter()
375 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
376
377 if level.len() == 1 || has_abort {
378 for &idx in level {
380 if aborted || budget_exceeded {
381 let skip_reason = if budget_exceeded {
382 "cost budget exceeded"
383 } else {
384 "skipped due to abort"
385 };
386 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
387 continue;
388 }
389
390 if let Some(ref b) = budget {
392 if let Some(max) = b.max_actions {
393 if running_actions >= max {
394 budget_exceeded = true;
395 results.push(skipped_result(&proposal.actions[idx].id, "cost budget exceeded"));
396 continue;
397 }
398 }
399 if let Some(max) = b.max_tool_calls {
400 if proposal.actions[idx].action_type == ActionType::ToolCall
401 && running_tool_calls >= max
402 {
403 budget_exceeded = true;
404 results.push(skipped_result(&proposal.actions[idx].id, "cost budget exceeded"));
405 continue;
406 }
407 }
408 if let Some(max) = b.max_duration_ms {
409 if running_duration_ms >= max {
410 budget_exceeded = true;
411 results.push(skipped_result(&proposal.actions[idx].id, "cost budget exceeded"));
412 continue;
413 }
414 }
415 }
416
417 let (ar, action_retries) = self.process_action(&proposal.actions[idx], &proposal.id, &trace_id, &root_span_id).await;
418 total_retries += action_retries;
419
420 if ar.status == ActionStatus::Succeeded
422 && proposal.actions[idx].action_type == ActionType::ToolCall
423 {
424 running_tool_calls += 1;
425 }
426 if ar.status != ActionStatus::Skipped {
427 running_actions += 1;
428 }
429 if let Some(d) = ar.duration_ms {
430 running_duration_ms += d;
431 }
432
433 if ar.status == ActionStatus::Failed
434 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
435 {
436 aborted = true;
437 }
438 results.push(ar);
439 }
440 } else {
441 let futs: Vec<_> = level.iter()
443 .map(|&idx| self.process_action(&proposal.actions[idx], &proposal.id, &trace_id, &root_span_id))
444 .collect();
445 let level_results = futures::future::join_all(futs).await;
446
447 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
448 let idx = level[i];
449 total_retries += action_retries;
450 if ar.status == ActionStatus::Succeeded
451 && proposal.actions[idx].action_type == ActionType::ToolCall
452 {
453 running_tool_calls += 1;
454 }
455 if ar.status != ActionStatus::Skipped {
456 running_actions += 1;
457 }
458 if let Some(d) = ar.duration_ms {
459 running_duration_ms += d;
460 }
461 results.push(ar);
462 }
463 }
464 }
465
466 if aborted {
468 self.state.restore(snapshot.clone(), transition_count);
469
470 let mut log = self.log.lock().await;
471 log.append(
472 EventKind::StateSnapshot,
473 None,
474 Some(&proposal.id),
475 [("state".to_string(), serde_json::to_value(&snapshot).unwrap_or_default())].into(),
476 );
477 log.append(
478 EventKind::StateRollback,
479 None,
480 Some(&proposal.id),
481 [("rolled_back_to".to_string(), Value::from("pre-proposal snapshot"))].into(),
482 );
483
484 let mut cache = self.idempotency_cache.lock().await;
486 for r in &results {
487 if r.status == ActionStatus::Succeeded {
488 for action in &proposal.actions {
489 if action.id == r.action_id && action.idempotent {
490 cache.remove(&idempotency_key(action));
491 }
492 }
493 }
494 }
495 }
496
497 let action_order: HashMap<String, usize> = proposal
499 .actions
500 .iter()
501 .enumerate()
502 .map(|(i, a)| (a.id.clone(), i))
503 .collect();
504 results.sort_by_key(|r| action_order.get(&r.action_id).copied().unwrap_or(usize::MAX));
505
506 let mut cost = CostSummary::default();
508 for r in &results {
509 let action = action_order
510 .get(&r.action_id)
511 .and_then(|&i| proposal.actions.get(i));
512 match r.status {
513 ActionStatus::Succeeded => {
514 cost.actions_executed += 1;
515 if let Some(a) = action {
516 if a.action_type == ActionType::ToolCall {
517 cost.tool_calls += 1;
518 }
519 }
520 }
521 ActionStatus::Failed | ActionStatus::Rejected => {
522 cost.actions_executed += 1;
523 }
524 ActionStatus::Skipped => {
525 cost.actions_skipped += 1;
526 }
527 _ => {}
528 }
529 if let Some(d) = r.duration_ms {
530 cost.total_duration_ms += d;
531 }
532 }
533
534 cost.retries = total_retries;
536
537 {
539 let span_status = if aborted {
540 SpanStatus::Error
541 } else {
542 SpanStatus::Ok
543 };
544 let mut log = self.log.lock().await;
545 log.end_span(&root_span_id, span_status);
546 }
547
548 let proposal_result = ProposalResult {
549 proposal_id: proposal.id.clone(),
550 results,
551 cost,
552 };
553
554 if self.auto_distill {
556 if let Some(ref memgine) = self.memgine {
557 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result.results.iter()
559 .map(|r| {
560 let kind = match r.status {
561 ActionStatus::Succeeded => "action_succeeded",
562 ActionStatus::Failed => "action_failed",
563 ActionStatus::Rejected => "action_rejected",
564 ActionStatus::Skipped => "action_skipped",
565 _ => "unknown",
566 };
567 let tool = proposal.actions.iter()
569 .find(|a| a.id == r.action_id)
570 .and_then(|a| a.tool.clone());
571 let mut data = serde_json::Map::new();
572 if let Some(ref e) = r.error {
573 data.insert("error".into(), Value::from(e.as_str()));
574 }
575 if let Some(ref o) = r.output {
576 data.insert("output".into(), o.clone());
577 }
578 car_memgine::TraceEvent {
579 kind: kind.to_string(),
580 action_id: Some(r.action_id.clone()),
581 tool,
582 data: Value::Object(data),
583 }
584 })
585 .collect();
586
587 let mut engine = memgine.lock().await;
588 let skills = engine.distill_skills(&trace_events).await;
589 if !skills.is_empty() {
590 let count = skills.len();
591 engine.ingest_distilled_skills(&skills);
592
593 let mut log = self.log.lock().await;
595 log.append(
596 EventKind::SkillDistilled,
597 None,
598 Some(&proposal_result.proposal_id),
599 [
600 ("skills_count".to_string(), Value::from(count)),
601 ("skill_names".to_string(), Value::from(
602 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>()
603 )),
604 ].into(),
605 );
606
607 let threshold = 0.6; let domains = engine.domains_needing_evolution(threshold);
610 for domain in &domains {
611 let failed: Vec<car_memgine::TraceEvent> = trace_events.iter()
613 .filter(|e| matches!(e.kind.as_str(), "action_failed" | "action_rejected"))
614 .cloned()
615 .collect();
616 if !failed.is_empty() {
617 let evolved = engine.evolve_skills(&failed, domain).await;
618 if !evolved.is_empty() {
619 log.append(
620 EventKind::EvolutionTriggered,
621 None,
622 Some(&proposal_result.proposal_id),
623 [
624 ("domain".to_string(), Value::from(domain.as_str())),
625 ("new_skills".to_string(), Value::from(evolved.len())),
626 ].into(),
627 );
628 }
629 }
630 }
631 }
632 }
633 }
634
635 proposal_result
636 }
637
638 async fn process_action(
641 &self,
642 action: &Action,
643 proposal_id: &str,
644 trace_id: &str,
645 parent_span_id: &str,
646 ) -> (ActionResult, u32) {
647 let action_type_name = serde_json::to_string(&action.action_type)
649 .unwrap_or_default()
650 .trim_matches('"')
651 .to_string();
652 let span_name = format!("action.{}", action_type_name);
653
654 let action_span_id = {
656 let mut attrs: HashMap<String, Value> = HashMap::new();
657 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
658 if let Some(ref tool) = action.tool {
659 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
660 }
661 let mut log = self.log.lock().await;
662 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
663 };
664
665 let (result, retries) = self.process_action_inner(action, proposal_id).await;
667
668 let span_status = match result.status {
670 ActionStatus::Succeeded => SpanStatus::Ok,
671 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
672 _ => SpanStatus::Unset,
673 };
674 {
675 let mut log = self.log.lock().await;
676 log.end_span(&action_span_id, span_status);
677 }
678
679 (result, retries)
680 }
681
682 async fn process_action_inner(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
685 if action.idempotent {
687 let key = idempotency_key(action);
688 let cache = self.idempotency_cache.lock().await;
689 if let Some(cached) = cache.get(&key) {
690 let mut log = self.log.lock().await;
691 log.append(
692 EventKind::ActionDeduplicated,
693 Some(&action.id),
694 Some(proposal_id),
695 [("cached_action_id".to_string(), Value::from(cached.action_id.as_str()))].into(),
696 );
697 return (ActionResult {
698 action_id: action.id.clone(),
699 status: cached.status.clone(),
700 output: cached.output.clone(),
701 error: cached.error.clone(),
702 state_changes: cached.state_changes.clone(),
703 duration_ms: Some(0.0),
704 timestamp: chrono::Utc::now(),
705 }, 0);
706 }
707 }
708
709 {
711 let caps = self.capabilities.read().await;
712 if let Some(ref cap) = *caps {
713 if action.action_type == ActionType::ToolCall {
715 if let Some(ref tool_name) = action.tool {
716 if !cap.tool_allowed(tool_name) {
717 let mut log = self.log.lock().await;
718 log.append(
719 EventKind::ActionRejected,
720 Some(&action.id),
721 Some(proposal_id),
722 HashMap::new(),
723 );
724 return (rejected_result(
725 &action.id,
726 format!("capability denied: tool '{}' not allowed", tool_name),
727 ), 0);
728 }
729 }
730 }
731
732 if action.action_type == ActionType::StateWrite
734 || action.action_type == ActionType::StateRead
735 {
736 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
737 if !cap.state_key_allowed(key) {
738 let mut log = self.log.lock().await;
739 log.append(
740 EventKind::ActionRejected,
741 Some(&action.id),
742 Some(proposal_id),
743 HashMap::new(),
744 );
745 return (rejected_result(
746 &action.id,
747 format!("capability denied: state key '{}' not allowed", key),
748 ), 0);
749 }
750 }
751 }
752 }
753 }
754
755 let tools = self.tools.read().await;
757 let validation = validate_action(action, &self.state, &tools);
758 drop(tools);
759
760 if !validation.valid() {
761 let error = validation
762 .errors
763 .iter()
764 .map(|e| e.reason.as_str())
765 .collect::<Vec<_>>()
766 .join("; ");
767 let mut log = self.log.lock().await;
768 log.append(
769 EventKind::ActionRejected,
770 Some(&action.id),
771 Some(proposal_id),
772 HashMap::new(),
773 );
774 return (rejected_result(&action.id, error), 0);
775 }
776
777 {
779 let policies = self.policies.read().await;
780 let violations = policies.check(action, &self.state);
781 if !violations.is_empty() {
782 let error = violations
783 .iter()
784 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
785 .collect::<Vec<_>>()
786 .join("; ");
787 let mut log = self.log.lock().await;
788 log.append(
789 EventKind::PolicyViolation,
790 Some(&action.id),
791 Some(proposal_id),
792 HashMap::new(),
793 );
794 return (rejected_result(&action.id, error), 0);
795 }
796 }
797
798 {
800 let mut log = self.log.lock().await;
801 log.append(
802 EventKind::ActionValidated,
803 Some(&action.id),
804 Some(proposal_id),
805 HashMap::new(),
806 );
807 }
808
809 let (result, retries) = self.execute_with_retry(action, proposal_id).await;
811
812 if action.idempotent && result.status == ActionStatus::Succeeded {
814 let mut cache = self.idempotency_cache.lock().await;
815 cache.insert(idempotency_key(action), result.clone());
816 }
817
818 (result, retries)
819 }
820
821 async fn execute_with_retry(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
824 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
825 action.max_retries + 1
826 } else {
827 1
828 };
829
830 let mut last_error: Option<String> = None;
831 let mut retries: u32 = 0;
832
833 for attempt in 0..max_attempts {
834 if attempt > 0 {
835 retries += 1;
836 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
837 tokio::time::sleep(Duration::from_millis(delay)).await;
838 let mut log = self.log.lock().await;
839 log.append(
840 EventKind::ActionRetrying,
841 Some(&action.id),
842 Some(proposal_id),
843 [("attempt".to_string(), Value::from(attempt + 1))].into(),
844 );
845 }
846
847 {
848 let mut log = self.log.lock().await;
849 log.append(
850 EventKind::ActionExecuting,
851 Some(&action.id),
852 Some(proposal_id),
853 HashMap::new(),
854 );
855 }
856
857 let start = std::time::Instant::now();
858 let transitions_before = self.state.transition_count();
859
860 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
862 match timeout(
863 Duration::from_millis(timeout_ms),
864 self.dispatch(action),
865 )
866 .await
867 {
868 Ok(r) => r,
869 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
870 }
871 } else {
872 self.dispatch(action).await
873 };
874
875 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
876
877 match exec_result {
878 Ok(output) => {
879 let mut state_changes: HashMap<String, Value> = HashMap::new();
881 for (key, value) in &action.expected_effects {
882 self.state.set(key, value.clone(), &action.id);
883 state_changes.insert(key.clone(), value.clone());
884 }
885
886 for t in self.state.transitions_since(transitions_before) {
888 if !state_changes.contains_key(&t.key) {
889 if let Some(v) = t.new_value {
890 state_changes.insert(t.key.clone(), v);
891 }
892 }
893 }
894
895 let mut log = self.log.lock().await;
896 log.append(
897 EventKind::ActionSucceeded,
898 Some(&action.id),
899 Some(proposal_id),
900 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
901 );
902
903 if !state_changes.is_empty() {
904 log.append(
905 EventKind::StateChanged,
906 Some(&action.id),
907 Some(proposal_id),
908 [("changes".to_string(), serde_json::to_value(&state_changes).unwrap_or_default())].into(),
909 );
910 }
911
912 return (ActionResult {
913 action_id: action.id.clone(),
914 status: ActionStatus::Succeeded,
915 output: Some(output),
916 error: None,
917 state_changes,
918 duration_ms: Some(duration_ms),
919 timestamp: chrono::Utc::now(),
920 }, retries);
921 }
922 Err(e) => {
923 last_error = Some(e.clone());
924 let mut log = self.log.lock().await;
925 log.append(
926 EventKind::ActionFailed,
927 Some(&action.id),
928 Some(proposal_id),
929 [
930 ("error".to_string(), Value::from(e.as_str())),
931 ("attempt".to_string(), Value::from(attempt + 1)),
932 ]
933 .into(),
934 );
935 }
936 }
937 }
938
939 if action.failure_behavior == FailureBehavior::Skip {
941 return (skipped_result(
942 &action.id,
943 last_error.as_deref().unwrap_or("all attempts exhausted"),
944 ), retries);
945 }
946
947 (ActionResult {
948 action_id: action.id.clone(),
949 status: ActionStatus::Failed,
950 output: None,
951 error: last_error,
952 state_changes: HashMap::new(),
953 duration_ms: None,
954 timestamp: chrono::Utc::now(),
955 }, retries)
956 }
957
958 async fn dispatch(&self, action: &Action) -> Result<Value, String> {
960 match action.action_type {
961 ActionType::ToolCall => {
962 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
963 let params = Value::Object(
964 action
965 .parameters
966 .iter()
967 .map(|(k, v)| (k.clone(), v.clone()))
968 .collect(),
969 );
970
971 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
973 return Ok(cached);
974 }
975
976 self.rate_limiter.acquire(tool_name).await;
978
979 if matches!(tool_name, "infer" | "infer.grounded" | "embed" | "classify") {
981 if let Some(ref engine) = self.inference_engine {
982 let params = {
985 let should_ground = tool_name == "infer.grounded"
986 || tool_name == "infer";
987 if should_ground {
988 if let Some(ref memgine) = self.memgine {
989 if let Some(prompt) = params.get("prompt").and_then(|v| v.as_str()) {
990 let ctx = { let mut m = memgine.lock().await; m.build_context(prompt) };
991 if !ctx.is_empty() {
992 let mut p = params.clone();
993 if let Some(obj) = p.as_object_mut() {
994 obj.insert("context".to_string(), Value::from(ctx));
995 }
996 p
997 } else {
998 params
999 }
1000 } else {
1001 params
1002 }
1003 } else {
1004 params
1005 }
1006 } else {
1007 params
1008 }
1009 };
1010
1011 let effective_tool = if tool_name == "infer.grounded" { "infer" } else { tool_name };
1013 let result = car_inference::service::execute_tool(engine, effective_tool, ¶ms)
1014 .await
1015 .map_err(|e| e.to_string());
1016
1017 if let Ok(ref value) = result {
1018 self.result_cache
1019 .put(tool_name, ¶ms, value.clone())
1020 .await;
1021 }
1022
1023 return result;
1024 }
1025 }
1026
1027 let executor = {
1028 let guard = self.tool_executor.lock().await;
1029 guard.as_ref().ok_or("no tool executor configured")?.clone()
1030 };
1031 let result = executor.execute(tool_name, ¶ms).await;
1032
1033 if let Ok(ref value) = result {
1035 self.result_cache
1036 .put(tool_name, ¶ms, value.clone())
1037 .await;
1038 }
1039
1040 result
1041 }
1042 ActionType::StateWrite => {
1043 let key = action
1044 .parameters
1045 .get("key")
1046 .and_then(|v| v.as_str())
1047 .ok_or("state_write requires 'key' parameter")?;
1048 let value = action
1049 .parameters
1050 .get("value")
1051 .cloned()
1052 .unwrap_or(Value::Null);
1053 self.state.set(key, value, &action.id);
1054 Ok(Value::from(format!("written: {}", key)))
1055 }
1056 ActionType::StateRead => {
1057 let key = action
1058 .parameters
1059 .get("key")
1060 .and_then(|v| v.as_str())
1061 .ok_or("state_read requires 'key' parameter")?;
1062 Ok(self.state.get(key).unwrap_or(Value::Null))
1063 }
1064 ActionType::Assertion => {
1065 let key = action
1066 .parameters
1067 .get("key")
1068 .and_then(|v| v.as_str())
1069 .ok_or("assertion requires 'key' parameter")?;
1070 let expected = action.parameters.get("expected").cloned().unwrap_or(Value::Null);
1071 let actual = self.state.get(key).unwrap_or(Value::Null);
1072 if actual != expected {
1073 Err(format!(
1074 "assertion failed: state['{}'] = {:?}, expected {:?}",
1075 key, actual, expected
1076 ))
1077 } else {
1078 Ok(serde_json::json!({"asserted": key, "value": actual}))
1079 }
1080 }
1081 }
1082 }
1083
1084 pub async fn save_checkpoint(&self) -> Checkpoint {
1088 let state = self.state.snapshot();
1089 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
1090 let log = self.log.lock().await;
1091 let events: Vec<Value> = log
1092 .events()
1093 .iter()
1094 .map(|e| serde_json::to_value(e).unwrap_or_default())
1095 .collect();
1096
1097 Checkpoint {
1098 checkpoint_id: Uuid::new_v4().to_string(),
1099 created_at: chrono::Utc::now(),
1100 state,
1101 events,
1102 tools,
1103 metadata: HashMap::new(),
1104 }
1105 }
1106
1107 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
1109 let checkpoint = self.save_checkpoint().await;
1110 let json = serde_json::to_string_pretty(&checkpoint)
1111 .map_err(|e| format!("serialize error: {}", e))?;
1112 tokio::fs::write(path, json).await.map_err(|e| format!("write error: {}", e))?;
1113 Ok(())
1114 }
1115
1116 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
1118 let json =
1119 tokio::fs::read_to_string(path).await.map_err(|e| format!("read error: {}", e))?;
1120 let checkpoint: Checkpoint =
1121 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
1122 self.restore_checkpoint(&checkpoint).await;
1123 Ok(checkpoint)
1124 }
1125
1126 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
1128 for (key, value) in &checkpoint.state {
1130 self.state.set(key, value.clone(), "checkpoint_restore");
1131 }
1132 let mut tools = self.tools.write().await;
1134 tools.clear();
1135 for tool_name in &checkpoint.tools {
1136 let schema = ToolSchema {
1137 name: tool_name.clone(),
1138 description: String::new(),
1139 parameters: serde_json::Value::Object(Default::default()),
1140 returns: None,
1141 idempotent: false,
1142 cache_ttl_secs: None,
1143 rate_limit: None,
1144 };
1145 tools.insert(tool_name.clone(), schema);
1146 }
1147 }
1148}
1149
1150impl Default for Runtime {
1151 fn default() -> Self {
1152 Self::new()
1153 }
1154}