1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base delay, in milliseconds, for retry backoff — presumably the wait before the first retry
/// (the retry loop is not in view here; TODO confirm against its use).
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier between successive retry delays — presumably exponential backoff; TODO confirm.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
/// Callback consulted by the runtime when a proposal run ends with at least one failed action
/// and replanning is enabled (`ReplanConfig::max_replans > 0`).
///
/// Implementations receive a [`ReplanContext`] describing the failure and return either a
/// replacement [`ActionProposal`] to execute, or an error string. On error the runtime stops
/// replanning and returns the last result.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Produce a replacement proposal for the failure described by `ctx`.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Information handed to a [`ReplanCallback`] after a proposal run that had failed actions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Id of the original proposal; it stays the same across replans.
    pub proposal_id: String,
    /// 1-based number of the replan being requested.
    pub attempt: u32,
    /// Actions of the most recently executed proposal that ended with status `Failed`.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Ids of the actions in the most recently executed proposal that succeeded.
    pub completed_action_ids: Vec<String>,
    /// Snapshot of the whole state store, taken when this context was built.
    pub state_snapshot: HashMap<String, Value>,
    /// Replans still allowed after this one (`max_replans - attempt`, saturating at 0).
    pub replans_remaining: u32,
    /// `source` field of the original proposal.
    pub original_source: String,
    /// Number of actions in the original proposal.
    pub original_action_count: usize,
    /// `context` map of the original proposal.
    pub original_context: HashMap<String, Value>,
}
62
/// Summary of one failed action, as reported to a [`ReplanCallback`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the failed action.
    pub action_id: String,
    /// Tool the action called; `None` if the action has no tool or was not found in the proposal.
    pub tool: Option<String>,
    /// Error message of the failure; empty when the result carried none.
    pub error: String,
    /// Parameters the action was called with; empty if the action was not found.
    pub parameters: HashMap<String, Value>,
}
71
/// Controls automatic replanning after a proposal run with failed actions.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replacement proposals to request; `0` disables replanning.
    pub max_replans: u32,
    /// Delay in milliseconds before each replan request (`0` means no delay).
    pub delay_ms: u64,
    /// Whether replacement proposals are checked with `car_verify` before being executed.
    /// A replacement that fails the check is rejected and consumes a replan attempt.
    pub verify_before_execute: bool,
}

