1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base delay, in milliseconds, for retrying a failed action.
/// NOTE(review): the retry loop that reads this is outside this chunk; presumably
/// the delay is multiplied by `RETRY_BACKOFF_FACTOR` on each further attempt — confirm.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier applied to the retry delay between attempts.
/// NOTE(review): exponential-backoff semantics are assumed from the name; the
/// consuming code is not visible here.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
28#[async_trait::async_trait]
36pub trait ReplanCallback: Send + Sync {
37 async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
38}
39
/// Failure description handed to a [`ReplanCallback`] when an executed proposal
/// contains failed actions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Id of the proposal originally submitted (not of any intermediate replan).
    pub proposal_id: String,
    /// 1-based number of the replan attempt being requested.
    pub attempt: u32,
    /// Every action of the last executed proposal whose status was `Failed`.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Ids of the actions of the last executed proposal that `Succeeded`.
    pub completed_action_ids: Vec<String>,
    /// Full state-store snapshot taken while building this context, i.e. after
    /// execution and after any rollback the executor performed.
    pub state_snapshot: HashMap<String, Value>,
    /// Replan attempts still allowed after this one
    /// (`max_replans - attempt`, floored at zero).
    pub replans_remaining: u32,
    /// `source` of the originally submitted proposal.
    pub original_source: String,
    /// Number of actions in the originally submitted proposal.
    pub original_action_count: usize,
    /// `context` map of the originally submitted proposal.
    pub original_context: HashMap<String, Value>,
}
62
/// Summary of one failed action, as listed in `ReplanContext::failed_actions`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the failed action.
    pub action_id: String,
    /// Tool the action invoked; `None` if the action has no tool or could not be
    /// found in the executed proposal.
    pub tool: Option<String>,
    /// Error message recorded for the action (empty string if none was recorded).
    pub error: String,
    /// The action's parameters (empty if the action could not be found in the
    /// executed proposal).
    pub parameters: HashMap<String, Value>,
}
71
/// Settings for the runtime's replan loop.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts after the first execution; `0` disables replanning.
    pub max_replans: u32,
    /// Pause, in milliseconds, before each call to the replan callback (`0` = no pause).
    pub delay_ms: u64,
    /// When true, each replacement proposal is checked with `car_verify::verify`
    /// against the current state and the registered tools before it is executed.
    pub verify_before_execute: bool,
}

