1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base delay, in milliseconds, for retry backoff.
/// NOTE(review): consumed by the retry logic outside this view (presumably
/// `process_action`) — confirm the exact schedule there.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier applied to the retry delay between attempts (presumably
/// exponential backoff — confirm against the retry loop).
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
/// Supplies a replacement proposal after a run produced `Failed` actions.
///
/// The runtime's replan loop calls this with a [`ReplanContext`] describing
/// the failure. Returning `Err` ends replanning, and the last execution result
/// is returned to the caller as-is.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Returns a new proposal to execute in place of the one that failed.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Everything a [`ReplanCallback`] is told about a failed run.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Id of the ORIGINAL proposal, even on later replan attempts.
    pub proposal_id: String,
    /// 1-based number of the replan attempt being requested.
    pub attempt: u32,
    /// Actions whose status was `Failed` in the run that triggered replanning.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Ids of actions that `Succeeded` in that same run.
    pub completed_action_ids: Vec<String>,
    /// Full state-store snapshot taken when the context was built.
    pub state_snapshot: HashMap<String, Value>,
    /// Replans still allowed after this one (saturates at 0).
    pub replans_remaining: u32,
    /// `source` of the original proposal.
    pub original_source: String,
    /// Number of actions in the original proposal.
    pub original_action_count: usize,
    /// `context` map of the original proposal.
    pub original_context: HashMap<String, Value>,
}
62
/// Compact description of one failed action, passed to the replan callback.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the failed action.
    pub action_id: String,
    /// Tool the action invoked, if it had one (and could be found in the proposal).
    pub tool: Option<String>,
    /// Error text from the action result; empty if none was recorded.
    pub error: String,
    /// The action's parameters; empty if the action could not be found.
    pub parameters: HashMap<String, Value>,
}
71
/// Controls automatic replanning after a run produces `Failed` actions.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts; `0` disables replanning.
    pub max_replans: u32,
    /// Delay in milliseconds before each replan request (`0` = no delay).
    pub delay_ms: u64,
    /// When `true`, replacement proposals are checked with `car_verify` before
    /// execution. A proposal that fails the check is rejected and still counts
    /// as an attempt.
    pub verify_before_execute: bool,
}