impl Default for ReplanConfig {
    /// Replanning disabled (`max_replans == 0`), no delay, verification enabled.
    fn default() -> Self {
        let replanning_disabled = 0;
        Self {
            verify_before_execute: true,
            delay_ms: 0,
            max_replans: replanning_disabled,
        }
    }
}
95
/// Executes tool calls on behalf of the runtime.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Run `tool` with JSON `params`, returning its JSON output or an error message.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;

    /// Variant of [`ToolExecutor::execute`] that also receives the id of the action being run.
    ///
    /// The default implementation ignores `_action_id` and delegates to `execute`. Override it
    /// when the executor needs the action id.
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        _action_id: &str,
    ) -> Result<Value, String> {
        self.execute(tool, params).await
    }
}
121
122fn idempotency_key(action: &Action) -> String {
124 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
125 let params = serde_json::to_string(&sorted).unwrap_or_default();
126 format!(
127 "{}:{}:{}",
128 serde_json::to_string(&action.action_type).unwrap_or_default(),
129 action.tool.as_deref().unwrap_or(""),
130 params
131 )
132}
133
134fn rejected_result(action_id: &str, error: String) -> ActionResult {
135 ActionResult {
136 action_id: action_id.to_string(),
137 status: ActionStatus::Rejected,
138 output: None,
139 error: Some(error),
140 state_changes: HashMap::new(),
141 duration_ms: None,
142 timestamp: chrono::Utc::now(),
143 }
144}
145
146fn snapshot_relevant_keys(
150 state: &car_state::StateStore,
151 action: &Action,
152) -> HashMap<String, Value> {
153 let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
154 for dep in &action.state_dependencies {
155 keys.insert(dep.as_str());
156 }
157 for key in action.expected_effects.keys() {
158 keys.insert(key.as_str());
159 }
160 if action.action_type == ActionType::StateWrite {
162 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
163 keys.insert(key);
164 }
165 }
166
167 if keys.is_empty() {
168 return HashMap::new();
170 }
171
172 keys.iter()
173 .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
174 .collect()
175}
176
177fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
178 ActionResult {
179 action_id: action_id.to_string(),
180 status: ActionStatus::Skipped,
181 output: None,
182 error: Some(reason.to_string()),
183 state_changes: HashMap::new(),
184 duration_ms: None,
185 timestamp: chrono::Utc::now(),
186 }
187}
188
189pub const CANCELED_PREFIX: &str = "canceled: ";
196
197fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
202 ActionResult {
203 action_id: action_id.to_string(),
204 status: ActionStatus::Skipped,
205 output: None,
206 error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
207 state_changes: HashMap::new(),
208 duration_ms: None,
209 timestamp: chrono::Utc::now(),
210 }
211}
212
213pub fn format_tool_result(result: &ActionResult) -> String {
215 match result.status {
216 ActionStatus::Succeeded => match &result.output {
217 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
218 None => String::new(),
219 },
220 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
221 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
222 _ => format!(
223 "[{:?}] {}",
224 result.status,
225 result.error.as_deref().unwrap_or("")
226 ),
227 }
228}
229
/// Optional per-proposal execution limits; a `None` field leaves that dimension unlimited.
///
/// NOTE(review): `execute_inner_with_cancel` checks these limits only before each action of a
/// sequentially executed DAG level (a single-action level, or one containing an `Abort`
/// action). Actions of concurrently executed levels are not checked. Once a limit is hit, the
/// remaining actions are reported as skipped with "cost budget exceeded".
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Maximum number of tool-call actions that succeeded.
    pub max_tool_calls: Option<u32>,
    /// Maximum summed `duration_ms` of executed actions.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of actions that ran (any status other than `Skipped`).
    pub max_actions: Option<u32>,
}
237
/// Executes [`ActionProposal`]s against a shared state store.
///
/// The runtime enforces tool registration, policies, capabilities, cost budgets, rate limits
/// and caching. It can optionally replan on failure and record trajectories for learning.
pub struct Runtime {
    /// Key/value state read and written by actions; restored to the pre-proposal snapshot when a
    /// proposal aborts.
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Global policy engine.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Per-session policy engines, keyed by the id returned from `open_session`.
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log that proposal, action, span and replan events are appended to.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits.
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool result cache (enabled via `cache_ttl_secs` or `enable_tool_cache`).
    pub result_cache: Arc<ResultCache>,
    /// Executor used for tool calls; `None` until one is installed.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Cached results of idempotent actions, keyed by `idempotency_key`. Entries for
    /// succeeded actions are removed when their proposal aborts.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional cost limits applied to each executed proposal.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability set; a proposal exceeding its action budget is rejected outright.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Optional inference engine (see `with_inference`).
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine used for learning.
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether execution traces are fed to `memgine` automatically.
    auto_distill: bool,
    /// Optional store that finished proposal trajectories are appended to.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Callback asked for replacement proposals after failures; `None` disables replanning.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Replanning limits and options.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Registry of tool entries added through `register_tool_entry`.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
287
288impl Runtime {
289 pub fn new() -> Self {
290 Self {
291 state: Arc::new(StateStore::new()),
292 tools: Arc::new(TokioRwLock::new(HashMap::new())),
293 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
294 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
295 log: Arc::new(TokioMutex::new(EventLog::new())),
296 rate_limiter: Arc::new(RateLimiter::new()),
297 result_cache: Arc::new(ResultCache::new()),
298 tool_executor: TokioMutex::new(None),
299 idempotency_cache: TokioMutex::new(HashMap::new()),
300 cost_budget: TokioRwLock::new(None),
301 capabilities: TokioRwLock::new(None),
302 inference_engine: None,
303 memgine: None,
304 auto_distill: false,
305 trajectory_store: None,
306 replan_callback: TokioMutex::new(None),
307 replan_config: TokioRwLock::new(ReplanConfig::default()),
308 registry: Arc::new(crate::registry::ToolRegistry::new()),
309 }
310 }
311
312 pub fn with_shared(
315 state: Arc<StateStore>,
316 log: Arc<TokioMutex<EventLog>>,
317 policies: Arc<TokioRwLock<PolicyEngine>>,
318 ) -> Self {
319 Self {
320 state,
321 tools: Arc::new(TokioRwLock::new(HashMap::new())),
322 policies,
323 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
328 log,
329 rate_limiter: Arc::new(RateLimiter::new()),
330 result_cache: Arc::new(ResultCache::new()),
331 tool_executor: TokioMutex::new(None),
332 idempotency_cache: TokioMutex::new(HashMap::new()),
333 cost_budget: TokioRwLock::new(None),
334 capabilities: TokioRwLock::new(None),
335 inference_engine: None,
336 memgine: None,
337 auto_distill: false,
338 trajectory_store: None,
339 replan_callback: TokioMutex::new(None),
340 replan_config: TokioRwLock::new(ReplanConfig::default()),
341 registry: Arc::new(crate::registry::ToolRegistry::new()),
342 }
343 }
344
345 pub async fn open_session(&self) -> String {
363 let id = Uuid::new_v4().to_string();
364 let mut sessions = self.session_policies.write().await;
365 sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
366 id
367 }
368
369 pub async fn close_session(&self, session_id: &str) -> bool {
374 let mut sessions = self.session_policies.write().await;
375 sessions.remove(session_id).is_some()
376 }
377
378 pub async fn register_policy_in_session(
387 &self,
388 session_id: &str,
389 name: &str,
390 check: car_policy::PolicyCheck,
391 description: &str,
392 ) -> Result<(), String> {
393 let engine = {
394 let sessions = self.session_policies.read().await;
395 sessions
396 .get(session_id)
397 .cloned()
398 .ok_or_else(|| format!("unknown session id '{session_id}'"))?
399 };
400 let mut engine = engine.write().await;
401 engine.register(name, check, description);
402 Ok(())
403 }
404
405 pub async fn session_exists(&self, session_id: &str) -> bool {
409 self.session_policies.read().await.contains_key(session_id)
410 }
411
412 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
415 self.inference_engine = Some(engine);
416 if let Ok(mut tools) = self.tools.try_write() {
418 for schema in car_inference::service::all_schemas() {
419 tools.insert(schema.name.clone(), schema);
420 }
421 }
422 self
423 }
424
425 pub fn with_learning(
429 mut self,
430 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
431 auto_distill: bool,
432 ) -> Self {
433 self.memgine = Some(memgine);
434 self.auto_distill = auto_distill;
435 self
436 }
437
438 pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
440 self.with_learning(memgine, true)
441 }
442
443 pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
445 self.trajectory_store = Some(store);
446 self
447 }
448
449 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
450 if let Ok(mut guard) = self.tool_executor.try_lock() {
452 *guard = Some(executor);
453 }
454 self
455 }
456
457 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
460 *self.tool_executor.lock().await = Some(executor);
461 }
462
463 pub fn with_event_log(mut self, log: EventLog) -> Self {
464 self.log = Arc::new(TokioMutex::new(log));
465 self
466 }
467
468 pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
470 if let Ok(mut guard) = self.replan_callback.try_lock() {
471 *guard = Some(callback);
472 }
473 if let Ok(mut guard) = self.replan_config.try_write() {
474 *guard = config;
475 }
476 self
477 }
478
479 pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
481 *self.replan_callback.lock().await = Some(callback);
482 }
483
484 pub async fn set_replan_config(&self, config: ReplanConfig) {
486 *self.replan_config.write().await = config;
487 }
488
489 pub async fn register_tool(&self, name: &str) {
491 let schema = ToolSchema {
492 name: name.to_string(),
493 description: String::new(),
494 parameters: serde_json::Value::Object(Default::default()),
495 returns: None,
496 idempotent: false,
497 cache_ttl_secs: None,
498 rate_limit: None,
499 };
500 self.register_tool_schema(schema).await;
501 }
502
503 pub async fn register_tool_schema(&self, schema: ToolSchema) {
505 if let Some(ttl) = schema.cache_ttl_secs {
507 self.result_cache.enable_caching(&schema.name, ttl).await;
508 }
509 if let Some(ref rl) = schema.rate_limit {
511 self.rate_limiter
512 .set_limit(
513 &schema.name,
514 RateLimit {
515 max_calls: rl.max_calls,
516 interval_secs: rl.interval_secs,
517 },
518 )
519 .await;
520 }
521 self.tools.write().await.insert(schema.name.clone(), schema);
522 }
523
524 pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
528 let schema = entry.schema.clone();
529 self.registry.register(entry).await;
530 self.register_tool_schema(schema).await;
531 }
532
533 pub async fn register_agent_basics(&self) {
538 for entry in crate::agent_basics::entries() {
539 self.register_tool_entry(entry).await;
540 }
541 }
542
543 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
545 self.tools.read().await.values().cloned().collect()
546 }
547
548 pub async fn set_cost_budget(&self, budget: CostBudget) {
550 *self.cost_budget.write().await = Some(budget);
551 }
552
553 pub async fn set_capabilities(&self, caps: CapabilitySet) {
555 *self.capabilities.write().await = Some(caps);
556 }
557
558 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
564 self.rate_limiter
565 .set_limit(
566 tool,
567 RateLimit {
568 max_calls,
569 interval_secs,
570 },
571 )
572 .await;
573 }
574
575 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
577 self.result_cache.enable_caching(tool, ttl_secs).await;
578 }
579
580 #[instrument(
590 name = "proposal.execute",
591 skip_all,
592 fields(
593 proposal_id = %proposal.id,
594 action_count = proposal.actions.len(),
595 )
596 )]
597 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
598 let token = tokio_util::sync::CancellationToken::new();
601 self.execute_with_cancel(proposal, &token).await
602 }
603
604 pub async fn execute_with_session(
616 &self,
617 proposal: &ActionProposal,
618 session_id: &str,
619 ) -> ProposalResult {
620 let token = tokio_util::sync::CancellationToken::new();
621 self.execute_with_session_and_cancel(proposal, session_id, &token)
622 .await
623 }
624
625 pub async fn execute_with_session_and_cancel(
629 &self,
630 proposal: &ActionProposal,
631 session_id: &str,
632 cancel: &tokio_util::sync::CancellationToken,
633 ) -> ProposalResult {
634 self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
635 .await
636 }
637
638 pub async fn execute_with_cancel(
666 &self,
667 proposal: &ActionProposal,
668 cancel: &tokio_util::sync::CancellationToken,
669 ) -> ProposalResult {
670 self.execute_with_optional_session(proposal, None, None, cancel)
671 .await
672 }
673
674 pub async fn execute_scoped(
691 &self,
692 proposal: &ActionProposal,
693 scope: &crate::scope::RuntimeScope,
694 ) -> ProposalResult {
695 let token = tokio_util::sync::CancellationToken::new();
696 self.execute_scoped_with_cancel(proposal, scope, &token)
697 .await
698 }
699
700 pub async fn execute_scoped_with_cancel(
705 &self,
706 proposal: &ActionProposal,
707 scope: &crate::scope::RuntimeScope,
708 cancel: &tokio_util::sync::CancellationToken,
709 ) -> ProposalResult {
710 self.execute_with_optional_session(proposal, None, Some(scope), cancel)
711 .await
712 }
713
714 async fn execute_with_optional_session(
720 &self,
721 proposal: &ActionProposal,
722 session_id: Option<&str>,
723 scope: Option<&crate::scope::RuntimeScope>,
724 cancel: &tokio_util::sync::CancellationToken,
725 ) -> ProposalResult {
726 let config = self.replan_config.read().await.clone();
727 let mut current_proposal = proposal.clone();
728 let mut attempt: u32 = 0;
729
730 if let Some(s) = scope {
736 if !s.is_unscoped() {
737 let mut props: HashMap<String, Value> = HashMap::new();
738 if let Some(cid) = &s.caller_id {
739 props.insert("caller_id".to_string(), Value::from(cid.as_str()));
740 }
741 if let Some(tid) = &s.tenant_id {
742 props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
743 }
744 if !s.claims.is_empty() {
745 if let Ok(claims_json) = serde_json::to_value(&s.claims) {
746 props.insert("claims".to_string(), claims_json);
747 }
748 }
749 let mut log = self.log.lock().await;
750 log.append(EventKind::SessionScope, None, Some(&proposal.id), props);
751 }
752 }
753
754 loop {
755 let (result, state_before_map) = self
756 .execute_inner_with_cancel(¤t_proposal, Some(cancel), session_id, scope)
757 .await;
758
759 let aborted = result
761 .results
762 .iter()
763 .any(|r| r.status == ActionStatus::Failed);
764 if !aborted || attempt >= config.max_replans {
765 if aborted && attempt > 0 {
766 let mut log = self.log.lock().await;
768 log.append(
769 EventKind::ReplanExhausted,
770 None,
771 Some(&proposal.id),
772 [("attempts".to_string(), Value::from(attempt))].into(),
773 );
774 }
775
776 let outcome = if !aborted {
778 if attempt > 0 {
779 car_memgine::TrajectoryOutcome::ReplanSuccess
780 } else {
781 car_memgine::TrajectoryOutcome::Success
782 }
783 } else if attempt > 0 {
784 car_memgine::TrajectoryOutcome::ReplanExhausted
785 } else {
786 car_memgine::TrajectoryOutcome::Failed
787 };
788 if let Some(err) = self.persist_trajectory(
789 proposal,
790 ¤t_proposal,
791 &result,
792 outcome,
793 attempt,
794 &state_before_map,
795 ) {
796 let mut log = self.log.lock().await;
797 log.append(
798 EventKind::ActionFailed,
799 None,
800 Some(&proposal.id),
801 [(
802 "trajectory_persist_error".to_string(),
803 Value::from(err.as_str()),
804 )]
805 .into(),
806 );
807 }
808
809 return result;
810 }
811
812 let callback = {
814 let guard = self.replan_callback.lock().await;
815 guard.clone()
816 };
817 let Some(callback) = callback else {
818 if let Some(err) = self.persist_trajectory(
820 proposal,
821 ¤t_proposal,
822 &result,
823 car_memgine::TrajectoryOutcome::Failed,
824 attempt,
825 &state_before_map,
826 ) {
827 let mut log = self.log.lock().await;
828 log.append(
829 EventKind::ActionFailed,
830 None,
831 Some(&proposal.id),
832 [(
833 "trajectory_persist_error".to_string(),
834 Value::from(err.as_str()),
835 )]
836 .into(),
837 );
838 }
839 return result;
840 };
841
842 let failed_actions: Vec<FailedActionSummary> = result
844 .results
845 .iter()
846 .filter(|r| r.status == ActionStatus::Failed)
847 .map(|r| {
848 let action = current_proposal
849 .actions
850 .iter()
851 .find(|a| a.id == r.action_id);
852 FailedActionSummary {
853 action_id: r.action_id.clone(),
854 tool: action.and_then(|a| a.tool.clone()),
855 error: r.error.clone().unwrap_or_default(),
856 parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
857 }
858 })
859 .collect();
860
861 let completed_action_ids: Vec<String> = result
862 .results
863 .iter()
864 .filter(|r| r.status == ActionStatus::Succeeded)
865 .map(|r| r.action_id.clone())
866 .collect();
867
868 let ctx = ReplanContext {
869 proposal_id: proposal.id.clone(),
870 attempt: attempt + 1,
871 failed_actions,
872 completed_action_ids,
873 state_snapshot: self.state.snapshot(),
874 replans_remaining: config.max_replans.saturating_sub(attempt + 1),
875 original_source: proposal.source.clone(),
876 original_action_count: proposal.actions.len(),
877 original_context: proposal.context.clone(),
878 };
879
880 if config.delay_ms > 0 {
882 tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
883 }
884
885 {
887 let mut log = self.log.lock().await;
888 log.append(
889 EventKind::ReplanAttempted,
890 None,
891 Some(&proposal.id),
892 [
893 ("attempt".to_string(), Value::from(attempt + 1)),
894 (
895 "failed_count".to_string(),
896 Value::from(ctx.failed_actions.len()),
897 ),
898 ]
899 .into(),
900 );
901 }
902
903 match callback.replan(&ctx).await {
905 Ok(new_proposal) => {
906 if config.verify_before_execute {
908 let current_state = self.state.snapshot();
909 let tools_guard = self.tools.read().await;
916 let vr = car_verify::verify_with_schemas(
917 &new_proposal,
918 Some(¤t_state),
919 Some(&tools_guard),
920 100,
921 );
922 drop(tools_guard);
923 if !vr.valid {
924 let error_msgs: Vec<String> = vr
925 .issues
926 .iter()
927 .filter(|i| i.severity == "error")
928 .map(|i| i.message.clone())
929 .collect();
930 let mut log = self.log.lock().await;
931 log.append(
932 EventKind::ReplanRejected,
933 None,
934 Some(&proposal.id),
935 [
936 ("errors".to_string(), Value::from(error_msgs.join("; "))),
937 ("attempt".to_string(), Value::from(attempt + 1)),
938 ]
939 .into(),
940 );
941 attempt += 1;
943 continue;
944 }
945 }
946
947 {
949 let mut log = self.log.lock().await;
950 log.append(
951 EventKind::ReplanProposalReceived,
952 None,
953 Some(&proposal.id),
954 [
955 ("attempt".to_string(), Value::from(attempt + 1)),
956 (
957 "new_action_count".to_string(),
958 Value::from(new_proposal.actions.len()),
959 ),
960 ]
961 .into(),
962 );
963 }
964 current_proposal = new_proposal;
965 attempt += 1;
966 }
967 Err(e) => {
968 let mut log = self.log.lock().await;
970 log.append(
971 EventKind::ReplanExhausted,
972 None,
973 Some(&proposal.id),
974 [
975 ("reason".to_string(), Value::from("callback_error")),
976 ("error".to_string(), Value::from(e.as_str())),
977 ("attempt".to_string(), Value::from(attempt + 1)),
978 ]
979 .into(),
980 );
981 if let Some(err) = self.persist_trajectory(
982 proposal,
983 ¤t_proposal,
984 &result,
985 car_memgine::TrajectoryOutcome::Failed,
986 attempt,
987 &state_before_map,
988 ) {
989 log.append(
990 EventKind::ActionFailed,
991 None,
992 Some(&proposal.id),
993 [(
994 "trajectory_persist_error".to_string(),
995 Value::from(err.as_str()),
996 )]
997 .into(),
998 );
999 }
1000 return result;
1001 }
1002 }
1003 }
1004 }
1005
1006 fn persist_trajectory(
1008 &self,
1009 proposal: &ActionProposal,
1010 current_proposal: &ActionProposal,
1011 result: &ProposalResult,
1012 outcome: car_memgine::TrajectoryOutcome,
1013 attempt: u32,
1014 state_before_map: &HashMap<String, HashMap<String, Value>>,
1015 ) -> Option<String> {
1016 let store = self.trajectory_store.as_ref()?;
1017
1018 let trace_events: Vec<car_memgine::TraceEvent> = result
1019 .results
1020 .iter()
1021 .map(|r| {
1022 let kind = match r.status {
1023 ActionStatus::Succeeded => "action_succeeded",
1024 ActionStatus::Failed => "action_failed",
1025 ActionStatus::Rejected => "action_rejected",
1026 ActionStatus::Skipped => "action_skipped",
1027 _ => "unknown",
1028 };
1029 let tool = current_proposal
1030 .actions
1031 .iter()
1032 .find(|a| a.id == r.action_id)
1033 .and_then(|a| a.tool.clone());
1034 let reward = match r.status {
1035 ActionStatus::Succeeded => Some(1.0),
1036 ActionStatus::Failed => Some(0.0),
1037 ActionStatus::Rejected => Some(0.0),
1038 ActionStatus::Skipped => None,
1039 _ => None,
1040 };
1041 car_memgine::TraceEvent {
1042 kind: kind.to_string(),
1043 action_id: Some(r.action_id.clone()),
1044 tool,
1045 data: r
1046 .error
1047 .as_ref()
1048 .map(|e| serde_json::json!({"error": e}))
1049 .unwrap_or(serde_json::json!({})),
1050 duration_ms: r.duration_ms,
1051 state_before: state_before_map.get(&r.action_id).cloned(),
1052 state_after: if !r.state_changes.is_empty() {
1053 Some(r.state_changes.clone())
1054 } else {
1055 None
1056 },
1057 reward,
1058 }
1059 })
1060 .collect();
1061
1062 let trajectory = car_memgine::Trajectory {
1063 proposal_id: proposal.id.clone(),
1064 source: proposal.source.clone(),
1065 action_count: current_proposal.actions.len(),
1066 events: trace_events,
1067 outcome,
1068 timestamp: chrono::Utc::now(),
1069 duration_ms: result.cost.total_duration_ms,
1070 replan_attempts: attempt,
1071 };
1072
1073 match store.append(&trajectory) {
1074 Ok(()) => None,
1075 Err(e) => Some(e.to_string()),
1076 }
1077 }
1078
1079 pub async fn plan_and_execute(
1085 &self,
1086 candidates: &[ActionProposal],
1087 planner_config: Option<car_planner::PlannerConfig>,
1088 feedback: Option<&car_planner::ToolFeedback>,
1089 ) -> ProposalResult {
1090 if candidates.is_empty() {
1091 return ProposalResult {
1092 proposal_id: "empty".to_string(),
1093 results: vec![],
1094 cost: car_ir::CostSummary::default(),
1095 };
1096 }
1097
1098 let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1100 let tools_guard = self.tools.read().await;
1101 let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1102 drop(tools_guard);
1103
1104 let pre_plan_snapshot = self.state.snapshot();
1105 let pre_plan_transitions = self.state.transition_count();
1106 let ranked = planner.rank_with_feedback(
1107 candidates,
1108 Some(&pre_plan_snapshot),
1109 Some(&tool_names),
1110 feedback,
1111 );
1112
1113 let mut first_failure: Option<ProposalResult> = None;
1115 for scored in &ranked {
1116 if !scored.valid {
1117 continue;
1118 }
1119
1120 self.state
1123 .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1124
1125 let proposal = &candidates[scored.index];
1126 let result = self.execute(proposal).await;
1127
1128 if result.all_succeeded() {
1129 return result;
1130 }
1131
1132 tracing::info!(
1133 proposal_id = %proposal.id,
1134 score = scored.score,
1135 "plan_and_execute: proposal failed, trying next candidate"
1136 );
1137
1138 if first_failure.is_none() {
1139 first_failure = Some(result);
1140 }
1141 }
1142
1143 first_failure.unwrap_or_else(|| ProposalResult {
1145 proposal_id: candidates[0].id.clone(),
1146 results: vec![],
1147 cost: car_ir::CostSummary::default(),
1148 })
1149 }
1150
1151 async fn execute_inner_with_cancel(
1159 &self,
1160 proposal: &ActionProposal,
1161 cancel: Option<&tokio_util::sync::CancellationToken>,
1162 session_id: Option<&str>,
1163 scope: Option<&crate::scope::RuntimeScope>,
1164 ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1165 let trace_id = Uuid::new_v4().to_string();
1167
1168 let root_span_id = {
1170 let mut log = self.log.lock().await;
1171 log.begin_span(
1172 "proposal.execute",
1173 &trace_id,
1174 None,
1175 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1176 )
1177 };
1178
1179 {
1181 let mut log = self.log.lock().await;
1182 log.append(
1183 EventKind::ProposalReceived,
1184 None,
1185 Some(&proposal.id),
1186 [
1187 ("source".to_string(), Value::from(proposal.source.as_str())),
1188 (
1189 "action_count".to_string(),
1190 Value::from(proposal.actions.len()),
1191 ),
1192 ]
1193 .into(),
1194 );
1195 }
1196
1197 {
1199 let caps = self.capabilities.read().await;
1200 if let Some(ref cap) = *caps {
1201 if !cap.actions_within_budget(proposal.actions.len() as u32) {
1202 let mut action_results = Vec::new();
1203 for action in &proposal.actions {
1204 action_results.push(rejected_result(
1205 &action.id,
1206 format!(
1207 "capability denied: proposal has {} actions, max allowed is {:?}",
1208 proposal.actions.len(),
1209 cap.max_actions
1210 ),
1211 ));
1212 }
1213 return (
1214 ProposalResult {
1215 proposal_id: proposal.id.clone(),
1216 results: action_results,
1217 cost: CostSummary::default(),
1218 },
1219 HashMap::new(),
1220 );
1221 }
1222 }
1223 }
1224
1225 let snapshot = self.state.snapshot();
1227 let transition_count = self.state.transition_count();
1228
1229 let mut results: Vec<ActionResult> = Vec::new();
1230 let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1232 let mut aborted = false;
1233 let mut budget_exceeded = false;
1234 let mut total_retries: u32 = 0;
1235
1236 let mut running_tool_calls: u32 = 0;
1238 let mut running_actions: u32 = 0;
1239 let mut running_duration_ms: f64 = 0.0;
1240
1241 let budget = self.cost_budget.read().await.clone();
1243
1244 let levels = build_dag(&proposal.actions);
1246
1247 let mut canceled = false;
1248 for level in &levels {
1249 if !canceled {
1255 if let Some(token) = cancel {
1256 if token.is_cancelled() {
1257 canceled = true;
1258 }
1259 }
1260 }
1261 if canceled {
1262 for &idx in level {
1263 results.push(canceled_result(
1264 &proposal.actions[idx].id,
1265 "cancellation requested by caller",
1266 ));
1267 }
1268 continue;
1269 }
1270 if aborted || budget_exceeded {
1271 let skip_reason = if budget_exceeded {
1272 "cost budget exceeded"
1273 } else {
1274 "skipped due to earlier abort"
1275 };
1276 for &idx in level {
1277 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1278 }
1279 continue;
1280 }
1281
1282 let has_abort = level
1284 .iter()
1285 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1286
1287 if level.len() == 1 || has_abort {
1288 for &idx in level {
1290 if aborted || budget_exceeded {
1291 let skip_reason = if budget_exceeded {
1292 "cost budget exceeded"
1293 } else {
1294 "skipped due to abort"
1295 };
1296 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1297 continue;
1298 }
1299
1300 if let Some(ref b) = budget {
1302 if let Some(max) = b.max_actions {
1303 if running_actions >= max {
1304 budget_exceeded = true;
1305 results.push(skipped_result(
1306 &proposal.actions[idx].id,
1307 "cost budget exceeded",
1308 ));
1309 continue;
1310 }
1311 }
1312 if let Some(max) = b.max_tool_calls {
1313 if proposal.actions[idx].action_type == ActionType::ToolCall
1314 && running_tool_calls >= max
1315 {
1316 budget_exceeded = true;
1317 results.push(skipped_result(
1318 &proposal.actions[idx].id,
1319 "cost budget exceeded",
1320 ));
1321 continue;
1322 }
1323 }
1324 if let Some(max) = b.max_duration_ms {
1325 if running_duration_ms >= max {
1326 budget_exceeded = true;
1327 results.push(skipped_result(
1328 &proposal.actions[idx].id,
1329 "cost budget exceeded",
1330 ));
1331 continue;
1332 }
1333 }
1334 }
1335
1336 state_before_map.insert(
1337 proposal.actions[idx].id.clone(),
1338 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1339 );
1340 let (ar, action_retries) = self
1341 .process_action(
1342 &proposal.actions[idx],
1343 &proposal.id,
1344 &trace_id,
1345 &root_span_id,
1346 session_id,
1347 scope,
1348 )
1349 .await;
1350 total_retries += action_retries;
1351
1352 if ar.status == ActionStatus::Succeeded
1354 && proposal.actions[idx].action_type == ActionType::ToolCall
1355 {
1356 running_tool_calls += 1;
1357 }
1358 if ar.status != ActionStatus::Skipped {
1359 running_actions += 1;
1360 }
1361 if let Some(d) = ar.duration_ms {
1362 running_duration_ms += d;
1363 }
1364
1365 if ar.status == ActionStatus::Failed
1366 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1367 {
1368 aborted = true;
1369 }
1370 results.push(ar);
1371 }
1372 } else {
1373 for &idx in level {
1376 state_before_map.insert(
1377 proposal.actions[idx].id.clone(),
1378 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1379 );
1380 }
1381 let futs: Vec<_> = level
1382 .iter()
1383 .map(|&idx| {
1384 self.process_action(
1385 &proposal.actions[idx],
1386 &proposal.id,
1387 &trace_id,
1388 &root_span_id,
1389 session_id,
1390 scope,
1391 )
1392 })
1393 .collect();
1394 let level_results = futures::future::join_all(futs).await;
1395
1396 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1397 let idx = level[i];
1398 total_retries += action_retries;
1399 if ar.status == ActionStatus::Succeeded
1400 && proposal.actions[idx].action_type == ActionType::ToolCall
1401 {
1402 running_tool_calls += 1;
1403 }
1404 if ar.status != ActionStatus::Skipped {
1405 running_actions += 1;
1406 }
1407 if let Some(d) = ar.duration_ms {
1408 running_duration_ms += d;
1409 }
1410 results.push(ar);
1411 }
1412 }
1413 }
1414
1415 if aborted {
1417 self.state.restore(snapshot.clone(), transition_count);
1418
1419 let mut log = self.log.lock().await;
1420 log.append(
1421 EventKind::StateSnapshot,
1422 None,
1423 Some(&proposal.id),
1424 [(
1425 "state".to_string(),
1426 serde_json::to_value(&snapshot).unwrap_or_default(),
1427 )]
1428 .into(),
1429 );
1430 log.append(
1431 EventKind::StateRollback,
1432 None,
1433 Some(&proposal.id),
1434 [(
1435 "rolled_back_to".to_string(),
1436 Value::from("pre-proposal snapshot"),
1437 )]
1438 .into(),
1439 );
1440
1441 let mut cache = self.idempotency_cache.lock().await;
1443 for r in &results {
1444 if r.status == ActionStatus::Succeeded {
1445 for action in &proposal.actions {
1446 if action.id == r.action_id && action.idempotent {
1447 cache.remove(&idempotency_key(action));
1448 }
1449 }
1450 }
1451 }
1452 }
1453
1454 let action_order: HashMap<String, usize> = proposal
1456 .actions
1457 .iter()
1458 .enumerate()
1459 .map(|(i, a)| (a.id.clone(), i))
1460 .collect();
1461 results.sort_by_key(|r| {
1462 action_order
1463 .get(&r.action_id)
1464 .copied()
1465 .unwrap_or(usize::MAX)
1466 });
1467
1468 let mut cost = CostSummary::default();
1470 for r in &results {
1471 let action = action_order
1472 .get(&r.action_id)
1473 .and_then(|&i| proposal.actions.get(i));
1474 match r.status {
1475 ActionStatus::Succeeded => {
1476 cost.actions_executed += 1;
1477 if let Some(a) = action {
1478 if a.action_type == ActionType::ToolCall {
1479 cost.tool_calls += 1;
1480 }
1481 }
1482 }
1483 ActionStatus::Failed | ActionStatus::Rejected => {
1484 cost.actions_executed += 1;
1485 }
1486 ActionStatus::Skipped => {
1487 cost.actions_skipped += 1;
1488 }
1489 _ => {}
1490 }
1491 if let Some(d) = r.duration_ms {
1492 cost.total_duration_ms += d;
1493 }
1494 }
1495
1496 cost.retries = total_retries;
1498
1499 {
1501 let span_status = if aborted {
1502 SpanStatus::Error
1503 } else {
1504 SpanStatus::Ok
1505 };
1506 let mut log = self.log.lock().await;
1507 log.end_span(&root_span_id, span_status);
1508 }
1509
1510 let proposal_result = ProposalResult {
1511 proposal_id: proposal.id.clone(),
1512 results,
1513 cost,
1514 };
1515
1516 if self.auto_distill {
1518 if let Some(ref memgine) = self.memgine {
1519 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1521 .results
1522 .iter()
1523 .map(|r| {
1524 let kind = match r.status {
1525 ActionStatus::Succeeded => "action_succeeded",
1526 ActionStatus::Failed => "action_failed",
1527 ActionStatus::Rejected => "action_rejected",
1528 ActionStatus::Skipped => "action_skipped",
1529 _ => "unknown",
1530 };
1531 let tool = proposal
1533 .actions
1534 .iter()
1535 .find(|a| a.id == r.action_id)
1536 .and_then(|a| a.tool.clone());
1537 let mut data = serde_json::Map::new();
1538 if let Some(ref e) = r.error {
1539 data.insert("error".into(), Value::from(e.as_str()));
1540 }
1541 if let Some(ref o) = r.output {
1542 data.insert("output".into(), o.clone());
1543 }
1544 car_memgine::TraceEvent {
1545 kind: kind.to_string(),
1546 action_id: Some(r.action_id.clone()),
1547 tool,
1548 data: Value::Object(data),
1549 duration_ms: r.duration_ms,
1550 reward: match r.status {
1551 ActionStatus::Succeeded => Some(1.0),
1552 ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1553 _ => None,
1554 },
1555 ..Default::default()
1556 }
1557 })
1558 .collect();
1559
1560 let mut engine = memgine.lock().await;
1561 let skills = engine.distill_skills(&trace_events).await;
1562 if !skills.is_empty() {
1563 let count = skills.len();
1564 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1570 engine.scoped(tenant).ingest_distilled_skills(&skills);
1571
1572 let mut log = self.log.lock().await;
1574 log.append(
1575 EventKind::SkillDistilled,
1576 None,
1577 Some(&proposal_result.proposal_id),
1578 [
1579 ("skills_count".to_string(), Value::from(count)),
1580 (
1581 "skill_names".to_string(),
1582 Value::from(
1583 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1584 ),
1585 ),
1586 ]
1587 .into(),
1588 );
1589
1590 let threshold = engine.evolution_threshold();
1592 let domains = engine.domains_needing_evolution(threshold);
1593 for domain in &domains {
1594 let failed: Vec<car_memgine::TraceEvent> = trace_events
1596 .iter()
1597 .filter(|e| {
1598 matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1599 })
1600 .cloned()
1601 .collect();
1602 if !failed.is_empty() {
1603 let evolved = engine.evolve_skills(&failed, domain).await;
1604 if !evolved.is_empty() {
1605 log.append(
1606 EventKind::EvolutionTriggered,
1607 None,
1608 Some(&proposal_result.proposal_id),
1609 [
1610 ("domain".to_string(), Value::from(domain.as_str())),
1611 ("new_skills".to_string(), Value::from(evolved.len())),
1612 ]
1613 .into(),
1614 );
1615 }
1616 }
1617 }
1618 }
1619 }
1620 }
1621
1622 (proposal_result, state_before_map)
1623 }
1624
1625 async fn process_action(
1628 &self,
1629 action: &Action,
1630 proposal_id: &str,
1631 trace_id: &str,
1632 parent_span_id: &str,
1633 session_id: Option<&str>,
1634 scope: Option<&crate::scope::RuntimeScope>,
1635 ) -> (ActionResult, u32) {
1636 let action_type_name = serde_json::to_string(&action.action_type)
1638 .unwrap_or_default()
1639 .trim_matches('"')
1640 .to_string();
1641 let span_name = format!("action.{}", action_type_name);
1642
1643 let action_span_id = {
1645 let mut attrs: HashMap<String, Value> = HashMap::new();
1646 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1647 if let Some(ref tool) = action.tool {
1648 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1649 }
1650 let mut log = self.log.lock().await;
1651 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1652 };
1653
1654 let (result, retries) = self
1656 .process_action_inner(action, proposal_id, session_id, scope)
1657 .await;
1658
1659 let span_status = match result.status {
1661 ActionStatus::Succeeded => SpanStatus::Ok,
1662 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1663 _ => SpanStatus::Unset,
1664 };
1665 {
1666 let mut log = self.log.lock().await;
1667 log.end_span(&action_span_id, span_status);
1668 }
1669
1670 (result, retries)
1671 }
1672
1673 #[instrument(
1676 name = "action.process",
1677 skip_all,
1678 fields(
1679 action_id = %action.id,
1680 action_type = ?action.action_type,
1681 tool = action.tool.as_deref().unwrap_or("none"),
1682 )
1683 )]
1684 async fn process_action_inner(
1685 &self,
1686 action: &Action,
1687 proposal_id: &str,
1688 session_id: Option<&str>,
1689 scope: Option<&crate::scope::RuntimeScope>,
1690 ) -> (ActionResult, u32) {
1691 if action.idempotent {
1693 let key = idempotency_key(action);
1694 let cache = self.idempotency_cache.lock().await;
1695 if let Some(cached) = cache.get(&key) {
1696 let mut log = self.log.lock().await;
1697 log.append(
1698 EventKind::ActionDeduplicated,
1699 Some(&action.id),
1700 Some(proposal_id),
1701 [(
1702 "cached_action_id".to_string(),
1703 Value::from(cached.action_id.as_str()),
1704 )]
1705 .into(),
1706 );
1707 return (
1708 ActionResult {
1709 action_id: action.id.clone(),
1710 status: cached.status.clone(),
1711 output: cached.output.clone(),
1712 error: cached.error.clone(),
1713 state_changes: cached.state_changes.clone(),
1714 duration_ms: Some(0.0),
1715 timestamp: chrono::Utc::now(),
1716 },
1717 0,
1718 );
1719 }
1720 }
1721
1722 {
1724 let caps = self.capabilities.read().await;
1725 if let Some(ref cap) = *caps {
1726 if action.action_type == ActionType::ToolCall {
1728 if let Some(ref tool_name) = action.tool {
1729 if !cap.tool_allowed(tool_name) {
1730 let mut log = self.log.lock().await;
1731 log.append(
1732 EventKind::ActionRejected,
1733 Some(&action.id),
1734 Some(proposal_id),
1735 HashMap::new(),
1736 );
1737 return (
1738 rejected_result(
1739 &action.id,
1740 format!("capability denied: tool '{}' not allowed", tool_name),
1741 ),
1742 0,
1743 );
1744 }
1745 }
1746 }
1747
1748 if action.action_type == ActionType::StateWrite
1750 || action.action_type == ActionType::StateRead
1751 {
1752 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1753 if !cap.state_key_allowed(key) {
1754 let mut log = self.log.lock().await;
1755 log.append(
1756 EventKind::ActionRejected,
1757 Some(&action.id),
1758 Some(proposal_id),
1759 HashMap::new(),
1760 );
1761 return (
1762 rejected_result(
1763 &action.id,
1764 format!("capability denied: state key '{}' not allowed", key),
1765 ),
1766 0,
1767 );
1768 }
1769 }
1770 }
1771 }
1772 }
1773
1774 let tools = self.tools.read().await;
1776 let validation = validate_action(action, &self.state, &tools);
1777 drop(tools);
1778
1779 if !validation.valid() {
1780 let error = validation
1781 .errors
1782 .iter()
1783 .map(|e| e.reason.as_str())
1784 .collect::<Vec<_>>()
1785 .join("; ");
1786 let mut log = self.log.lock().await;
1787 log.append(
1788 EventKind::ActionRejected,
1789 Some(&action.id),
1790 Some(proposal_id),
1791 HashMap::new(),
1792 );
1793 return (rejected_result(&action.id, error), 0);
1794 }
1795
1796 {
1802 let mut violations = {
1803 let policies = self.policies.read().await;
1804 policies.check(action, &self.state)
1805 };
1806 if let Some(sid) = session_id {
1807 let session_engine = {
1812 let sessions = self.session_policies.read().await;
1813 sessions.get(sid).cloned()
1814 };
1815 if let Some(engine) = session_engine {
1816 let engine = engine.read().await;
1817 violations.extend(engine.check(action, &self.state));
1818 } else {
1819 let mut log = self.log.lock().await;
1825 log.append(
1826 EventKind::PolicyViolation,
1827 Some(&action.id),
1828 Some(proposal_id),
1829 HashMap::new(),
1830 );
1831 return (
1832 rejected_result(
1833 &action.id,
1834 format!(
1835 "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1836 ),
1837 ),
1838 0,
1839 );
1840 }
1841 }
1842 if !violations.is_empty() {
1843 let error = violations
1844 .iter()
1845 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1846 .collect::<Vec<_>>()
1847 .join("; ");
1848 let mut log = self.log.lock().await;
1849 log.append(
1850 EventKind::PolicyViolation,
1851 Some(&action.id),
1852 Some(proposal_id),
1853 HashMap::new(),
1854 );
1855 return (rejected_result(&action.id, error), 0);
1856 }
1857 }
1858
1859 {
1861 let mut log = self.log.lock().await;
1862 log.append(
1863 EventKind::ActionValidated,
1864 Some(&action.id),
1865 Some(proposal_id),
1866 HashMap::new(),
1867 );
1868 }
1869
1870 let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1872
1873 if action.idempotent && result.status == ActionStatus::Succeeded {
1875 let mut cache = self.idempotency_cache.lock().await;
1876 cache.insert(idempotency_key(action), result.clone());
1877 }
1878
1879 tracing::info!(
1880 status = ?result.status,
1881 duration_ms = result.duration_ms,
1882 "action completed"
1883 );
1884
1885 (result, retries)
1886 }
1887
1888 async fn execute_with_retry(
1891 &self,
1892 action: &Action,
1893 proposal_id: &str,
1894 scope: Option<&crate::scope::RuntimeScope>,
1895 ) -> (ActionResult, u32) {
1896 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1897 action.max_retries + 1
1898 } else {
1899 1
1900 };
1901
1902 let mut last_error: Option<String> = None;
1903 let mut retries: u32 = 0;
1904
1905 for attempt in 0..max_attempts {
1906 if attempt > 0 {
1907 retries += 1;
1908 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1909 tokio::time::sleep(Duration::from_millis(delay)).await;
1910 let mut log = self.log.lock().await;
1911 log.append(
1912 EventKind::ActionRetrying,
1913 Some(&action.id),
1914 Some(proposal_id),
1915 [("attempt".to_string(), Value::from(attempt + 1))].into(),
1916 );
1917 }
1918
1919 {
1920 let mut log = self.log.lock().await;
1921 log.append(
1922 EventKind::ActionExecuting,
1923 Some(&action.id),
1924 Some(proposal_id),
1925 HashMap::new(),
1926 );
1927 }
1928
1929 let start = std::time::Instant::now();
1930 let transitions_before = self.state.transition_count();
1931
1932 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1934 match timeout(
1935 Duration::from_millis(timeout_ms),
1936 self.dispatch(action, scope),
1937 )
1938 .await
1939 {
1940 Ok(r) => r,
1941 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1942 }
1943 } else {
1944 self.dispatch(action, scope).await
1945 };
1946
1947 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1948
1949 match exec_result {
1950 Ok(output) => {
1951 let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1957 let scoped_state = self.state.scoped(tenant_for_effects);
1958 let mut state_changes: HashMap<String, Value> = HashMap::new();
1959 for (key, value) in &action.expected_effects {
1960 scoped_state.set(key, value.clone(), &action.id);
1961 state_changes.insert(key.clone(), value.clone());
1962 }
1963
1964 for t in self.state.transitions_since(transitions_before) {
1966 if !state_changes.contains_key(&t.key) {
1967 if let Some(v) = t.new_value {
1968 state_changes.insert(t.key.clone(), v);
1969 }
1970 }
1971 }
1972
1973 let mut log = self.log.lock().await;
1974 log.append(
1975 EventKind::ActionSucceeded,
1976 Some(&action.id),
1977 Some(proposal_id),
1978 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1979 );
1980
1981 if !state_changes.is_empty() {
1982 log.append(
1983 EventKind::StateChanged,
1984 Some(&action.id),
1985 Some(proposal_id),
1986 [(
1987 "changes".to_string(),
1988 serde_json::to_value(&state_changes).unwrap_or_default(),
1989 )]
1990 .into(),
1991 );
1992 }
1993
1994 return (
1995 ActionResult {
1996 action_id: action.id.clone(),
1997 status: ActionStatus::Succeeded,
1998 output: Some(output),
1999 error: None,
2000 state_changes,
2001 duration_ms: Some(duration_ms),
2002 timestamp: chrono::Utc::now(),
2003 },
2004 retries,
2005 );
2006 }
2007 Err(e) => {
2008 last_error = Some(e.clone());
2009 let mut log = self.log.lock().await;
2010 log.append(
2011 EventKind::ActionFailed,
2012 Some(&action.id),
2013 Some(proposal_id),
2014 [
2015 ("error".to_string(), Value::from(e.as_str())),
2016 ("attempt".to_string(), Value::from(attempt + 1)),
2017 ]
2018 .into(),
2019 );
2020 }
2021 }
2022 }
2023
2024 if action.failure_behavior == FailureBehavior::Skip {
2026 return (
2027 skipped_result(
2028 &action.id,
2029 last_error.as_deref().unwrap_or("all attempts exhausted"),
2030 ),
2031 retries,
2032 );
2033 }
2034
2035 (
2036 ActionResult {
2037 action_id: action.id.clone(),
2038 status: ActionStatus::Failed,
2039 output: None,
2040 error: last_error,
2041 state_changes: HashMap::new(),
2042 duration_ms: None,
2043 timestamp: chrono::Utc::now(),
2044 },
2045 retries,
2046 )
2047 }
2048
2049 async fn dispatch(
2059 &self,
2060 action: &Action,
2061 scope: Option<&crate::scope::RuntimeScope>,
2062 ) -> Result<Value, String> {
2063 match action.action_type {
2064 ActionType::ToolCall => {
2065 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2066 let params = Value::Object(
2067 action
2068 .parameters
2069 .iter()
2070 .map(|(k, v)| (k.clone(), v.clone()))
2071 .collect(),
2072 );
2073
2074 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
2076 return Ok(cached);
2077 }
2078
2079 self.rate_limiter.acquire(tool_name).await;
2081
2082 if matches!(
2084 tool_name,
2085 "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2086 ) {
2087 if let Some(ref engine) = self.inference_engine {
2088 let params = {
2091 let should_ground =
2092 tool_name == "infer.grounded" || tool_name == "infer";
2093 if should_ground {
2094 if let Some(ref memgine) = self.memgine {
2095 if let Some(prompt) =
2096 params.get("prompt").and_then(|v| v.as_str())
2097 {
2098 let ctx = {
2099 let mut m = memgine.lock().await;
2100 m.build_context(prompt)
2101 };
2102 if !ctx.is_empty() {
2103 let mut p = params.clone();
2104 if let Some(obj) = p.as_object_mut() {
2105 obj.insert("context".to_string(), Value::from(ctx));
2106 }
2107 p
2108 } else {
2109 params
2110 }
2111 } else {
2112 params
2113 }
2114 } else {
2115 params
2116 }
2117 } else {
2118 params
2119 }
2120 };
2121
2122 let effective_tool = if tool_name == "infer.grounded" {
2124 "infer"
2125 } else {
2126 tool_name
2127 };
2128 let result =
2129 car_inference::service::execute_tool(engine, effective_tool, ¶ms)
2130 .await
2131 .map_err(|e| e.to_string());
2132
2133 if let Ok(ref value) = result {
2134 self.result_cache
2135 .put(tool_name, ¶ms, value.clone())
2136 .await;
2137 }
2138
2139 return result;
2140 }
2141 }
2142
2143 if tool_name == "memory.consolidate" {
2145 if let Some(ref memgine) = self.memgine {
2146 let report = {
2147 let mut m = memgine.lock().await;
2148 m.consolidate().await
2149 };
2150 {
2152 let mut log = self.log.lock().await;
2153 log.append(
2154 EventKind::Consolidated,
2155 None,
2156 None,
2157 [
2158 (
2159 "expired_pruned".to_string(),
2160 Value::from(report.expired_pruned),
2161 ),
2162 (
2163 "superseded_gc".to_string(),
2164 Value::from(report.superseded_gc),
2165 ),
2166 (
2167 "stale_embeddings_removed".to_string(),
2168 Value::from(report.stale_embeddings_removed),
2169 ),
2170 (
2171 "nodes_embedded".to_string(),
2172 Value::from(report.nodes_embedded),
2173 ),
2174 (
2175 "domains_evolved".to_string(),
2176 Value::from(report.domains_evolved.clone()),
2177 ),
2178 ("total_nodes".to_string(), Value::from(report.total_nodes)),
2179 ("total_edges".to_string(), Value::from(report.total_edges)),
2180 ]
2181 .into(),
2182 );
2183 }
2184 return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2185 } else {
2186 return Err(
2187 "memory.consolidate requires memgine (attach with with_learning)"
2188 .into(),
2189 );
2190 }
2191 }
2192
2193 let configured = {
2199 let guard = self.tool_executor.lock().await;
2200 guard.as_ref().cloned()
2201 };
2202
2203 if let Some(ref executor) = configured {
2204 let result = executor
2205 .execute_with_action(tool_name, ¶ms, &action.id)
2206 .await;
2207 let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2208 if !fall_through {
2209 if let Ok(ref value) = result {
2210 self.result_cache
2211 .put(tool_name, ¶ms, value.clone())
2212 .await;
2213 }
2214 return result;
2215 }
2216 }
2217
2218 if let Some(result) = crate::agent_basics::execute(tool_name, ¶ms).await {
2219 if let Ok(ref value) = result {
2220 self.result_cache
2221 .put(tool_name, ¶ms, value.clone())
2222 .await;
2223 }
2224 return result;
2225 }
2226
2227 Err(format!("no handler for tool '{}'", tool_name))
2228 }
2229 ActionType::StateWrite => {
2230 let key = action
2231 .parameters
2232 .get("key")
2233 .and_then(|v| v.as_str())
2234 .ok_or("state_write requires 'key' parameter")?;
2235 let value = action
2236 .parameters
2237 .get("value")
2238 .cloned()
2239 .unwrap_or(Value::Null);
2240 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2241 self.state.scoped(tenant).set(key, value, &action.id);
2242 Ok(Value::from(format!("written: {}", key)))
2243 }
2244 ActionType::StateRead => {
2245 let key = action
2246 .parameters
2247 .get("key")
2248 .and_then(|v| v.as_str())
2249 .ok_or("state_read requires 'key' parameter")?;
2250 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2251 Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2252 }
2253 ActionType::Assertion => {
2254 let key = action
2255 .parameters
2256 .get("key")
2257 .and_then(|v| v.as_str())
2258 .ok_or("assertion requires 'key' parameter")?;
2259 let expected = action
2260 .parameters
2261 .get("expected")
2262 .cloned()
2263 .unwrap_or(Value::Null);
2264 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2265 let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2266 if actual != expected {
2267 Err(format!(
2268 "assertion failed: state['{}'] = {:?}, expected {:?}",
2269 key, actual, expected
2270 ))
2271 } else {
2272 Ok(serde_json::json!({"asserted": key, "value": actual}))
2273 }
2274 }
2275 }
2276 }
2277
2278 pub async fn save_checkpoint(&self) -> Checkpoint {
2282 let state = self.state.snapshot();
2283 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2284 let log = self.log.lock().await;
2285 let events: Vec<Value> = log
2286 .events()
2287 .iter()
2288 .map(|e| serde_json::to_value(e).unwrap_or_default())
2289 .collect();
2290
2291 Checkpoint {
2292 checkpoint_id: Uuid::new_v4().to_string(),
2293 created_at: chrono::Utc::now(),
2294 state,
2295 events,
2296 tools,
2297 metadata: HashMap::new(),
2298 }
2299 }
2300
2301 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2303 let checkpoint = self.save_checkpoint().await;
2304 let json = serde_json::to_string_pretty(&checkpoint)
2305 .map_err(|e| format!("serialize error: {}", e))?;
2306 tokio::fs::write(path, json)
2307 .await
2308 .map_err(|e| format!("write error: {}", e))?;
2309 Ok(())
2310 }
2311
2312 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2314 let json = tokio::fs::read_to_string(path)
2315 .await
2316 .map_err(|e| format!("read error: {}", e))?;
2317 let checkpoint: Checkpoint =
2318 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2319 self.restore_checkpoint(&checkpoint).await;
2320 Ok(checkpoint)
2321 }
2322
2323 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2325 self.state.replace_all(checkpoint.state.clone());
2327 self.idempotency_cache.lock().await.clear();
2330 let mut tools = self.tools.write().await;
2332 tools.clear();
2333 for tool_name in &checkpoint.tools {
2334 let schema = ToolSchema {
2335 name: tool_name.clone(),
2336 description: String::new(),
2337 parameters: serde_json::Value::Object(Default::default()),
2338 returns: None,
2339 idempotent: false,
2340 cache_ttl_secs: None,
2341 rate_limit: None,
2342 };
2343 tools.insert(tool_name.clone(), schema);
2344 }
2345 }
2346
2347 pub async fn register_subprocess_tool(
2352 &self,
2353 name: &str,
2354 tool: crate::subprocess::SubprocessTool,
2355 ) {
2356 use crate::subprocess::SubprocessToolExecutor;
2357
2358 let schema = ToolSchema {
2359 name: name.to_string(),
2360 description: format!("Subprocess tool: {}", tool.command),
2361 parameters: serde_json::Value::Object(Default::default()),
2362 returns: None,
2363 idempotent: false,
2364 cache_ttl_secs: None,
2365 rate_limit: None,
2366 };
2367 self.register_tool_schema(schema).await;
2368
2369 let mut guard = self.tool_executor.lock().await;
2370 let mut executor = match guard.take() {
2371 Some(existing) => {
2372 let mut sub = SubprocessToolExecutor::new();
2373 sub = sub.with_fallback(existing);
2374 sub
2375 }
2376 None => SubprocessToolExecutor::new(),
2377 };
2378 executor.register(name, tool);
2379 *guard = Some(std::sync::Arc::new(executor));
2380 }
2381}
2382
2383impl Default for Runtime {
2384 fn default() -> Self {
2385 Self::new()
2386 }
2387}