impl Default for ReplanConfig {
    /// Replanning disabled, no delay, verification enabled.
    fn default() -> Self {
        let verify_before_execute = true;
        Self {
            verify_before_execute,
            delay_ms: 0,
            max_replans: 0,
        }
    }
}
95
96#[async_trait::async_trait]
101pub trait ToolExecutor: Send + Sync {
102 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
103
104 async fn execute_with_action(
113 &self,
114 tool: &str,
115 params: &Value,
116 _action_id: &str,
117 ) -> Result<Value, String> {
118 self.execute(tool, params).await
119 }
120}
121
122fn idempotency_key(action: &Action) -> String {
124 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
125 let params = serde_json::to_string(&sorted).unwrap_or_default();
126 format!(
127 "{}:{}:{}",
128 serde_json::to_string(&action.action_type).unwrap_or_default(),
129 action.tool.as_deref().unwrap_or(""),
130 params
131 )
132}
133
134fn rejected_result(action_id: &str, error: String) -> ActionResult {
135 ActionResult {
136 action_id: action_id.to_string(),
137 status: ActionStatus::Rejected,
138 output: None,
139 error: Some(error),
140 state_changes: HashMap::new(),
141 duration_ms: None,
142 timestamp: chrono::Utc::now(),
143 }
144}
145
146fn snapshot_relevant_keys(
150 state: &car_state::StateStore,
151 action: &Action,
152) -> HashMap<String, Value> {
153 let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
154 for dep in &action.state_dependencies {
155 keys.insert(dep.as_str());
156 }
157 for key in action.expected_effects.keys() {
158 keys.insert(key.as_str());
159 }
160 if action.action_type == ActionType::StateWrite {
162 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
163 keys.insert(key);
164 }
165 }
166
167 if keys.is_empty() {
168 return HashMap::new();
170 }
171
172 keys.iter()
173 .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
174 .collect()
175}
176
177fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
178 ActionResult {
179 action_id: action_id.to_string(),
180 status: ActionStatus::Skipped,
181 output: None,
182 error: Some(reason.to_string()),
183 state_changes: HashMap::new(),
184 duration_ms: None,
185 timestamp: chrono::Utc::now(),
186 }
187}
188
/// Text placed in front of the reason in the `error` field of results built by
/// `canceled_result`, i.e. actions skipped because the caller cancelled
/// execution. Plain `skipped_result` errors carry no such prefix.
pub const CANCELED_PREFIX: &str = "canceled: ";
196
197fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
202 ActionResult {
203 action_id: action_id.to_string(),
204 status: ActionStatus::Skipped,
205 output: None,
206 error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
207 state_changes: HashMap::new(),
208 duration_ms: None,
209 timestamp: chrono::Utc::now(),
210 }
211}
212
213pub fn format_tool_result(result: &ActionResult) -> String {
215 match result.status {
216 ActionStatus::Succeeded => match &result.output {
217 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
218 None => String::new(),
219 },
220 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
221 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
222 _ => format!(
223 "[{:?}] {}",
224 result.status,
225 result.error.as_deref().unwrap_or("")
226 ),
227 }
228}
229
/// Optional per-execution cost limits; `None` leaves that dimension unlimited.
/// Counters start from zero for every proposal execution, and once a limit has
/// been reached the remaining actions are skipped with "cost budget exceeded".
///
/// NOTE(review): in the visible executor, limits are only checked before actions
/// that run one at a time (single-action levels or levels containing an `Abort`
/// action). Actions in concurrently executed levels are not checked — confirm
/// this is intended.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Maximum number of successful `ToolCall` actions.
    pub max_tool_calls: Option<u32>,
    /// Maximum summed action duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions that ran (were not skipped).
    pub max_actions: Option<u32>,
}
237
/// Proposal executor: owns the state store, tool schemas, policy engines, event
/// log and the optional subsystems (executor, budgets, capabilities, learning,
/// trajectory persistence, replanning) used when running [`ActionProposal`]s.
pub struct Runtime {
    /// Key/value state read and written by actions; snapshotted and restored
    /// around executions.
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Global policy engine.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Per-session policy engines, keyed by the id returned from `open_session`.
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log receiving proposal, span, replan and rollback events.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits (from `set_rate_limit` or a schema's `rate_limit`).
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool result cache (from `enable_tool_cache` or a schema's `cache_ttl_secs`).
    pub result_cache: Arc<ResultCache>,
    /// Executor for tool calls, if one has been installed.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Action results keyed by `idempotency_key`. Entries for succeeded idempotent
    /// actions are evicted when their proposal is rolled back.
    /// NOTE(review): where entries are inserted is outside this chunk.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional cost limits, read once at the start of each proposal execution.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability set. A proposal exceeding its action budget has every
    /// action rejected.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Inference engine attached by `with_inference`.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Learning engine attached by `with_learning` / `with_memgine`.
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// When true (and `memgine` is set), trace events are built from each
    /// execution result for the memgine.
    auto_distill: bool,
    /// Destination for per-proposal trajectories; `None` disables persistence.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Callback asked for a replacement proposal after failures.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Replan loop settings; copied at the start of each top-level execution.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Registry of tool entries added through `register_tool_entry`.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
287
288impl Runtime {
289 pub fn new() -> Self {
290 Self {
291 state: Arc::new(StateStore::new()),
292 tools: Arc::new(TokioRwLock::new(HashMap::new())),
293 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
294 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
295 log: Arc::new(TokioMutex::new(EventLog::new())),
296 rate_limiter: Arc::new(RateLimiter::new()),
297 result_cache: Arc::new(ResultCache::new()),
298 tool_executor: TokioMutex::new(None),
299 idempotency_cache: TokioMutex::new(HashMap::new()),
300 cost_budget: TokioRwLock::new(None),
301 capabilities: TokioRwLock::new(None),
302 inference_engine: None,
303 memgine: None,
304 auto_distill: false,
305 trajectory_store: None,
306 replan_callback: TokioMutex::new(None),
307 replan_config: TokioRwLock::new(ReplanConfig::default()),
308 registry: Arc::new(crate::registry::ToolRegistry::new()),
309 }
310 }
311
312 pub fn with_shared(
315 state: Arc<StateStore>,
316 log: Arc<TokioMutex<EventLog>>,
317 policies: Arc<TokioRwLock<PolicyEngine>>,
318 ) -> Self {
319 Self {
320 state,
321 tools: Arc::new(TokioRwLock::new(HashMap::new())),
322 policies,
323 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
328 log,
329 rate_limiter: Arc::new(RateLimiter::new()),
330 result_cache: Arc::new(ResultCache::new()),
331 tool_executor: TokioMutex::new(None),
332 idempotency_cache: TokioMutex::new(HashMap::new()),
333 cost_budget: TokioRwLock::new(None),
334 capabilities: TokioRwLock::new(None),
335 inference_engine: None,
336 memgine: None,
337 auto_distill: false,
338 trajectory_store: None,
339 replan_callback: TokioMutex::new(None),
340 replan_config: TokioRwLock::new(ReplanConfig::default()),
341 registry: Arc::new(crate::registry::ToolRegistry::new()),
342 }
343 }
344
345 pub async fn open_session(&self) -> String {
363 let id = Uuid::new_v4().to_string();
364 let mut sessions = self.session_policies.write().await;
365 sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
366 id
367 }
368
369 pub async fn close_session(&self, session_id: &str) -> bool {
374 let mut sessions = self.session_policies.write().await;
375 sessions.remove(session_id).is_some()
376 }
377
378 pub async fn register_policy_in_session(
387 &self,
388 session_id: &str,
389 name: &str,
390 check: car_policy::PolicyCheck,
391 description: &str,
392 ) -> Result<(), String> {
393 let engine = {
394 let sessions = self.session_policies.read().await;
395 sessions
396 .get(session_id)
397 .cloned()
398 .ok_or_else(|| format!("unknown session id '{session_id}'"))?
399 };
400 let mut engine = engine.write().await;
401 engine.register(name, check, description);
402 Ok(())
403 }
404
405 pub async fn session_exists(&self, session_id: &str) -> bool {
409 self.session_policies.read().await.contains_key(session_id)
410 }
411
412 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
415 self.inference_engine = Some(engine);
416 if let Ok(mut tools) = self.tools.try_write() {
418 for schema in car_inference::service::all_schemas() {
419 tools.insert(schema.name.clone(), schema);
420 }
421 }
422 self
423 }
424
425 pub fn with_learning(
429 mut self,
430 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
431 auto_distill: bool,
432 ) -> Self {
433 self.memgine = Some(memgine);
434 self.auto_distill = auto_distill;
435 self
436 }
437
438 pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
440 self.with_learning(memgine, true)
441 }
442
443 pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
445 self.trajectory_store = Some(store);
446 self
447 }
448
449 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
450 if let Ok(mut guard) = self.tool_executor.try_lock() {
452 *guard = Some(executor);
453 }
454 self
455 }
456
457 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
460 *self.tool_executor.lock().await = Some(executor);
461 }
462
463 pub fn with_event_log(mut self, log: EventLog) -> Self {
464 self.log = Arc::new(TokioMutex::new(log));
465 self
466 }
467
468 pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
470 if let Ok(mut guard) = self.replan_callback.try_lock() {
471 *guard = Some(callback);
472 }
473 if let Ok(mut guard) = self.replan_config.try_write() {
474 *guard = config;
475 }
476 self
477 }
478
479 pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
481 *self.replan_callback.lock().await = Some(callback);
482 }
483
484 pub async fn set_replan_config(&self, config: ReplanConfig) {
486 *self.replan_config.write().await = config;
487 }
488
489 pub async fn register_tool(&self, name: &str) {
491 let schema = ToolSchema {
492 name: name.to_string(),
493 description: String::new(),
494 parameters: serde_json::Value::Object(Default::default()),
495 returns: None,
496 idempotent: false,
497 cache_ttl_secs: None,
498 rate_limit: None,
499 };
500 self.register_tool_schema(schema).await;
501 }
502
503 pub async fn register_tool_schema(&self, schema: ToolSchema) {
505 if let Some(ttl) = schema.cache_ttl_secs {
507 self.result_cache.enable_caching(&schema.name, ttl).await;
508 }
509 if let Some(ref rl) = schema.rate_limit {
511 self.rate_limiter
512 .set_limit(
513 &schema.name,
514 RateLimit {
515 max_calls: rl.max_calls,
516 interval_secs: rl.interval_secs,
517 },
518 )
519 .await;
520 }
521 self.tools.write().await.insert(schema.name.clone(), schema);
522 }
523
524 pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
528 let schema = entry.schema.clone();
529 self.registry.register(entry).await;
530 self.register_tool_schema(schema).await;
531 }
532
533 pub async fn register_agent_basics(&self) {
538 for entry in crate::agent_basics::entries() {
539 self.register_tool_entry(entry).await;
540 }
541 }
542
543 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
545 self.tools.read().await.values().cloned().collect()
546 }
547
548 pub async fn set_cost_budget(&self, budget: CostBudget) {
550 *self.cost_budget.write().await = Some(budget);
551 }
552
553 pub async fn set_capabilities(&self, caps: CapabilitySet) {
555 *self.capabilities.write().await = Some(caps);
556 }
557
558 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
564 self.rate_limiter
565 .set_limit(
566 tool,
567 RateLimit {
568 max_calls,
569 interval_secs,
570 },
571 )
572 .await;
573 }
574
575 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
577 self.result_cache.enable_caching(tool, ttl_secs).await;
578 }
579
580 #[instrument(
590 name = "proposal.execute",
591 skip_all,
592 fields(
593 proposal_id = %proposal.id,
594 action_count = proposal.actions.len(),
595 )
596 )]
597 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
598 let token = tokio_util::sync::CancellationToken::new();
601 self.execute_with_cancel(proposal, &token).await
602 }
603
604 pub async fn execute_with_session(
616 &self,
617 proposal: &ActionProposal,
618 session_id: &str,
619 ) -> ProposalResult {
620 let token = tokio_util::sync::CancellationToken::new();
621 self.execute_with_session_and_cancel(proposal, session_id, &token)
622 .await
623 }
624
625 pub async fn execute_with_session_and_cancel(
629 &self,
630 proposal: &ActionProposal,
631 session_id: &str,
632 cancel: &tokio_util::sync::CancellationToken,
633 ) -> ProposalResult {
634 self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
635 .await
636 }
637
638 pub async fn execute_with_cancel(
666 &self,
667 proposal: &ActionProposal,
668 cancel: &tokio_util::sync::CancellationToken,
669 ) -> ProposalResult {
670 self.execute_with_optional_session(proposal, None, None, cancel)
671 .await
672 }
673
674 pub async fn execute_scoped(
691 &self,
692 proposal: &ActionProposal,
693 scope: &crate::scope::RuntimeScope,
694 ) -> ProposalResult {
695 let token = tokio_util::sync::CancellationToken::new();
696 self.execute_scoped_with_cancel(proposal, scope, &token)
697 .await
698 }
699
700 pub async fn execute_scoped_with_cancel(
705 &self,
706 proposal: &ActionProposal,
707 scope: &crate::scope::RuntimeScope,
708 cancel: &tokio_util::sync::CancellationToken,
709 ) -> ProposalResult {
710 self.execute_with_optional_session(proposal, None, Some(scope), cancel)
711 .await
712 }
713
714 async fn execute_with_optional_session(
720 &self,
721 proposal: &ActionProposal,
722 session_id: Option<&str>,
723 scope: Option<&crate::scope::RuntimeScope>,
724 cancel: &tokio_util::sync::CancellationToken,
725 ) -> ProposalResult {
726 let config = self.replan_config.read().await.clone();
727 let mut current_proposal = proposal.clone();
728 let mut attempt: u32 = 0;
729
730 if let Some(s) = scope {
736 if !s.is_unscoped() {
737 let mut props: HashMap<String, Value> = HashMap::new();
738 if let Some(cid) = &s.caller_id {
739 props.insert("caller_id".to_string(), Value::from(cid.as_str()));
740 }
741 if let Some(tid) = &s.tenant_id {
742 props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
743 }
744 if !s.claims.is_empty() {
745 if let Ok(claims_json) = serde_json::to_value(&s.claims) {
746 props.insert("claims".to_string(), claims_json);
747 }
748 }
749 let mut log = self.log.lock().await;
750 log.append(EventKind::SessionScope, None, Some(&proposal.id), props);
751 }
752 }
753
754 loop {
755 let (result, state_before_map) = self
756 .execute_inner_with_cancel(¤t_proposal, Some(cancel), session_id, scope)
757 .await;
758
759 let aborted = result
761 .results
762 .iter()
763 .any(|r| r.status == ActionStatus::Failed);
764 if !aborted || attempt >= config.max_replans {
765 if aborted && attempt > 0 {
766 let mut log = self.log.lock().await;
768 log.append(
769 EventKind::ReplanExhausted,
770 None,
771 Some(&proposal.id),
772 [("attempts".to_string(), Value::from(attempt))].into(),
773 );
774 }
775
776 let outcome = if !aborted {
778 if attempt > 0 {
779 car_memgine::TrajectoryOutcome::ReplanSuccess
780 } else {
781 car_memgine::TrajectoryOutcome::Success
782 }
783 } else if attempt > 0 {
784 car_memgine::TrajectoryOutcome::ReplanExhausted
785 } else {
786 car_memgine::TrajectoryOutcome::Failed
787 };
788 if let Some(err) = self.persist_trajectory(
789 proposal,
790 ¤t_proposal,
791 &result,
792 outcome,
793 attempt,
794 &state_before_map,
795 ) {
796 let mut log = self.log.lock().await;
797 log.append(
798 EventKind::ActionFailed,
799 None,
800 Some(&proposal.id),
801 [(
802 "trajectory_persist_error".to_string(),
803 Value::from(err.as_str()),
804 )]
805 .into(),
806 );
807 }
808
809 return result;
810 }
811
812 let callback = {
814 let guard = self.replan_callback.lock().await;
815 guard.clone()
816 };
817 let Some(callback) = callback else {
818 if let Some(err) = self.persist_trajectory(
820 proposal,
821 ¤t_proposal,
822 &result,
823 car_memgine::TrajectoryOutcome::Failed,
824 attempt,
825 &state_before_map,
826 ) {
827 let mut log = self.log.lock().await;
828 log.append(
829 EventKind::ActionFailed,
830 None,
831 Some(&proposal.id),
832 [(
833 "trajectory_persist_error".to_string(),
834 Value::from(err.as_str()),
835 )]
836 .into(),
837 );
838 }
839 return result;
840 };
841
842 let failed_actions: Vec<FailedActionSummary> = result
844 .results
845 .iter()
846 .filter(|r| r.status == ActionStatus::Failed)
847 .map(|r| {
848 let action = current_proposal
849 .actions
850 .iter()
851 .find(|a| a.id == r.action_id);
852 FailedActionSummary {
853 action_id: r.action_id.clone(),
854 tool: action.and_then(|a| a.tool.clone()),
855 error: r.error.clone().unwrap_or_default(),
856 parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
857 }
858 })
859 .collect();
860
861 let completed_action_ids: Vec<String> = result
862 .results
863 .iter()
864 .filter(|r| r.status == ActionStatus::Succeeded)
865 .map(|r| r.action_id.clone())
866 .collect();
867
868 let ctx = ReplanContext {
869 proposal_id: proposal.id.clone(),
870 attempt: attempt + 1,
871 failed_actions,
872 completed_action_ids,
873 state_snapshot: self.state.snapshot(),
874 replans_remaining: config.max_replans.saturating_sub(attempt + 1),
875 original_source: proposal.source.clone(),
876 original_action_count: proposal.actions.len(),
877 original_context: proposal.context.clone(),
878 };
879
880 if config.delay_ms > 0 {
882 tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
883 }
884
885 {
887 let mut log = self.log.lock().await;
888 log.append(
889 EventKind::ReplanAttempted,
890 None,
891 Some(&proposal.id),
892 [
893 ("attempt".to_string(), Value::from(attempt + 1)),
894 (
895 "failed_count".to_string(),
896 Value::from(ctx.failed_actions.len()),
897 ),
898 ]
899 .into(),
900 );
901 }
902
903 match callback.replan(&ctx).await {
905 Ok(new_proposal) => {
906 if config.verify_before_execute {
908 let tools_guard = self.tools.read().await;
909 let tool_names: std::collections::HashSet<String> =
910 tools_guard.keys().cloned().collect();
911 drop(tools_guard);
912
913 let current_state = self.state.snapshot();
914 let vr = car_verify::verify(
915 &new_proposal,
916 Some(¤t_state),
917 Some(&tool_names),
918 100,
919 );
920 if !vr.valid {
921 let error_msgs: Vec<String> = vr
922 .issues
923 .iter()
924 .filter(|i| i.severity == "error")
925 .map(|i| i.message.clone())
926 .collect();
927 let mut log = self.log.lock().await;
928 log.append(
929 EventKind::ReplanRejected,
930 None,
931 Some(&proposal.id),
932 [
933 ("errors".to_string(), Value::from(error_msgs.join("; "))),
934 ("attempt".to_string(), Value::from(attempt + 1)),
935 ]
936 .into(),
937 );
938 attempt += 1;
940 continue;
941 }
942 }
943
944 {
946 let mut log = self.log.lock().await;
947 log.append(
948 EventKind::ReplanProposalReceived,
949 None,
950 Some(&proposal.id),
951 [
952 ("attempt".to_string(), Value::from(attempt + 1)),
953 (
954 "new_action_count".to_string(),
955 Value::from(new_proposal.actions.len()),
956 ),
957 ]
958 .into(),
959 );
960 }
961 current_proposal = new_proposal;
962 attempt += 1;
963 }
964 Err(e) => {
965 let mut log = self.log.lock().await;
967 log.append(
968 EventKind::ReplanExhausted,
969 None,
970 Some(&proposal.id),
971 [
972 ("reason".to_string(), Value::from("callback_error")),
973 ("error".to_string(), Value::from(e.as_str())),
974 ("attempt".to_string(), Value::from(attempt + 1)),
975 ]
976 .into(),
977 );
978 if let Some(err) = self.persist_trajectory(
979 proposal,
980 ¤t_proposal,
981 &result,
982 car_memgine::TrajectoryOutcome::Failed,
983 attempt,
984 &state_before_map,
985 ) {
986 log.append(
987 EventKind::ActionFailed,
988 None,
989 Some(&proposal.id),
990 [(
991 "trajectory_persist_error".to_string(),
992 Value::from(err.as_str()),
993 )]
994 .into(),
995 );
996 }
997 return result;
998 }
999 }
1000 }
1001 }
1002
1003 fn persist_trajectory(
1005 &self,
1006 proposal: &ActionProposal,
1007 current_proposal: &ActionProposal,
1008 result: &ProposalResult,
1009 outcome: car_memgine::TrajectoryOutcome,
1010 attempt: u32,
1011 state_before_map: &HashMap<String, HashMap<String, Value>>,
1012 ) -> Option<String> {
1013 let store = self.trajectory_store.as_ref()?;
1014
1015 let trace_events: Vec<car_memgine::TraceEvent> = result
1016 .results
1017 .iter()
1018 .map(|r| {
1019 let kind = match r.status {
1020 ActionStatus::Succeeded => "action_succeeded",
1021 ActionStatus::Failed => "action_failed",
1022 ActionStatus::Rejected => "action_rejected",
1023 ActionStatus::Skipped => "action_skipped",
1024 _ => "unknown",
1025 };
1026 let tool = current_proposal
1027 .actions
1028 .iter()
1029 .find(|a| a.id == r.action_id)
1030 .and_then(|a| a.tool.clone());
1031 let reward = match r.status {
1032 ActionStatus::Succeeded => Some(1.0),
1033 ActionStatus::Failed => Some(0.0),
1034 ActionStatus::Rejected => Some(0.0),
1035 ActionStatus::Skipped => None,
1036 _ => None,
1037 };
1038 car_memgine::TraceEvent {
1039 kind: kind.to_string(),
1040 action_id: Some(r.action_id.clone()),
1041 tool,
1042 data: r
1043 .error
1044 .as_ref()
1045 .map(|e| serde_json::json!({"error": e}))
1046 .unwrap_or(serde_json::json!({})),
1047 duration_ms: r.duration_ms,
1048 state_before: state_before_map.get(&r.action_id).cloned(),
1049 state_after: if !r.state_changes.is_empty() {
1050 Some(r.state_changes.clone())
1051 } else {
1052 None
1053 },
1054 reward,
1055 }
1056 })
1057 .collect();
1058
1059 let trajectory = car_memgine::Trajectory {
1060 proposal_id: proposal.id.clone(),
1061 source: proposal.source.clone(),
1062 action_count: current_proposal.actions.len(),
1063 events: trace_events,
1064 outcome,
1065 timestamp: chrono::Utc::now(),
1066 duration_ms: result.cost.total_duration_ms,
1067 replan_attempts: attempt,
1068 };
1069
1070 match store.append(&trajectory) {
1071 Ok(()) => None,
1072 Err(e) => Some(e.to_string()),
1073 }
1074 }
1075
1076 pub async fn plan_and_execute(
1082 &self,
1083 candidates: &[ActionProposal],
1084 planner_config: Option<car_planner::PlannerConfig>,
1085 feedback: Option<&car_planner::ToolFeedback>,
1086 ) -> ProposalResult {
1087 if candidates.is_empty() {
1088 return ProposalResult {
1089 proposal_id: "empty".to_string(),
1090 results: vec![],
1091 cost: car_ir::CostSummary::default(),
1092 };
1093 }
1094
1095 let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1097 let tools_guard = self.tools.read().await;
1098 let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1099 drop(tools_guard);
1100
1101 let pre_plan_snapshot = self.state.snapshot();
1102 let pre_plan_transitions = self.state.transition_count();
1103 let ranked = planner.rank_with_feedback(
1104 candidates,
1105 Some(&pre_plan_snapshot),
1106 Some(&tool_names),
1107 feedback,
1108 );
1109
1110 let mut first_failure: Option<ProposalResult> = None;
1112 for scored in &ranked {
1113 if !scored.valid {
1114 continue;
1115 }
1116
1117 self.state
1120 .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1121
1122 let proposal = &candidates[scored.index];
1123 let result = self.execute(proposal).await;
1124
1125 if result.all_succeeded() {
1126 return result;
1127 }
1128
1129 tracing::info!(
1130 proposal_id = %proposal.id,
1131 score = scored.score,
1132 "plan_and_execute: proposal failed, trying next candidate"
1133 );
1134
1135 if first_failure.is_none() {
1136 first_failure = Some(result);
1137 }
1138 }
1139
1140 first_failure.unwrap_or_else(|| ProposalResult {
1142 proposal_id: candidates[0].id.clone(),
1143 results: vec![],
1144 cost: car_ir::CostSummary::default(),
1145 })
1146 }
1147
1148 async fn execute_inner_with_cancel(
1156 &self,
1157 proposal: &ActionProposal,
1158 cancel: Option<&tokio_util::sync::CancellationToken>,
1159 session_id: Option<&str>,
1160 scope: Option<&crate::scope::RuntimeScope>,
1161 ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1162 let trace_id = Uuid::new_v4().to_string();
1164
1165 let root_span_id = {
1167 let mut log = self.log.lock().await;
1168 log.begin_span(
1169 "proposal.execute",
1170 &trace_id,
1171 None,
1172 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1173 )
1174 };
1175
1176 {
1178 let mut log = self.log.lock().await;
1179 log.append(
1180 EventKind::ProposalReceived,
1181 None,
1182 Some(&proposal.id),
1183 [
1184 ("source".to_string(), Value::from(proposal.source.as_str())),
1185 (
1186 "action_count".to_string(),
1187 Value::from(proposal.actions.len()),
1188 ),
1189 ]
1190 .into(),
1191 );
1192 }
1193
1194 {
1196 let caps = self.capabilities.read().await;
1197 if let Some(ref cap) = *caps {
1198 if !cap.actions_within_budget(proposal.actions.len() as u32) {
1199 let mut action_results = Vec::new();
1200 for action in &proposal.actions {
1201 action_results.push(rejected_result(
1202 &action.id,
1203 format!(
1204 "capability denied: proposal has {} actions, max allowed is {:?}",
1205 proposal.actions.len(),
1206 cap.max_actions
1207 ),
1208 ));
1209 }
1210 return (
1211 ProposalResult {
1212 proposal_id: proposal.id.clone(),
1213 results: action_results,
1214 cost: CostSummary::default(),
1215 },
1216 HashMap::new(),
1217 );
1218 }
1219 }
1220 }
1221
1222 let snapshot = self.state.snapshot();
1224 let transition_count = self.state.transition_count();
1225
1226 let mut results: Vec<ActionResult> = Vec::new();
1227 let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1229 let mut aborted = false;
1230 let mut budget_exceeded = false;
1231 let mut total_retries: u32 = 0;
1232
1233 let mut running_tool_calls: u32 = 0;
1235 let mut running_actions: u32 = 0;
1236 let mut running_duration_ms: f64 = 0.0;
1237
1238 let budget = self.cost_budget.read().await.clone();
1240
1241 let levels = build_dag(&proposal.actions);
1243
1244 let mut canceled = false;
1245 for level in &levels {
1246 if !canceled {
1252 if let Some(token) = cancel {
1253 if token.is_cancelled() {
1254 canceled = true;
1255 }
1256 }
1257 }
1258 if canceled {
1259 for &idx in level {
1260 results.push(canceled_result(
1261 &proposal.actions[idx].id,
1262 "cancellation requested by caller",
1263 ));
1264 }
1265 continue;
1266 }
1267 if aborted || budget_exceeded {
1268 let skip_reason = if budget_exceeded {
1269 "cost budget exceeded"
1270 } else {
1271 "skipped due to earlier abort"
1272 };
1273 for &idx in level {
1274 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1275 }
1276 continue;
1277 }
1278
1279 let has_abort = level
1281 .iter()
1282 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1283
1284 if level.len() == 1 || has_abort {
1285 for &idx in level {
1287 if aborted || budget_exceeded {
1288 let skip_reason = if budget_exceeded {
1289 "cost budget exceeded"
1290 } else {
1291 "skipped due to abort"
1292 };
1293 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1294 continue;
1295 }
1296
1297 if let Some(ref b) = budget {
1299 if let Some(max) = b.max_actions {
1300 if running_actions >= max {
1301 budget_exceeded = true;
1302 results.push(skipped_result(
1303 &proposal.actions[idx].id,
1304 "cost budget exceeded",
1305 ));
1306 continue;
1307 }
1308 }
1309 if let Some(max) = b.max_tool_calls {
1310 if proposal.actions[idx].action_type == ActionType::ToolCall
1311 && running_tool_calls >= max
1312 {
1313 budget_exceeded = true;
1314 results.push(skipped_result(
1315 &proposal.actions[idx].id,
1316 "cost budget exceeded",
1317 ));
1318 continue;
1319 }
1320 }
1321 if let Some(max) = b.max_duration_ms {
1322 if running_duration_ms >= max {
1323 budget_exceeded = true;
1324 results.push(skipped_result(
1325 &proposal.actions[idx].id,
1326 "cost budget exceeded",
1327 ));
1328 continue;
1329 }
1330 }
1331 }
1332
1333 state_before_map.insert(
1334 proposal.actions[idx].id.clone(),
1335 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1336 );
1337 let (ar, action_retries) = self
1338 .process_action(
1339 &proposal.actions[idx],
1340 &proposal.id,
1341 &trace_id,
1342 &root_span_id,
1343 session_id,
1344 scope,
1345 )
1346 .await;
1347 total_retries += action_retries;
1348
1349 if ar.status == ActionStatus::Succeeded
1351 && proposal.actions[idx].action_type == ActionType::ToolCall
1352 {
1353 running_tool_calls += 1;
1354 }
1355 if ar.status != ActionStatus::Skipped {
1356 running_actions += 1;
1357 }
1358 if let Some(d) = ar.duration_ms {
1359 running_duration_ms += d;
1360 }
1361
1362 if ar.status == ActionStatus::Failed
1363 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1364 {
1365 aborted = true;
1366 }
1367 results.push(ar);
1368 }
1369 } else {
1370 for &idx in level {
1373 state_before_map.insert(
1374 proposal.actions[idx].id.clone(),
1375 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1376 );
1377 }
1378 let futs: Vec<_> = level
1379 .iter()
1380 .map(|&idx| {
1381 self.process_action(
1382 &proposal.actions[idx],
1383 &proposal.id,
1384 &trace_id,
1385 &root_span_id,
1386 session_id,
1387 scope,
1388 )
1389 })
1390 .collect();
1391 let level_results = futures::future::join_all(futs).await;
1392
1393 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1394 let idx = level[i];
1395 total_retries += action_retries;
1396 if ar.status == ActionStatus::Succeeded
1397 && proposal.actions[idx].action_type == ActionType::ToolCall
1398 {
1399 running_tool_calls += 1;
1400 }
1401 if ar.status != ActionStatus::Skipped {
1402 running_actions += 1;
1403 }
1404 if let Some(d) = ar.duration_ms {
1405 running_duration_ms += d;
1406 }
1407 results.push(ar);
1408 }
1409 }
1410 }
1411
1412 if aborted {
1414 self.state.restore(snapshot.clone(), transition_count);
1415
1416 let mut log = self.log.lock().await;
1417 log.append(
1418 EventKind::StateSnapshot,
1419 None,
1420 Some(&proposal.id),
1421 [(
1422 "state".to_string(),
1423 serde_json::to_value(&snapshot).unwrap_or_default(),
1424 )]
1425 .into(),
1426 );
1427 log.append(
1428 EventKind::StateRollback,
1429 None,
1430 Some(&proposal.id),
1431 [(
1432 "rolled_back_to".to_string(),
1433 Value::from("pre-proposal snapshot"),
1434 )]
1435 .into(),
1436 );
1437
1438 let mut cache = self.idempotency_cache.lock().await;
1440 for r in &results {
1441 if r.status == ActionStatus::Succeeded {
1442 for action in &proposal.actions {
1443 if action.id == r.action_id && action.idempotent {
1444 cache.remove(&idempotency_key(action));
1445 }
1446 }
1447 }
1448 }
1449 }
1450
1451 let action_order: HashMap<String, usize> = proposal
1453 .actions
1454 .iter()
1455 .enumerate()
1456 .map(|(i, a)| (a.id.clone(), i))
1457 .collect();
1458 results.sort_by_key(|r| {
1459 action_order
1460 .get(&r.action_id)
1461 .copied()
1462 .unwrap_or(usize::MAX)
1463 });
1464
1465 let mut cost = CostSummary::default();
1467 for r in &results {
1468 let action = action_order
1469 .get(&r.action_id)
1470 .and_then(|&i| proposal.actions.get(i));
1471 match r.status {
1472 ActionStatus::Succeeded => {
1473 cost.actions_executed += 1;
1474 if let Some(a) = action {
1475 if a.action_type == ActionType::ToolCall {
1476 cost.tool_calls += 1;
1477 }
1478 }
1479 }
1480 ActionStatus::Failed | ActionStatus::Rejected => {
1481 cost.actions_executed += 1;
1482 }
1483 ActionStatus::Skipped => {
1484 cost.actions_skipped += 1;
1485 }
1486 _ => {}
1487 }
1488 if let Some(d) = r.duration_ms {
1489 cost.total_duration_ms += d;
1490 }
1491 }
1492
1493 cost.retries = total_retries;
1495
1496 {
1498 let span_status = if aborted {
1499 SpanStatus::Error
1500 } else {
1501 SpanStatus::Ok
1502 };
1503 let mut log = self.log.lock().await;
1504 log.end_span(&root_span_id, span_status);
1505 }
1506
1507 let proposal_result = ProposalResult {
1508 proposal_id: proposal.id.clone(),
1509 results,
1510 cost,
1511 };
1512
1513 if self.auto_distill {
1515 if let Some(ref memgine) = self.memgine {
1516 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1518 .results
1519 .iter()
1520 .map(|r| {
1521 let kind = match r.status {
1522 ActionStatus::Succeeded => "action_succeeded",
1523 ActionStatus::Failed => "action_failed",
1524 ActionStatus::Rejected => "action_rejected",
1525 ActionStatus::Skipped => "action_skipped",
1526 _ => "unknown",
1527 };
1528 let tool = proposal
1530 .actions
1531 .iter()
1532 .find(|a| a.id == r.action_id)
1533 .and_then(|a| a.tool.clone());
1534 let mut data = serde_json::Map::new();
1535 if let Some(ref e) = r.error {
1536 data.insert("error".into(), Value::from(e.as_str()));
1537 }
1538 if let Some(ref o) = r.output {
1539 data.insert("output".into(), o.clone());
1540 }
1541 car_memgine::TraceEvent {
1542 kind: kind.to_string(),
1543 action_id: Some(r.action_id.clone()),
1544 tool,
1545 data: Value::Object(data),
1546 duration_ms: r.duration_ms,
1547 reward: match r.status {
1548 ActionStatus::Succeeded => Some(1.0),
1549 ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1550 _ => None,
1551 },
1552 ..Default::default()
1553 }
1554 })
1555 .collect();
1556
1557 let mut engine = memgine.lock().await;
1558 let skills = engine.distill_skills(&trace_events).await;
1559 if !skills.is_empty() {
1560 let count = skills.len();
1561 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1567 engine.scoped(tenant).ingest_distilled_skills(&skills);
1568
1569 let mut log = self.log.lock().await;
1571 log.append(
1572 EventKind::SkillDistilled,
1573 None,
1574 Some(&proposal_result.proposal_id),
1575 [
1576 ("skills_count".to_string(), Value::from(count)),
1577 (
1578 "skill_names".to_string(),
1579 Value::from(
1580 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1581 ),
1582 ),
1583 ]
1584 .into(),
1585 );
1586
1587 let threshold = engine.evolution_threshold();
1589 let domains = engine.domains_needing_evolution(threshold);
1590 for domain in &domains {
1591 let failed: Vec<car_memgine::TraceEvent> = trace_events
1593 .iter()
1594 .filter(|e| {
1595 matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1596 })
1597 .cloned()
1598 .collect();
1599 if !failed.is_empty() {
1600 let evolved = engine.evolve_skills(&failed, domain).await;
1601 if !evolved.is_empty() {
1602 log.append(
1603 EventKind::EvolutionTriggered,
1604 None,
1605 Some(&proposal_result.proposal_id),
1606 [
1607 ("domain".to_string(), Value::from(domain.as_str())),
1608 ("new_skills".to_string(), Value::from(evolved.len())),
1609 ]
1610 .into(),
1611 );
1612 }
1613 }
1614 }
1615 }
1616 }
1617 }
1618
1619 (proposal_result, state_before_map)
1620 }
1621
1622 async fn process_action(
1625 &self,
1626 action: &Action,
1627 proposal_id: &str,
1628 trace_id: &str,
1629 parent_span_id: &str,
1630 session_id: Option<&str>,
1631 scope: Option<&crate::scope::RuntimeScope>,
1632 ) -> (ActionResult, u32) {
1633 let action_type_name = serde_json::to_string(&action.action_type)
1635 .unwrap_or_default()
1636 .trim_matches('"')
1637 .to_string();
1638 let span_name = format!("action.{}", action_type_name);
1639
1640 let action_span_id = {
1642 let mut attrs: HashMap<String, Value> = HashMap::new();
1643 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1644 if let Some(ref tool) = action.tool {
1645 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1646 }
1647 let mut log = self.log.lock().await;
1648 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1649 };
1650
1651 let (result, retries) = self
1653 .process_action_inner(action, proposal_id, session_id, scope)
1654 .await;
1655
1656 let span_status = match result.status {
1658 ActionStatus::Succeeded => SpanStatus::Ok,
1659 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1660 _ => SpanStatus::Unset,
1661 };
1662 {
1663 let mut log = self.log.lock().await;
1664 log.end_span(&action_span_id, span_status);
1665 }
1666
1667 (result, retries)
1668 }
1669
1670 #[instrument(
1673 name = "action.process",
1674 skip_all,
1675 fields(
1676 action_id = %action.id,
1677 action_type = ?action.action_type,
1678 tool = action.tool.as_deref().unwrap_or("none"),
1679 )
1680 )]
1681 async fn process_action_inner(
1682 &self,
1683 action: &Action,
1684 proposal_id: &str,
1685 session_id: Option<&str>,
1686 scope: Option<&crate::scope::RuntimeScope>,
1687 ) -> (ActionResult, u32) {
1688 if action.idempotent {
1690 let key = idempotency_key(action);
1691 let cache = self.idempotency_cache.lock().await;
1692 if let Some(cached) = cache.get(&key) {
1693 let mut log = self.log.lock().await;
1694 log.append(
1695 EventKind::ActionDeduplicated,
1696 Some(&action.id),
1697 Some(proposal_id),
1698 [(
1699 "cached_action_id".to_string(),
1700 Value::from(cached.action_id.as_str()),
1701 )]
1702 .into(),
1703 );
1704 return (
1705 ActionResult {
1706 action_id: action.id.clone(),
1707 status: cached.status.clone(),
1708 output: cached.output.clone(),
1709 error: cached.error.clone(),
1710 state_changes: cached.state_changes.clone(),
1711 duration_ms: Some(0.0),
1712 timestamp: chrono::Utc::now(),
1713 },
1714 0,
1715 );
1716 }
1717 }
1718
1719 {
1721 let caps = self.capabilities.read().await;
1722 if let Some(ref cap) = *caps {
1723 if action.action_type == ActionType::ToolCall {
1725 if let Some(ref tool_name) = action.tool {
1726 if !cap.tool_allowed(tool_name) {
1727 let mut log = self.log.lock().await;
1728 log.append(
1729 EventKind::ActionRejected,
1730 Some(&action.id),
1731 Some(proposal_id),
1732 HashMap::new(),
1733 );
1734 return (
1735 rejected_result(
1736 &action.id,
1737 format!("capability denied: tool '{}' not allowed", tool_name),
1738 ),
1739 0,
1740 );
1741 }
1742 }
1743 }
1744
1745 if action.action_type == ActionType::StateWrite
1747 || action.action_type == ActionType::StateRead
1748 {
1749 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1750 if !cap.state_key_allowed(key) {
1751 let mut log = self.log.lock().await;
1752 log.append(
1753 EventKind::ActionRejected,
1754 Some(&action.id),
1755 Some(proposal_id),
1756 HashMap::new(),
1757 );
1758 return (
1759 rejected_result(
1760 &action.id,
1761 format!("capability denied: state key '{}' not allowed", key),
1762 ),
1763 0,
1764 );
1765 }
1766 }
1767 }
1768 }
1769 }
1770
1771 let tools = self.tools.read().await;
1773 let validation = validate_action(action, &self.state, &tools);
1774 drop(tools);
1775
1776 if !validation.valid() {
1777 let error = validation
1778 .errors
1779 .iter()
1780 .map(|e| e.reason.as_str())
1781 .collect::<Vec<_>>()
1782 .join("; ");
1783 let mut log = self.log.lock().await;
1784 log.append(
1785 EventKind::ActionRejected,
1786 Some(&action.id),
1787 Some(proposal_id),
1788 HashMap::new(),
1789 );
1790 return (rejected_result(&action.id, error), 0);
1791 }
1792
1793 {
1799 let mut violations = {
1800 let policies = self.policies.read().await;
1801 policies.check(action, &self.state)
1802 };
1803 if let Some(sid) = session_id {
1804 let session_engine = {
1809 let sessions = self.session_policies.read().await;
1810 sessions.get(sid).cloned()
1811 };
1812 if let Some(engine) = session_engine {
1813 let engine = engine.read().await;
1814 violations.extend(engine.check(action, &self.state));
1815 } else {
1816 let mut log = self.log.lock().await;
1822 log.append(
1823 EventKind::PolicyViolation,
1824 Some(&action.id),
1825 Some(proposal_id),
1826 HashMap::new(),
1827 );
1828 return (
1829 rejected_result(
1830 &action.id,
1831 format!(
1832 "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1833 ),
1834 ),
1835 0,
1836 );
1837 }
1838 }
1839 if !violations.is_empty() {
1840 let error = violations
1841 .iter()
1842 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1843 .collect::<Vec<_>>()
1844 .join("; ");
1845 let mut log = self.log.lock().await;
1846 log.append(
1847 EventKind::PolicyViolation,
1848 Some(&action.id),
1849 Some(proposal_id),
1850 HashMap::new(),
1851 );
1852 return (rejected_result(&action.id, error), 0);
1853 }
1854 }
1855
1856 {
1858 let mut log = self.log.lock().await;
1859 log.append(
1860 EventKind::ActionValidated,
1861 Some(&action.id),
1862 Some(proposal_id),
1863 HashMap::new(),
1864 );
1865 }
1866
1867 let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1869
1870 if action.idempotent && result.status == ActionStatus::Succeeded {
1872 let mut cache = self.idempotency_cache.lock().await;
1873 cache.insert(idempotency_key(action), result.clone());
1874 }
1875
1876 tracing::info!(
1877 status = ?result.status,
1878 duration_ms = result.duration_ms,
1879 "action completed"
1880 );
1881
1882 (result, retries)
1883 }
1884
1885 async fn execute_with_retry(
1888 &self,
1889 action: &Action,
1890 proposal_id: &str,
1891 scope: Option<&crate::scope::RuntimeScope>,
1892 ) -> (ActionResult, u32) {
1893 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1894 action.max_retries + 1
1895 } else {
1896 1
1897 };
1898
1899 let mut last_error: Option<String> = None;
1900 let mut retries: u32 = 0;
1901
1902 for attempt in 0..max_attempts {
1903 if attempt > 0 {
1904 retries += 1;
1905 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1906 tokio::time::sleep(Duration::from_millis(delay)).await;
1907 let mut log = self.log.lock().await;
1908 log.append(
1909 EventKind::ActionRetrying,
1910 Some(&action.id),
1911 Some(proposal_id),
1912 [("attempt".to_string(), Value::from(attempt + 1))].into(),
1913 );
1914 }
1915
1916 {
1917 let mut log = self.log.lock().await;
1918 log.append(
1919 EventKind::ActionExecuting,
1920 Some(&action.id),
1921 Some(proposal_id),
1922 HashMap::new(),
1923 );
1924 }
1925
1926 let start = std::time::Instant::now();
1927 let transitions_before = self.state.transition_count();
1928
1929 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1931 match timeout(
1932 Duration::from_millis(timeout_ms),
1933 self.dispatch(action, scope),
1934 )
1935 .await
1936 {
1937 Ok(r) => r,
1938 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1939 }
1940 } else {
1941 self.dispatch(action, scope).await
1942 };
1943
1944 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1945
1946 match exec_result {
1947 Ok(output) => {
1948 let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1954 let scoped_state = self.state.scoped(tenant_for_effects);
1955 let mut state_changes: HashMap<String, Value> = HashMap::new();
1956 for (key, value) in &action.expected_effects {
1957 scoped_state.set(key, value.clone(), &action.id);
1958 state_changes.insert(key.clone(), value.clone());
1959 }
1960
1961 for t in self.state.transitions_since(transitions_before) {
1963 if !state_changes.contains_key(&t.key) {
1964 if let Some(v) = t.new_value {
1965 state_changes.insert(t.key.clone(), v);
1966 }
1967 }
1968 }
1969
1970 let mut log = self.log.lock().await;
1971 log.append(
1972 EventKind::ActionSucceeded,
1973 Some(&action.id),
1974 Some(proposal_id),
1975 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1976 );
1977
1978 if !state_changes.is_empty() {
1979 log.append(
1980 EventKind::StateChanged,
1981 Some(&action.id),
1982 Some(proposal_id),
1983 [(
1984 "changes".to_string(),
1985 serde_json::to_value(&state_changes).unwrap_or_default(),
1986 )]
1987 .into(),
1988 );
1989 }
1990
1991 return (
1992 ActionResult {
1993 action_id: action.id.clone(),
1994 status: ActionStatus::Succeeded,
1995 output: Some(output),
1996 error: None,
1997 state_changes,
1998 duration_ms: Some(duration_ms),
1999 timestamp: chrono::Utc::now(),
2000 },
2001 retries,
2002 );
2003 }
2004 Err(e) => {
2005 last_error = Some(e.clone());
2006 let mut log = self.log.lock().await;
2007 log.append(
2008 EventKind::ActionFailed,
2009 Some(&action.id),
2010 Some(proposal_id),
2011 [
2012 ("error".to_string(), Value::from(e.as_str())),
2013 ("attempt".to_string(), Value::from(attempt + 1)),
2014 ]
2015 .into(),
2016 );
2017 }
2018 }
2019 }
2020
2021 if action.failure_behavior == FailureBehavior::Skip {
2023 return (
2024 skipped_result(
2025 &action.id,
2026 last_error.as_deref().unwrap_or("all attempts exhausted"),
2027 ),
2028 retries,
2029 );
2030 }
2031
2032 (
2033 ActionResult {
2034 action_id: action.id.clone(),
2035 status: ActionStatus::Failed,
2036 output: None,
2037 error: last_error,
2038 state_changes: HashMap::new(),
2039 duration_ms: None,
2040 timestamp: chrono::Utc::now(),
2041 },
2042 retries,
2043 )
2044 }
2045
2046 async fn dispatch(
2056 &self,
2057 action: &Action,
2058 scope: Option<&crate::scope::RuntimeScope>,
2059 ) -> Result<Value, String> {
2060 match action.action_type {
2061 ActionType::ToolCall => {
2062 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2063 let params = Value::Object(
2064 action
2065 .parameters
2066 .iter()
2067 .map(|(k, v)| (k.clone(), v.clone()))
2068 .collect(),
2069 );
2070
2071 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
2073 return Ok(cached);
2074 }
2075
2076 self.rate_limiter.acquire(tool_name).await;
2078
2079 if matches!(
2081 tool_name,
2082 "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2083 ) {
2084 if let Some(ref engine) = self.inference_engine {
2085 let params = {
2088 let should_ground =
2089 tool_name == "infer.grounded" || tool_name == "infer";
2090 if should_ground {
2091 if let Some(ref memgine) = self.memgine {
2092 if let Some(prompt) =
2093 params.get("prompt").and_then(|v| v.as_str())
2094 {
2095 let ctx = {
2096 let mut m = memgine.lock().await;
2097 m.build_context(prompt)
2098 };
2099 if !ctx.is_empty() {
2100 let mut p = params.clone();
2101 if let Some(obj) = p.as_object_mut() {
2102 obj.insert("context".to_string(), Value::from(ctx));
2103 }
2104 p
2105 } else {
2106 params
2107 }
2108 } else {
2109 params
2110 }
2111 } else {
2112 params
2113 }
2114 } else {
2115 params
2116 }
2117 };
2118
2119 let effective_tool = if tool_name == "infer.grounded" {
2121 "infer"
2122 } else {
2123 tool_name
2124 };
2125 let result =
2126 car_inference::service::execute_tool(engine, effective_tool, ¶ms)
2127 .await
2128 .map_err(|e| e.to_string());
2129
2130 if let Ok(ref value) = result {
2131 self.result_cache
2132 .put(tool_name, ¶ms, value.clone())
2133 .await;
2134 }
2135
2136 return result;
2137 }
2138 }
2139
2140 if tool_name == "memory.consolidate" {
2142 if let Some(ref memgine) = self.memgine {
2143 let report = {
2144 let mut m = memgine.lock().await;
2145 m.consolidate().await
2146 };
2147 {
2149 let mut log = self.log.lock().await;
2150 log.append(
2151 EventKind::Consolidated,
2152 None,
2153 None,
2154 [
2155 (
2156 "expired_pruned".to_string(),
2157 Value::from(report.expired_pruned),
2158 ),
2159 (
2160 "superseded_gc".to_string(),
2161 Value::from(report.superseded_gc),
2162 ),
2163 (
2164 "stale_embeddings_removed".to_string(),
2165 Value::from(report.stale_embeddings_removed),
2166 ),
2167 (
2168 "nodes_embedded".to_string(),
2169 Value::from(report.nodes_embedded),
2170 ),
2171 (
2172 "domains_evolved".to_string(),
2173 Value::from(report.domains_evolved.clone()),
2174 ),
2175 ("total_nodes".to_string(), Value::from(report.total_nodes)),
2176 ("total_edges".to_string(), Value::from(report.total_edges)),
2177 ]
2178 .into(),
2179 );
2180 }
2181 return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2182 } else {
2183 return Err(
2184 "memory.consolidate requires memgine (attach with with_learning)"
2185 .into(),
2186 );
2187 }
2188 }
2189
2190 let configured = {
2196 let guard = self.tool_executor.lock().await;
2197 guard.as_ref().cloned()
2198 };
2199
2200 if let Some(ref executor) = configured {
2201 let result = executor
2202 .execute_with_action(tool_name, ¶ms, &action.id)
2203 .await;
2204 let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2205 if !fall_through {
2206 if let Ok(ref value) = result {
2207 self.result_cache
2208 .put(tool_name, ¶ms, value.clone())
2209 .await;
2210 }
2211 return result;
2212 }
2213 }
2214
2215 if let Some(result) = crate::agent_basics::execute(tool_name, ¶ms).await {
2216 if let Ok(ref value) = result {
2217 self.result_cache
2218 .put(tool_name, ¶ms, value.clone())
2219 .await;
2220 }
2221 return result;
2222 }
2223
2224 Err(format!("no handler for tool '{}'", tool_name))
2225 }
2226 ActionType::StateWrite => {
2227 let key = action
2228 .parameters
2229 .get("key")
2230 .and_then(|v| v.as_str())
2231 .ok_or("state_write requires 'key' parameter")?;
2232 let value = action
2233 .parameters
2234 .get("value")
2235 .cloned()
2236 .unwrap_or(Value::Null);
2237 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2238 self.state.scoped(tenant).set(key, value, &action.id);
2239 Ok(Value::from(format!("written: {}", key)))
2240 }
2241 ActionType::StateRead => {
2242 let key = action
2243 .parameters
2244 .get("key")
2245 .and_then(|v| v.as_str())
2246 .ok_or("state_read requires 'key' parameter")?;
2247 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2248 Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2249 }
2250 ActionType::Assertion => {
2251 let key = action
2252 .parameters
2253 .get("key")
2254 .and_then(|v| v.as_str())
2255 .ok_or("assertion requires 'key' parameter")?;
2256 let expected = action
2257 .parameters
2258 .get("expected")
2259 .cloned()
2260 .unwrap_or(Value::Null);
2261 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2262 let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2263 if actual != expected {
2264 Err(format!(
2265 "assertion failed: state['{}'] = {:?}, expected {:?}",
2266 key, actual, expected
2267 ))
2268 } else {
2269 Ok(serde_json::json!({"asserted": key, "value": actual}))
2270 }
2271 }
2272 }
2273 }
2274
2275 pub async fn save_checkpoint(&self) -> Checkpoint {
2279 let state = self.state.snapshot();
2280 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2281 let log = self.log.lock().await;
2282 let events: Vec<Value> = log
2283 .events()
2284 .iter()
2285 .map(|e| serde_json::to_value(e).unwrap_or_default())
2286 .collect();
2287
2288 Checkpoint {
2289 checkpoint_id: Uuid::new_v4().to_string(),
2290 created_at: chrono::Utc::now(),
2291 state,
2292 events,
2293 tools,
2294 metadata: HashMap::new(),
2295 }
2296 }
2297
2298 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2300 let checkpoint = self.save_checkpoint().await;
2301 let json = serde_json::to_string_pretty(&checkpoint)
2302 .map_err(|e| format!("serialize error: {}", e))?;
2303 tokio::fs::write(path, json)
2304 .await
2305 .map_err(|e| format!("write error: {}", e))?;
2306 Ok(())
2307 }
2308
2309 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2311 let json = tokio::fs::read_to_string(path)
2312 .await
2313 .map_err(|e| format!("read error: {}", e))?;
2314 let checkpoint: Checkpoint =
2315 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2316 self.restore_checkpoint(&checkpoint).await;
2317 Ok(checkpoint)
2318 }
2319
2320 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2322 self.state.replace_all(checkpoint.state.clone());
2324 self.idempotency_cache.lock().await.clear();
2327 let mut tools = self.tools.write().await;
2329 tools.clear();
2330 for tool_name in &checkpoint.tools {
2331 let schema = ToolSchema {
2332 name: tool_name.clone(),
2333 description: String::new(),
2334 parameters: serde_json::Value::Object(Default::default()),
2335 returns: None,
2336 idempotent: false,
2337 cache_ttl_secs: None,
2338 rate_limit: None,
2339 };
2340 tools.insert(tool_name.clone(), schema);
2341 }
2342 }
2343
2344 pub async fn register_subprocess_tool(
2349 &self,
2350 name: &str,
2351 tool: crate::subprocess::SubprocessTool,
2352 ) {
2353 use crate::subprocess::SubprocessToolExecutor;
2354
2355 let schema = ToolSchema {
2356 name: name.to_string(),
2357 description: format!("Subprocess tool: {}", tool.command),
2358 parameters: serde_json::Value::Object(Default::default()),
2359 returns: None,
2360 idempotent: false,
2361 cache_ttl_secs: None,
2362 rate_limit: None,
2363 };
2364 self.register_tool_schema(schema).await;
2365
2366 let mut guard = self.tool_executor.lock().await;
2367 let mut executor = match guard.take() {
2368 Some(existing) => {
2369 let mut sub = SubprocessToolExecutor::new();
2370 sub = sub.with_fallback(existing);
2371 sub
2372 }
2373 None => SubprocessToolExecutor::new(),
2374 };
2375 executor.register(name, tool);
2376 *guard = Some(std::sync::Arc::new(executor));
2377 }
2378}
2379
2380impl Default for Runtime {
2381 fn default() -> Self {
2382 Self::new()
2383 }
2384}