impl Default for ReplanConfig {
    /// Replanning disabled, no delay, verification enabled.
    fn default() -> Self {
        ReplanConfig {
            verify_before_execute: true,
            max_replans: 0,
            delay_ms: 0,
        }
    }
}
95
/// Pluggable backend that actually runs tool calls for the runtime.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Invokes `tool` with `params` and returns its JSON output, or an error
    /// message on failure.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
}
104
105fn idempotency_key(action: &Action) -> String {
107 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
108 let params = serde_json::to_string(&sorted).unwrap_or_default();
109 format!(
110 "{}:{}:{}",
111 serde_json::to_string(&action.action_type).unwrap_or_default(),
112 action.tool.as_deref().unwrap_or(""),
113 params
114 )
115}
116
117fn rejected_result(action_id: &str, error: String) -> ActionResult {
118 ActionResult {
119 action_id: action_id.to_string(),
120 status: ActionStatus::Rejected,
121 output: None,
122 error: Some(error),
123 state_changes: HashMap::new(),
124 duration_ms: None,
125 timestamp: chrono::Utc::now(),
126 }
127}
128
129fn snapshot_relevant_keys(
133 state: &car_state::StateStore,
134 action: &Action,
135) -> HashMap<String, Value> {
136 let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
137 for dep in &action.state_dependencies {
138 keys.insert(dep.as_str());
139 }
140 for key in action.expected_effects.keys() {
141 keys.insert(key.as_str());
142 }
143 if action.action_type == ActionType::StateWrite {
145 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
146 keys.insert(key);
147 }
148 }
149
150 if keys.is_empty() {
151 return HashMap::new();
153 }
154
155 keys.iter()
156 .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
157 .collect()
158}
159
160fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
161 ActionResult {
162 action_id: action_id.to_string(),
163 status: ActionStatus::Skipped,
164 output: None,
165 error: Some(reason.to_string()),
166 state_changes: HashMap::new(),
167 duration_ms: None,
168 timestamp: chrono::Utc::now(),
169 }
170}
171
/// Prefix placed on the `error` text of actions skipped because the caller's
/// cancellation token fired. It distinguishes them from other `Skipped`
/// results, which share the same status.
pub const CANCELED_PREFIX: &str = "canceled: ";
179
180fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
185 ActionResult {
186 action_id: action_id.to_string(),
187 status: ActionStatus::Skipped,
188 output: None,
189 error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
190 state_changes: HashMap::new(),
191 duration_ms: None,
192 timestamp: chrono::Utc::now(),
193 }
194}
195
196pub fn format_tool_result(result: &ActionResult) -> String {
198 match result.status {
199 ActionStatus::Succeeded => match &result.output {
200 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
201 None => String::new(),
202 },
203 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
204 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
205 _ => format!(
206 "[{:?}] {}",
207 result.status,
208 result.error.as_deref().unwrap_or("")
209 ),
210 }
211}
212
/// Optional per-proposal cost limits, checked by the runtime before starting actions.
/// Once a limit is hit, the remaining actions are skipped with
/// "cost budget exceeded".
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Maximum number of successful tool-call actions.
    pub max_tool_calls: Option<u32>,
    /// Maximum summed `duration_ms` of the actions executed so far.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions that were processed (any non-`Skipped` outcome).
    pub max_actions: Option<u32>,
}
220
/// Executes [`ActionProposal`]s against a shared state store.
///
/// Along the way it applies capability limits, cost budgets, rate limits and
/// result caching. It can also replan failed proposals and feed outcomes into
/// a memgine and/or trajectory store.
pub struct Runtime {
    /// State that actions read and write; snapshotted per proposal and restored on abort.
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Global policy engine (may be shared with other runtimes via `with_shared`).
    /// NOTE(review): where it is consulted is outside this view (presumably `process_action`).
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Per-session policy engines keyed by session id (see `open_session`).
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log receiving spans plus execution and replan events.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits (from `ToolSchema::rate_limit` or `set_rate_limit`).
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool result cache (enabled via `cache_ttl_secs` or `enable_tool_cache`).
    pub result_cache: Arc<ResultCache>,
    /// Backend that runs tool calls; `None` until `with_executor` / `set_executor`.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Action results keyed by `idempotency_key`. Entries for idempotent actions
    /// that succeeded in an aborted proposal are evicted on rollback.
    /// NOTE(review): presumably populated in `process_action` — confirm.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional cost limits applied while executing a proposal.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability set; its action-count limit is checked before execution.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Optional inference engine; `with_inference` also registers its tool schemas.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine used for skill distillation when `auto_distill` is set.
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether to distill skills from every proposal's results into `memgine`.
    auto_distill: bool,
    /// Optional store receiving one trajectory per top-level execution.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Callback asked for a replacement proposal when actions fail.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Replanning limits and options.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Registry of full tool entries added via `register_tool_entry`.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
270
271impl Runtime {
272 pub fn new() -> Self {
273 Self {
274 state: Arc::new(StateStore::new()),
275 tools: Arc::new(TokioRwLock::new(HashMap::new())),
276 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
277 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
278 log: Arc::new(TokioMutex::new(EventLog::new())),
279 rate_limiter: Arc::new(RateLimiter::new()),
280 result_cache: Arc::new(ResultCache::new()),
281 tool_executor: TokioMutex::new(None),
282 idempotency_cache: TokioMutex::new(HashMap::new()),
283 cost_budget: TokioRwLock::new(None),
284 capabilities: TokioRwLock::new(None),
285 inference_engine: None,
286 memgine: None,
287 auto_distill: false,
288 trajectory_store: None,
289 replan_callback: TokioMutex::new(None),
290 replan_config: TokioRwLock::new(ReplanConfig::default()),
291 registry: Arc::new(crate::registry::ToolRegistry::new()),
292 }
293 }
294
295 pub fn with_shared(
298 state: Arc<StateStore>,
299 log: Arc<TokioMutex<EventLog>>,
300 policies: Arc<TokioRwLock<PolicyEngine>>,
301 ) -> Self {
302 Self {
303 state,
304 tools: Arc::new(TokioRwLock::new(HashMap::new())),
305 policies,
306 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
311 log,
312 rate_limiter: Arc::new(RateLimiter::new()),
313 result_cache: Arc::new(ResultCache::new()),
314 tool_executor: TokioMutex::new(None),
315 idempotency_cache: TokioMutex::new(HashMap::new()),
316 cost_budget: TokioRwLock::new(None),
317 capabilities: TokioRwLock::new(None),
318 inference_engine: None,
319 memgine: None,
320 auto_distill: false,
321 trajectory_store: None,
322 replan_callback: TokioMutex::new(None),
323 replan_config: TokioRwLock::new(ReplanConfig::default()),
324 registry: Arc::new(crate::registry::ToolRegistry::new()),
325 }
326 }
327
328 pub async fn open_session(&self) -> String {
346 let id = Uuid::new_v4().to_string();
347 let mut sessions = self.session_policies.write().await;
348 sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
349 id
350 }
351
352 pub async fn close_session(&self, session_id: &str) -> bool {
357 let mut sessions = self.session_policies.write().await;
358 sessions.remove(session_id).is_some()
359 }
360
361 pub async fn register_policy_in_session(
370 &self,
371 session_id: &str,
372 name: &str,
373 check: car_policy::PolicyCheck,
374 description: &str,
375 ) -> Result<(), String> {
376 let engine = {
377 let sessions = self.session_policies.read().await;
378 sessions
379 .get(session_id)
380 .cloned()
381 .ok_or_else(|| format!("unknown session id '{session_id}'"))?
382 };
383 let mut engine = engine.write().await;
384 engine.register(name, check, description);
385 Ok(())
386 }
387
388 pub async fn session_exists(&self, session_id: &str) -> bool {
392 self.session_policies.read().await.contains_key(session_id)
393 }
394
395 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
398 self.inference_engine = Some(engine);
399 if let Ok(mut tools) = self.tools.try_write() {
401 for schema in car_inference::service::all_schemas() {
402 tools.insert(schema.name.clone(), schema);
403 }
404 }
405 self
406 }
407
408 pub fn with_learning(
412 mut self,
413 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
414 auto_distill: bool,
415 ) -> Self {
416 self.memgine = Some(memgine);
417 self.auto_distill = auto_distill;
418 self
419 }
420
421 pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
423 self.with_learning(memgine, true)
424 }
425
426 pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
428 self.trajectory_store = Some(store);
429 self
430 }
431
432 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
433 if let Ok(mut guard) = self.tool_executor.try_lock() {
435 *guard = Some(executor);
436 }
437 self
438 }
439
440 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
443 *self.tool_executor.lock().await = Some(executor);
444 }
445
446 pub fn with_event_log(mut self, log: EventLog) -> Self {
447 self.log = Arc::new(TokioMutex::new(log));
448 self
449 }
450
451 pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
453 if let Ok(mut guard) = self.replan_callback.try_lock() {
454 *guard = Some(callback);
455 }
456 if let Ok(mut guard) = self.replan_config.try_write() {
457 *guard = config;
458 }
459 self
460 }
461
462 pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
464 *self.replan_callback.lock().await = Some(callback);
465 }
466
467 pub async fn set_replan_config(&self, config: ReplanConfig) {
469 *self.replan_config.write().await = config;
470 }
471
472 pub async fn register_tool(&self, name: &str) {
474 let schema = ToolSchema {
475 name: name.to_string(),
476 description: String::new(),
477 parameters: serde_json::Value::Object(Default::default()),
478 returns: None,
479 idempotent: false,
480 cache_ttl_secs: None,
481 rate_limit: None,
482 };
483 self.register_tool_schema(schema).await;
484 }
485
486 pub async fn register_tool_schema(&self, schema: ToolSchema) {
488 if let Some(ttl) = schema.cache_ttl_secs {
490 self.result_cache.enable_caching(&schema.name, ttl).await;
491 }
492 if let Some(ref rl) = schema.rate_limit {
494 self.rate_limiter
495 .set_limit(
496 &schema.name,
497 RateLimit {
498 max_calls: rl.max_calls,
499 interval_secs: rl.interval_secs,
500 },
501 )
502 .await;
503 }
504 self.tools.write().await.insert(schema.name.clone(), schema);
505 }
506
507 pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
511 let schema = entry.schema.clone();
512 self.registry.register(entry).await;
513 self.register_tool_schema(schema).await;
514 }
515
516 pub async fn register_agent_basics(&self) {
521 for entry in crate::agent_basics::entries() {
522 self.register_tool_entry(entry).await;
523 }
524 }
525
526 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
528 self.tools.read().await.values().cloned().collect()
529 }
530
531 pub async fn set_cost_budget(&self, budget: CostBudget) {
533 *self.cost_budget.write().await = Some(budget);
534 }
535
536 pub async fn set_capabilities(&self, caps: CapabilitySet) {
538 *self.capabilities.write().await = Some(caps);
539 }
540
541 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
547 self.rate_limiter
548 .set_limit(
549 tool,
550 RateLimit {
551 max_calls,
552 interval_secs,
553 },
554 )
555 .await;
556 }
557
558 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
560 self.result_cache.enable_caching(tool, ttl_secs).await;
561 }
562
563 #[instrument(
573 name = "proposal.execute",
574 skip_all,
575 fields(
576 proposal_id = %proposal.id,
577 action_count = proposal.actions.len(),
578 )
579 )]
580 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
581 let token = tokio_util::sync::CancellationToken::new();
584 self.execute_with_cancel(proposal, &token).await
585 }
586
587 pub async fn execute_with_session(
599 &self,
600 proposal: &ActionProposal,
601 session_id: &str,
602 ) -> ProposalResult {
603 let token = tokio_util::sync::CancellationToken::new();
604 self.execute_with_session_and_cancel(proposal, session_id, &token).await
605 }
606
607 pub async fn execute_with_session_and_cancel(
611 &self,
612 proposal: &ActionProposal,
613 session_id: &str,
614 cancel: &tokio_util::sync::CancellationToken,
615 ) -> ProposalResult {
616 self.execute_with_optional_session(proposal, Some(session_id), cancel).await
617 }
618
619 pub async fn execute_with_cancel(
647 &self,
648 proposal: &ActionProposal,
649 cancel: &tokio_util::sync::CancellationToken,
650 ) -> ProposalResult {
651 self.execute_with_optional_session(proposal, None, cancel).await
652 }
653
654 async fn execute_with_optional_session(
660 &self,
661 proposal: &ActionProposal,
662 session_id: Option<&str>,
663 cancel: &tokio_util::sync::CancellationToken,
664 ) -> ProposalResult {
665 let config = self.replan_config.read().await.clone();
666 let mut current_proposal = proposal.clone();
667 let mut attempt: u32 = 0;
668
669 loop {
670 let (result, state_before_map) = self
671 .execute_inner_with_cancel(¤t_proposal, Some(cancel), session_id)
672 .await;
673
674 let aborted = result
676 .results
677 .iter()
678 .any(|r| r.status == ActionStatus::Failed);
679 if !aborted || attempt >= config.max_replans {
680 if aborted && attempt > 0 {
681 let mut log = self.log.lock().await;
683 log.append(
684 EventKind::ReplanExhausted,
685 None,
686 Some(&proposal.id),
687 [("attempts".to_string(), Value::from(attempt))].into(),
688 );
689 }
690
691 let outcome = if !aborted {
693 if attempt > 0 {
694 car_memgine::TrajectoryOutcome::ReplanSuccess
695 } else {
696 car_memgine::TrajectoryOutcome::Success
697 }
698 } else if attempt > 0 {
699 car_memgine::TrajectoryOutcome::ReplanExhausted
700 } else {
701 car_memgine::TrajectoryOutcome::Failed
702 };
703 if let Some(err) = self.persist_trajectory(
704 proposal,
705 ¤t_proposal,
706 &result,
707 outcome,
708 attempt,
709 &state_before_map,
710 ) {
711 let mut log = self.log.lock().await;
712 log.append(
713 EventKind::ActionFailed,
714 None,
715 Some(&proposal.id),
716 [(
717 "trajectory_persist_error".to_string(),
718 Value::from(err.as_str()),
719 )]
720 .into(),
721 );
722 }
723
724 return result;
725 }
726
727 let callback = {
729 let guard = self.replan_callback.lock().await;
730 guard.clone()
731 };
732 let Some(callback) = callback else {
733 if let Some(err) = self.persist_trajectory(
735 proposal,
736 ¤t_proposal,
737 &result,
738 car_memgine::TrajectoryOutcome::Failed,
739 attempt,
740 &state_before_map,
741 ) {
742 let mut log = self.log.lock().await;
743 log.append(
744 EventKind::ActionFailed,
745 None,
746 Some(&proposal.id),
747 [(
748 "trajectory_persist_error".to_string(),
749 Value::from(err.as_str()),
750 )]
751 .into(),
752 );
753 }
754 return result;
755 };
756
757 let failed_actions: Vec<FailedActionSummary> = result
759 .results
760 .iter()
761 .filter(|r| r.status == ActionStatus::Failed)
762 .map(|r| {
763 let action = current_proposal
764 .actions
765 .iter()
766 .find(|a| a.id == r.action_id);
767 FailedActionSummary {
768 action_id: r.action_id.clone(),
769 tool: action.and_then(|a| a.tool.clone()),
770 error: r.error.clone().unwrap_or_default(),
771 parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
772 }
773 })
774 .collect();
775
776 let completed_action_ids: Vec<String> = result
777 .results
778 .iter()
779 .filter(|r| r.status == ActionStatus::Succeeded)
780 .map(|r| r.action_id.clone())
781 .collect();
782
783 let ctx = ReplanContext {
784 proposal_id: proposal.id.clone(),
785 attempt: attempt + 1,
786 failed_actions,
787 completed_action_ids,
788 state_snapshot: self.state.snapshot(),
789 replans_remaining: config.max_replans.saturating_sub(attempt + 1),
790 original_source: proposal.source.clone(),
791 original_action_count: proposal.actions.len(),
792 original_context: proposal.context.clone(),
793 };
794
795 if config.delay_ms > 0 {
797 tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
798 }
799
800 {
802 let mut log = self.log.lock().await;
803 log.append(
804 EventKind::ReplanAttempted,
805 None,
806 Some(&proposal.id),
807 [
808 ("attempt".to_string(), Value::from(attempt + 1)),
809 (
810 "failed_count".to_string(),
811 Value::from(ctx.failed_actions.len()),
812 ),
813 ]
814 .into(),
815 );
816 }
817
818 match callback.replan(&ctx).await {
820 Ok(new_proposal) => {
821 if config.verify_before_execute {
823 let tools_guard = self.tools.read().await;
824 let tool_names: std::collections::HashSet<String> =
825 tools_guard.keys().cloned().collect();
826 drop(tools_guard);
827
828 let current_state = self.state.snapshot();
829 let vr = car_verify::verify(
830 &new_proposal,
831 Some(¤t_state),
832 Some(&tool_names),
833 100,
834 );
835 if !vr.valid {
836 let error_msgs: Vec<String> = vr
837 .issues
838 .iter()
839 .filter(|i| i.severity == "error")
840 .map(|i| i.message.clone())
841 .collect();
842 let mut log = self.log.lock().await;
843 log.append(
844 EventKind::ReplanRejected,
845 None,
846 Some(&proposal.id),
847 [
848 ("errors".to_string(), Value::from(error_msgs.join("; "))),
849 ("attempt".to_string(), Value::from(attempt + 1)),
850 ]
851 .into(),
852 );
853 attempt += 1;
855 continue;
856 }
857 }
858
859 {
861 let mut log = self.log.lock().await;
862 log.append(
863 EventKind::ReplanProposalReceived,
864 None,
865 Some(&proposal.id),
866 [
867 ("attempt".to_string(), Value::from(attempt + 1)),
868 (
869 "new_action_count".to_string(),
870 Value::from(new_proposal.actions.len()),
871 ),
872 ]
873 .into(),
874 );
875 }
876 current_proposal = new_proposal;
877 attempt += 1;
878 }
879 Err(e) => {
880 let mut log = self.log.lock().await;
882 log.append(
883 EventKind::ReplanExhausted,
884 None,
885 Some(&proposal.id),
886 [
887 ("reason".to_string(), Value::from("callback_error")),
888 ("error".to_string(), Value::from(e.as_str())),
889 ("attempt".to_string(), Value::from(attempt + 1)),
890 ]
891 .into(),
892 );
893 if let Some(err) = self.persist_trajectory(
894 proposal,
895 ¤t_proposal,
896 &result,
897 car_memgine::TrajectoryOutcome::Failed,
898 attempt,
899 &state_before_map,
900 ) {
901 log.append(
902 EventKind::ActionFailed,
903 None,
904 Some(&proposal.id),
905 [(
906 "trajectory_persist_error".to_string(),
907 Value::from(err.as_str()),
908 )]
909 .into(),
910 );
911 }
912 return result;
913 }
914 }
915 }
916 }
917
918 fn persist_trajectory(
920 &self,
921 proposal: &ActionProposal,
922 current_proposal: &ActionProposal,
923 result: &ProposalResult,
924 outcome: car_memgine::TrajectoryOutcome,
925 attempt: u32,
926 state_before_map: &HashMap<String, HashMap<String, Value>>,
927 ) -> Option<String> {
928 let store = self.trajectory_store.as_ref()?;
929
930 let trace_events: Vec<car_memgine::TraceEvent> = result
931 .results
932 .iter()
933 .map(|r| {
934 let kind = match r.status {
935 ActionStatus::Succeeded => "action_succeeded",
936 ActionStatus::Failed => "action_failed",
937 ActionStatus::Rejected => "action_rejected",
938 ActionStatus::Skipped => "action_skipped",
939 _ => "unknown",
940 };
941 let tool = current_proposal
942 .actions
943 .iter()
944 .find(|a| a.id == r.action_id)
945 .and_then(|a| a.tool.clone());
946 let reward = match r.status {
947 ActionStatus::Succeeded => Some(1.0),
948 ActionStatus::Failed => Some(0.0),
949 ActionStatus::Rejected => Some(0.0),
950 ActionStatus::Skipped => None,
951 _ => None,
952 };
953 car_memgine::TraceEvent {
954 kind: kind.to_string(),
955 action_id: Some(r.action_id.clone()),
956 tool,
957 data: r
958 .error
959 .as_ref()
960 .map(|e| serde_json::json!({"error": e}))
961 .unwrap_or(serde_json::json!({})),
962 duration_ms: r.duration_ms,
963 state_before: state_before_map.get(&r.action_id).cloned(),
964 state_after: if !r.state_changes.is_empty() {
965 Some(r.state_changes.clone())
966 } else {
967 None
968 },
969 reward,
970 }
971 })
972 .collect();
973
974 let trajectory = car_memgine::Trajectory {
975 proposal_id: proposal.id.clone(),
976 source: proposal.source.clone(),
977 action_count: current_proposal.actions.len(),
978 events: trace_events,
979 outcome,
980 timestamp: chrono::Utc::now(),
981 duration_ms: result.cost.total_duration_ms,
982 replan_attempts: attempt,
983 };
984
985 match store.append(&trajectory) {
986 Ok(()) => None,
987 Err(e) => Some(e.to_string()),
988 }
989 }
990
991 pub async fn plan_and_execute(
997 &self,
998 candidates: &[ActionProposal],
999 planner_config: Option<car_planner::PlannerConfig>,
1000 feedback: Option<&car_planner::ToolFeedback>,
1001 ) -> ProposalResult {
1002 if candidates.is_empty() {
1003 return ProposalResult {
1004 proposal_id: "empty".to_string(),
1005 results: vec![],
1006 cost: car_ir::CostSummary::default(),
1007 };
1008 }
1009
1010 let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1012 let tools_guard = self.tools.read().await;
1013 let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1014 drop(tools_guard);
1015
1016 let pre_plan_snapshot = self.state.snapshot();
1017 let pre_plan_transitions = self.state.transition_count();
1018 let ranked = planner.rank_with_feedback(
1019 candidates,
1020 Some(&pre_plan_snapshot),
1021 Some(&tool_names),
1022 feedback,
1023 );
1024
1025 let mut first_failure: Option<ProposalResult> = None;
1027 for scored in &ranked {
1028 if !scored.valid {
1029 continue;
1030 }
1031
1032 self.state
1035 .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1036
1037 let proposal = &candidates[scored.index];
1038 let result = self.execute(proposal).await;
1039
1040 if result.all_succeeded() {
1041 return result;
1042 }
1043
1044 tracing::info!(
1045 proposal_id = %proposal.id,
1046 score = scored.score,
1047 "plan_and_execute: proposal failed, trying next candidate"
1048 );
1049
1050 if first_failure.is_none() {
1051 first_failure = Some(result);
1052 }
1053 }
1054
1055 first_failure.unwrap_or_else(|| ProposalResult {
1057 proposal_id: candidates[0].id.clone(),
1058 results: vec![],
1059 cost: car_ir::CostSummary::default(),
1060 })
1061 }
1062
1063 async fn execute_inner_with_cancel(
1071 &self,
1072 proposal: &ActionProposal,
1073 cancel: Option<&tokio_util::sync::CancellationToken>,
1074 session_id: Option<&str>,
1075 ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1076 let trace_id = Uuid::new_v4().to_string();
1078
1079 let root_span_id = {
1081 let mut log = self.log.lock().await;
1082 log.begin_span(
1083 "proposal.execute",
1084 &trace_id,
1085 None,
1086 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1087 )
1088 };
1089
1090 {
1092 let mut log = self.log.lock().await;
1093 log.append(
1094 EventKind::ProposalReceived,
1095 None,
1096 Some(&proposal.id),
1097 [
1098 ("source".to_string(), Value::from(proposal.source.as_str())),
1099 (
1100 "action_count".to_string(),
1101 Value::from(proposal.actions.len()),
1102 ),
1103 ]
1104 .into(),
1105 );
1106 }
1107
1108 {
1110 let caps = self.capabilities.read().await;
1111 if let Some(ref cap) = *caps {
1112 if !cap.actions_within_budget(proposal.actions.len() as u32) {
1113 let mut action_results = Vec::new();
1114 for action in &proposal.actions {
1115 action_results.push(rejected_result(
1116 &action.id,
1117 format!(
1118 "capability denied: proposal has {} actions, max allowed is {:?}",
1119 proposal.actions.len(),
1120 cap.max_actions
1121 ),
1122 ));
1123 }
1124 return (
1125 ProposalResult {
1126 proposal_id: proposal.id.clone(),
1127 results: action_results,
1128 cost: CostSummary::default(),
1129 },
1130 HashMap::new(),
1131 );
1132 }
1133 }
1134 }
1135
1136 let snapshot = self.state.snapshot();
1138 let transition_count = self.state.transition_count();
1139
1140 let mut results: Vec<ActionResult> = Vec::new();
1141 let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1143 let mut aborted = false;
1144 let mut budget_exceeded = false;
1145 let mut total_retries: u32 = 0;
1146
1147 let mut running_tool_calls: u32 = 0;
1149 let mut running_actions: u32 = 0;
1150 let mut running_duration_ms: f64 = 0.0;
1151
1152 let budget = self.cost_budget.read().await.clone();
1154
1155 let levels = build_dag(&proposal.actions);
1157
1158 let mut canceled = false;
1159 for level in &levels {
1160 if !canceled {
1166 if let Some(token) = cancel {
1167 if token.is_cancelled() {
1168 canceled = true;
1169 }
1170 }
1171 }
1172 if canceled {
1173 for &idx in level {
1174 results.push(canceled_result(
1175 &proposal.actions[idx].id,
1176 "cancellation requested by caller",
1177 ));
1178 }
1179 continue;
1180 }
1181 if aborted || budget_exceeded {
1182 let skip_reason = if budget_exceeded {
1183 "cost budget exceeded"
1184 } else {
1185 "skipped due to earlier abort"
1186 };
1187 for &idx in level {
1188 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1189 }
1190 continue;
1191 }
1192
1193 let has_abort = level
1195 .iter()
1196 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1197
1198 if level.len() == 1 || has_abort {
1199 for &idx in level {
1201 if aborted || budget_exceeded {
1202 let skip_reason = if budget_exceeded {
1203 "cost budget exceeded"
1204 } else {
1205 "skipped due to abort"
1206 };
1207 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1208 continue;
1209 }
1210
1211 if let Some(ref b) = budget {
1213 if let Some(max) = b.max_actions {
1214 if running_actions >= max {
1215 budget_exceeded = true;
1216 results.push(skipped_result(
1217 &proposal.actions[idx].id,
1218 "cost budget exceeded",
1219 ));
1220 continue;
1221 }
1222 }
1223 if let Some(max) = b.max_tool_calls {
1224 if proposal.actions[idx].action_type == ActionType::ToolCall
1225 && running_tool_calls >= max
1226 {
1227 budget_exceeded = true;
1228 results.push(skipped_result(
1229 &proposal.actions[idx].id,
1230 "cost budget exceeded",
1231 ));
1232 continue;
1233 }
1234 }
1235 if let Some(max) = b.max_duration_ms {
1236 if running_duration_ms >= max {
1237 budget_exceeded = true;
1238 results.push(skipped_result(
1239 &proposal.actions[idx].id,
1240 "cost budget exceeded",
1241 ));
1242 continue;
1243 }
1244 }
1245 }
1246
1247 state_before_map.insert(
1248 proposal.actions[idx].id.clone(),
1249 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1250 );
1251 let (ar, action_retries) = self
1252 .process_action(
1253 &proposal.actions[idx],
1254 &proposal.id,
1255 &trace_id,
1256 &root_span_id,
1257 session_id,
1258 )
1259 .await;
1260 total_retries += action_retries;
1261
1262 if ar.status == ActionStatus::Succeeded
1264 && proposal.actions[idx].action_type == ActionType::ToolCall
1265 {
1266 running_tool_calls += 1;
1267 }
1268 if ar.status != ActionStatus::Skipped {
1269 running_actions += 1;
1270 }
1271 if let Some(d) = ar.duration_ms {
1272 running_duration_ms += d;
1273 }
1274
1275 if ar.status == ActionStatus::Failed
1276 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1277 {
1278 aborted = true;
1279 }
1280 results.push(ar);
1281 }
1282 } else {
1283 for &idx in level {
1286 state_before_map.insert(
1287 proposal.actions[idx].id.clone(),
1288 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1289 );
1290 }
1291 let futs: Vec<_> = level
1292 .iter()
1293 .map(|&idx| {
1294 self.process_action(
1295 &proposal.actions[idx],
1296 &proposal.id,
1297 &trace_id,
1298 &root_span_id,
1299 session_id,
1300 )
1301 })
1302 .collect();
1303 let level_results = futures::future::join_all(futs).await;
1304
1305 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1306 let idx = level[i];
1307 total_retries += action_retries;
1308 if ar.status == ActionStatus::Succeeded
1309 && proposal.actions[idx].action_type == ActionType::ToolCall
1310 {
1311 running_tool_calls += 1;
1312 }
1313 if ar.status != ActionStatus::Skipped {
1314 running_actions += 1;
1315 }
1316 if let Some(d) = ar.duration_ms {
1317 running_duration_ms += d;
1318 }
1319 results.push(ar);
1320 }
1321 }
1322 }
1323
1324 if aborted {
1326 self.state.restore(snapshot.clone(), transition_count);
1327
1328 let mut log = self.log.lock().await;
1329 log.append(
1330 EventKind::StateSnapshot,
1331 None,
1332 Some(&proposal.id),
1333 [(
1334 "state".to_string(),
1335 serde_json::to_value(&snapshot).unwrap_or_default(),
1336 )]
1337 .into(),
1338 );
1339 log.append(
1340 EventKind::StateRollback,
1341 None,
1342 Some(&proposal.id),
1343 [(
1344 "rolled_back_to".to_string(),
1345 Value::from("pre-proposal snapshot"),
1346 )]
1347 .into(),
1348 );
1349
1350 let mut cache = self.idempotency_cache.lock().await;
1352 for r in &results {
1353 if r.status == ActionStatus::Succeeded {
1354 for action in &proposal.actions {
1355 if action.id == r.action_id && action.idempotent {
1356 cache.remove(&idempotency_key(action));
1357 }
1358 }
1359 }
1360 }
1361 }
1362
1363 let action_order: HashMap<String, usize> = proposal
1365 .actions
1366 .iter()
1367 .enumerate()
1368 .map(|(i, a)| (a.id.clone(), i))
1369 .collect();
1370 results.sort_by_key(|r| {
1371 action_order
1372 .get(&r.action_id)
1373 .copied()
1374 .unwrap_or(usize::MAX)
1375 });
1376
1377 let mut cost = CostSummary::default();
1379 for r in &results {
1380 let action = action_order
1381 .get(&r.action_id)
1382 .and_then(|&i| proposal.actions.get(i));
1383 match r.status {
1384 ActionStatus::Succeeded => {
1385 cost.actions_executed += 1;
1386 if let Some(a) = action {
1387 if a.action_type == ActionType::ToolCall {
1388 cost.tool_calls += 1;
1389 }
1390 }
1391 }
1392 ActionStatus::Failed | ActionStatus::Rejected => {
1393 cost.actions_executed += 1;
1394 }
1395 ActionStatus::Skipped => {
1396 cost.actions_skipped += 1;
1397 }
1398 _ => {}
1399 }
1400 if let Some(d) = r.duration_ms {
1401 cost.total_duration_ms += d;
1402 }
1403 }
1404
1405 cost.retries = total_retries;
1407
1408 {
1410 let span_status = if aborted {
1411 SpanStatus::Error
1412 } else {
1413 SpanStatus::Ok
1414 };
1415 let mut log = self.log.lock().await;
1416 log.end_span(&root_span_id, span_status);
1417 }
1418
1419 let proposal_result = ProposalResult {
1420 proposal_id: proposal.id.clone(),
1421 results,
1422 cost,
1423 };
1424
1425 if self.auto_distill {
1427 if let Some(ref memgine) = self.memgine {
1428 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1430 .results
1431 .iter()
1432 .map(|r| {
1433 let kind = match r.status {
1434 ActionStatus::Succeeded => "action_succeeded",
1435 ActionStatus::Failed => "action_failed",
1436 ActionStatus::Rejected => "action_rejected",
1437 ActionStatus::Skipped => "action_skipped",
1438 _ => "unknown",
1439 };
1440 let tool = proposal
1442 .actions
1443 .iter()
1444 .find(|a| a.id == r.action_id)
1445 .and_then(|a| a.tool.clone());
1446 let mut data = serde_json::Map::new();
1447 if let Some(ref e) = r.error {
1448 data.insert("error".into(), Value::from(e.as_str()));
1449 }
1450 if let Some(ref o) = r.output {
1451 data.insert("output".into(), o.clone());
1452 }
1453 car_memgine::TraceEvent {
1454 kind: kind.to_string(),
1455 action_id: Some(r.action_id.clone()),
1456 tool,
1457 data: Value::Object(data),
1458 duration_ms: r.duration_ms,
1459 reward: match r.status {
1460 ActionStatus::Succeeded => Some(1.0),
1461 ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1462 _ => None,
1463 },
1464 ..Default::default()
1465 }
1466 })
1467 .collect();
1468
1469 let mut engine = memgine.lock().await;
1470 let skills = engine.distill_skills(&trace_events).await;
1471 if !skills.is_empty() {
1472 let count = skills.len();
1473 engine.ingest_distilled_skills(&skills);
1474
1475 let mut log = self.log.lock().await;
1477 log.append(
1478 EventKind::SkillDistilled,
1479 None,
1480 Some(&proposal_result.proposal_id),
1481 [
1482 ("skills_count".to_string(), Value::from(count)),
1483 (
1484 "skill_names".to_string(),
1485 Value::from(
1486 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1487 ),
1488 ),
1489 ]
1490 .into(),
1491 );
1492
1493 let threshold = engine.evolution_threshold();
1495 let domains = engine.domains_needing_evolution(threshold);
1496 for domain in &domains {
1497 let failed: Vec<car_memgine::TraceEvent> = trace_events
1499 .iter()
1500 .filter(|e| {
1501 matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1502 })
1503 .cloned()
1504 .collect();
1505 if !failed.is_empty() {
1506 let evolved = engine.evolve_skills(&failed, domain).await;
1507 if !evolved.is_empty() {
1508 log.append(
1509 EventKind::EvolutionTriggered,
1510 None,
1511 Some(&proposal_result.proposal_id),
1512 [
1513 ("domain".to_string(), Value::from(domain.as_str())),
1514 ("new_skills".to_string(), Value::from(evolved.len())),
1515 ]
1516 .into(),
1517 );
1518 }
1519 }
1520 }
1521 }
1522 }
1523 }
1524
1525 (proposal_result, state_before_map)
1526 }
1527
1528 async fn process_action(
1531 &self,
1532 action: &Action,
1533 proposal_id: &str,
1534 trace_id: &str,
1535 parent_span_id: &str,
1536 session_id: Option<&str>,
1537 ) -> (ActionResult, u32) {
1538 let action_type_name = serde_json::to_string(&action.action_type)
1540 .unwrap_or_default()
1541 .trim_matches('"')
1542 .to_string();
1543 let span_name = format!("action.{}", action_type_name);
1544
1545 let action_span_id = {
1547 let mut attrs: HashMap<String, Value> = HashMap::new();
1548 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1549 if let Some(ref tool) = action.tool {
1550 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1551 }
1552 let mut log = self.log.lock().await;
1553 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1554 };
1555
1556 let (result, retries) = self
1558 .process_action_inner(action, proposal_id, session_id)
1559 .await;
1560
1561 let span_status = match result.status {
1563 ActionStatus::Succeeded => SpanStatus::Ok,
1564 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1565 _ => SpanStatus::Unset,
1566 };
1567 {
1568 let mut log = self.log.lock().await;
1569 log.end_span(&action_span_id, span_status);
1570 }
1571
1572 (result, retries)
1573 }
1574
1575 #[instrument(
1578 name = "action.process",
1579 skip_all,
1580 fields(
1581 action_id = %action.id,
1582 action_type = ?action.action_type,
1583 tool = action.tool.as_deref().unwrap_or("none"),
1584 )
1585 )]
1586 async fn process_action_inner(
1587 &self,
1588 action: &Action,
1589 proposal_id: &str,
1590 session_id: Option<&str>,
1591 ) -> (ActionResult, u32) {
1592 if action.idempotent {
1594 let key = idempotency_key(action);
1595 let cache = self.idempotency_cache.lock().await;
1596 if let Some(cached) = cache.get(&key) {
1597 let mut log = self.log.lock().await;
1598 log.append(
1599 EventKind::ActionDeduplicated,
1600 Some(&action.id),
1601 Some(proposal_id),
1602 [(
1603 "cached_action_id".to_string(),
1604 Value::from(cached.action_id.as_str()),
1605 )]
1606 .into(),
1607 );
1608 return (
1609 ActionResult {
1610 action_id: action.id.clone(),
1611 status: cached.status.clone(),
1612 output: cached.output.clone(),
1613 error: cached.error.clone(),
1614 state_changes: cached.state_changes.clone(),
1615 duration_ms: Some(0.0),
1616 timestamp: chrono::Utc::now(),
1617 },
1618 0,
1619 );
1620 }
1621 }
1622
1623 {
1625 let caps = self.capabilities.read().await;
1626 if let Some(ref cap) = *caps {
1627 if action.action_type == ActionType::ToolCall {
1629 if let Some(ref tool_name) = action.tool {
1630 if !cap.tool_allowed(tool_name) {
1631 let mut log = self.log.lock().await;
1632 log.append(
1633 EventKind::ActionRejected,
1634 Some(&action.id),
1635 Some(proposal_id),
1636 HashMap::new(),
1637 );
1638 return (
1639 rejected_result(
1640 &action.id,
1641 format!("capability denied: tool '{}' not allowed", tool_name),
1642 ),
1643 0,
1644 );
1645 }
1646 }
1647 }
1648
1649 if action.action_type == ActionType::StateWrite
1651 || action.action_type == ActionType::StateRead
1652 {
1653 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1654 if !cap.state_key_allowed(key) {
1655 let mut log = self.log.lock().await;
1656 log.append(
1657 EventKind::ActionRejected,
1658 Some(&action.id),
1659 Some(proposal_id),
1660 HashMap::new(),
1661 );
1662 return (
1663 rejected_result(
1664 &action.id,
1665 format!("capability denied: state key '{}' not allowed", key),
1666 ),
1667 0,
1668 );
1669 }
1670 }
1671 }
1672 }
1673 }
1674
1675 let tools = self.tools.read().await;
1677 let validation = validate_action(action, &self.state, &tools);
1678 drop(tools);
1679
1680 if !validation.valid() {
1681 let error = validation
1682 .errors
1683 .iter()
1684 .map(|e| e.reason.as_str())
1685 .collect::<Vec<_>>()
1686 .join("; ");
1687 let mut log = self.log.lock().await;
1688 log.append(
1689 EventKind::ActionRejected,
1690 Some(&action.id),
1691 Some(proposal_id),
1692 HashMap::new(),
1693 );
1694 return (rejected_result(&action.id, error), 0);
1695 }
1696
1697 {
1703 let mut violations = {
1704 let policies = self.policies.read().await;
1705 policies.check(action, &self.state)
1706 };
1707 if let Some(sid) = session_id {
1708 let session_engine = {
1713 let sessions = self.session_policies.read().await;
1714 sessions.get(sid).cloned()
1715 };
1716 if let Some(engine) = session_engine {
1717 let engine = engine.read().await;
1718 violations.extend(engine.check(action, &self.state));
1719 } else {
1720 let mut log = self.log.lock().await;
1726 log.append(
1727 EventKind::PolicyViolation,
1728 Some(&action.id),
1729 Some(proposal_id),
1730 HashMap::new(),
1731 );
1732 return (
1733 rejected_result(
1734 &action.id,
1735 format!(
1736 "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1737 ),
1738 ),
1739 0,
1740 );
1741 }
1742 }
1743 if !violations.is_empty() {
1744 let error = violations
1745 .iter()
1746 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1747 .collect::<Vec<_>>()
1748 .join("; ");
1749 let mut log = self.log.lock().await;
1750 log.append(
1751 EventKind::PolicyViolation,
1752 Some(&action.id),
1753 Some(proposal_id),
1754 HashMap::new(),
1755 );
1756 return (rejected_result(&action.id, error), 0);
1757 }
1758 }
1759
1760 {
1762 let mut log = self.log.lock().await;
1763 log.append(
1764 EventKind::ActionValidated,
1765 Some(&action.id),
1766 Some(proposal_id),
1767 HashMap::new(),
1768 );
1769 }
1770
1771 let (result, retries) = self.execute_with_retry(action, proposal_id).await;
1773
1774 if action.idempotent && result.status == ActionStatus::Succeeded {
1776 let mut cache = self.idempotency_cache.lock().await;
1777 cache.insert(idempotency_key(action), result.clone());
1778 }
1779
1780 tracing::info!(
1781 status = ?result.status,
1782 duration_ms = result.duration_ms,
1783 "action completed"
1784 );
1785
1786 (result, retries)
1787 }
1788
1789 async fn execute_with_retry(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
1792 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1793 action.max_retries + 1
1794 } else {
1795 1
1796 };
1797
1798 let mut last_error: Option<String> = None;
1799 let mut retries: u32 = 0;
1800
1801 for attempt in 0..max_attempts {
1802 if attempt > 0 {
1803 retries += 1;
1804 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1805 tokio::time::sleep(Duration::from_millis(delay)).await;
1806 let mut log = self.log.lock().await;
1807 log.append(
1808 EventKind::ActionRetrying,
1809 Some(&action.id),
1810 Some(proposal_id),
1811 [("attempt".to_string(), Value::from(attempt + 1))].into(),
1812 );
1813 }
1814
1815 {
1816 let mut log = self.log.lock().await;
1817 log.append(
1818 EventKind::ActionExecuting,
1819 Some(&action.id),
1820 Some(proposal_id),
1821 HashMap::new(),
1822 );
1823 }
1824
1825 let start = std::time::Instant::now();
1826 let transitions_before = self.state.transition_count();
1827
1828 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1830 match timeout(Duration::from_millis(timeout_ms), self.dispatch(action)).await {
1831 Ok(r) => r,
1832 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1833 }
1834 } else {
1835 self.dispatch(action).await
1836 };
1837
1838 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1839
1840 match exec_result {
1841 Ok(output) => {
1842 let mut state_changes: HashMap<String, Value> = HashMap::new();
1844 for (key, value) in &action.expected_effects {
1845 self.state.set(key, value.clone(), &action.id);
1846 state_changes.insert(key.clone(), value.clone());
1847 }
1848
1849 for t in self.state.transitions_since(transitions_before) {
1851 if !state_changes.contains_key(&t.key) {
1852 if let Some(v) = t.new_value {
1853 state_changes.insert(t.key.clone(), v);
1854 }
1855 }
1856 }
1857
1858 let mut log = self.log.lock().await;
1859 log.append(
1860 EventKind::ActionSucceeded,
1861 Some(&action.id),
1862 Some(proposal_id),
1863 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1864 );
1865
1866 if !state_changes.is_empty() {
1867 log.append(
1868 EventKind::StateChanged,
1869 Some(&action.id),
1870 Some(proposal_id),
1871 [(
1872 "changes".to_string(),
1873 serde_json::to_value(&state_changes).unwrap_or_default(),
1874 )]
1875 .into(),
1876 );
1877 }
1878
1879 return (
1880 ActionResult {
1881 action_id: action.id.clone(),
1882 status: ActionStatus::Succeeded,
1883 output: Some(output),
1884 error: None,
1885 state_changes,
1886 duration_ms: Some(duration_ms),
1887 timestamp: chrono::Utc::now(),
1888 },
1889 retries,
1890 );
1891 }
1892 Err(e) => {
1893 last_error = Some(e.clone());
1894 let mut log = self.log.lock().await;
1895 log.append(
1896 EventKind::ActionFailed,
1897 Some(&action.id),
1898 Some(proposal_id),
1899 [
1900 ("error".to_string(), Value::from(e.as_str())),
1901 ("attempt".to_string(), Value::from(attempt + 1)),
1902 ]
1903 .into(),
1904 );
1905 }
1906 }
1907 }
1908
1909 if action.failure_behavior == FailureBehavior::Skip {
1911 return (
1912 skipped_result(
1913 &action.id,
1914 last_error.as_deref().unwrap_or("all attempts exhausted"),
1915 ),
1916 retries,
1917 );
1918 }
1919
1920 (
1921 ActionResult {
1922 action_id: action.id.clone(),
1923 status: ActionStatus::Failed,
1924 output: None,
1925 error: last_error,
1926 state_changes: HashMap::new(),
1927 duration_ms: None,
1928 timestamp: chrono::Utc::now(),
1929 },
1930 retries,
1931 )
1932 }
1933
1934 async fn dispatch(&self, action: &Action) -> Result<Value, String> {
1936 match action.action_type {
1937 ActionType::ToolCall => {
1938 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
1939 let params = Value::Object(
1940 action
1941 .parameters
1942 .iter()
1943 .map(|(k, v)| (k.clone(), v.clone()))
1944 .collect(),
1945 );
1946
1947 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
1949 return Ok(cached);
1950 }
1951
1952 self.rate_limiter.acquire(tool_name).await;
1954
1955 if matches!(
1957 tool_name,
1958 "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
1959 ) {
1960 if let Some(ref engine) = self.inference_engine {
1961 let params = {
1964 let should_ground =
1965 tool_name == "infer.grounded" || tool_name == "infer";
1966 if should_ground {
1967 if let Some(ref memgine) = self.memgine {
1968 if let Some(prompt) =
1969 params.get("prompt").and_then(|v| v.as_str())
1970 {
1971 let ctx = {
1972 let mut m = memgine.lock().await;
1973 m.build_context(prompt)
1974 };
1975 if !ctx.is_empty() {
1976 let mut p = params.clone();
1977 if let Some(obj) = p.as_object_mut() {
1978 obj.insert("context".to_string(), Value::from(ctx));
1979 }
1980 p
1981 } else {
1982 params
1983 }
1984 } else {
1985 params
1986 }
1987 } else {
1988 params
1989 }
1990 } else {
1991 params
1992 }
1993 };
1994
1995 let effective_tool = if tool_name == "infer.grounded" {
1997 "infer"
1998 } else {
1999 tool_name
2000 };
2001 let result =
2002 car_inference::service::execute_tool(engine, effective_tool, ¶ms)
2003 .await
2004 .map_err(|e| e.to_string());
2005
2006 if let Ok(ref value) = result {
2007 self.result_cache
2008 .put(tool_name, ¶ms, value.clone())
2009 .await;
2010 }
2011
2012 return result;
2013 }
2014 }
2015
2016 if tool_name == "memory.consolidate" {
2018 if let Some(ref memgine) = self.memgine {
2019 let report = {
2020 let mut m = memgine.lock().await;
2021 m.consolidate().await
2022 };
2023 {
2025 let mut log = self.log.lock().await;
2026 log.append(
2027 EventKind::Consolidated,
2028 None,
2029 None,
2030 [
2031 (
2032 "expired_pruned".to_string(),
2033 Value::from(report.expired_pruned),
2034 ),
2035 (
2036 "superseded_gc".to_string(),
2037 Value::from(report.superseded_gc),
2038 ),
2039 (
2040 "stale_embeddings_removed".to_string(),
2041 Value::from(report.stale_embeddings_removed),
2042 ),
2043 (
2044 "nodes_embedded".to_string(),
2045 Value::from(report.nodes_embedded),
2046 ),
2047 (
2048 "domains_evolved".to_string(),
2049 Value::from(report.domains_evolved.clone()),
2050 ),
2051 ("total_nodes".to_string(), Value::from(report.total_nodes)),
2052 ("total_edges".to_string(), Value::from(report.total_edges)),
2053 ]
2054 .into(),
2055 );
2056 }
2057 return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2058 } else {
2059 return Err(
2060 "memory.consolidate requires memgine (attach with with_learning)"
2061 .into(),
2062 );
2063 }
2064 }
2065
2066 let configured = {
2072 let guard = self.tool_executor.lock().await;
2073 guard.as_ref().cloned()
2074 };
2075
2076 if let Some(ref executor) = configured {
2077 let result = executor.execute(tool_name, ¶ms).await;
2078 let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2079 if !fall_through {
2080 if let Ok(ref value) = result {
2081 self.result_cache
2082 .put(tool_name, ¶ms, value.clone())
2083 .await;
2084 }
2085 return result;
2086 }
2087 }
2088
2089 if let Some(result) = crate::agent_basics::execute(tool_name, ¶ms).await {
2090 if let Ok(ref value) = result {
2091 self.result_cache
2092 .put(tool_name, ¶ms, value.clone())
2093 .await;
2094 }
2095 return result;
2096 }
2097
2098 Err(format!("no handler for tool '{}'", tool_name))
2099 }
2100 ActionType::StateWrite => {
2101 let key = action
2102 .parameters
2103 .get("key")
2104 .and_then(|v| v.as_str())
2105 .ok_or("state_write requires 'key' parameter")?;
2106 let value = action
2107 .parameters
2108 .get("value")
2109 .cloned()
2110 .unwrap_or(Value::Null);
2111 self.state.set(key, value, &action.id);
2112 Ok(Value::from(format!("written: {}", key)))
2113 }
2114 ActionType::StateRead => {
2115 let key = action
2116 .parameters
2117 .get("key")
2118 .and_then(|v| v.as_str())
2119 .ok_or("state_read requires 'key' parameter")?;
2120 Ok(self.state.get(key).unwrap_or(Value::Null))
2121 }
2122 ActionType::Assertion => {
2123 let key = action
2124 .parameters
2125 .get("key")
2126 .and_then(|v| v.as_str())
2127 .ok_or("assertion requires 'key' parameter")?;
2128 let expected = action
2129 .parameters
2130 .get("expected")
2131 .cloned()
2132 .unwrap_or(Value::Null);
2133 let actual = self.state.get(key).unwrap_or(Value::Null);
2134 if actual != expected {
2135 Err(format!(
2136 "assertion failed: state['{}'] = {:?}, expected {:?}",
2137 key, actual, expected
2138 ))
2139 } else {
2140 Ok(serde_json::json!({"asserted": key, "value": actual}))
2141 }
2142 }
2143 }
2144 }
2145
2146 pub async fn save_checkpoint(&self) -> Checkpoint {
2150 let state = self.state.snapshot();
2151 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2152 let log = self.log.lock().await;
2153 let events: Vec<Value> = log
2154 .events()
2155 .iter()
2156 .map(|e| serde_json::to_value(e).unwrap_or_default())
2157 .collect();
2158
2159 Checkpoint {
2160 checkpoint_id: Uuid::new_v4().to_string(),
2161 created_at: chrono::Utc::now(),
2162 state,
2163 events,
2164 tools,
2165 metadata: HashMap::new(),
2166 }
2167 }
2168
2169 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2171 let checkpoint = self.save_checkpoint().await;
2172 let json = serde_json::to_string_pretty(&checkpoint)
2173 .map_err(|e| format!("serialize error: {}", e))?;
2174 tokio::fs::write(path, json)
2175 .await
2176 .map_err(|e| format!("write error: {}", e))?;
2177 Ok(())
2178 }
2179
2180 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2182 let json = tokio::fs::read_to_string(path)
2183 .await
2184 .map_err(|e| format!("read error: {}", e))?;
2185 let checkpoint: Checkpoint =
2186 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2187 self.restore_checkpoint(&checkpoint).await;
2188 Ok(checkpoint)
2189 }
2190
2191 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2193 self.state.replace_all(checkpoint.state.clone());
2195 self.idempotency_cache.lock().await.clear();
2198 let mut tools = self.tools.write().await;
2200 tools.clear();
2201 for tool_name in &checkpoint.tools {
2202 let schema = ToolSchema {
2203 name: tool_name.clone(),
2204 description: String::new(),
2205 parameters: serde_json::Value::Object(Default::default()),
2206 returns: None,
2207 idempotent: false,
2208 cache_ttl_secs: None,
2209 rate_limit: None,
2210 };
2211 tools.insert(tool_name.clone(), schema);
2212 }
2213 }
2214
2215 pub async fn register_subprocess_tool(
2220 &self,
2221 name: &str,
2222 tool: crate::subprocess::SubprocessTool,
2223 ) {
2224 use crate::subprocess::SubprocessToolExecutor;
2225
2226 let schema = ToolSchema {
2227 name: name.to_string(),
2228 description: format!("Subprocess tool: {}", tool.command),
2229 parameters: serde_json::Value::Object(Default::default()),
2230 returns: None,
2231 idempotent: false,
2232 cache_ttl_secs: None,
2233 rate_limit: None,
2234 };
2235 self.register_tool_schema(schema).await;
2236
2237 let mut guard = self.tool_executor.lock().await;
2238 let mut executor = match guard.take() {
2239 Some(existing) => {
2240 let mut sub = SubprocessToolExecutor::new();
2241 sub = sub.with_fallback(existing);
2242 sub
2243 }
2244 None => SubprocessToolExecutor::new(),
2245 };
2246 executor.register(name, tool);
2247 *guard = Some(std::sync::Arc::new(executor));
2248 }
2249}
2250
2251impl Default for Runtime {
2252 fn default() -> Self {
2253 Self::new()
2254 }
2255}