1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base retry delay, in milliseconds.
/// NOTE(review): the retry code that consumes this constant is not visible in this
/// section — confirm how it is applied.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier associated with the retry delay; presumably applied once per retry
/// (exponential backoff) — TODO confirm against the retry code.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
/// Hook the runtime calls after a proposal execution reported failed actions and
/// replan attempts are still available.
///
/// The implementation receives a [`ReplanContext`] describing the failure and returns
/// either a replacement [`ActionProposal`] or an error string. An `Err` ends the replan
/// loop and the failed result is returned to the caller.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Builds a replacement proposal for the failed execution described by `ctx`.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Information handed to a [`ReplanCallback`] when an execution has failed.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Id of the original proposal (not of any replanned replacement).
    pub proposal_id: String,
    /// 1-based number of the replan attempt being requested.
    pub attempt: u32,
    /// One summary per action whose result status was `Failed` in the latest execution.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Ids of the actions whose result status was `Succeeded` in the latest execution.
    pub completed_action_ids: Vec<String>,
    /// Snapshot of the state store taken when this context was built.
    pub state_snapshot: HashMap<String, Value>,
    /// Replans still available after this one (saturating at zero).
    pub replans_remaining: u32,
    /// `source` of the original proposal.
    pub original_source: String,
    /// Number of actions in the original proposal.
    pub original_action_count: usize,
    /// Copy of the original proposal's `context` map.
    pub original_context: HashMap<String, Value>,
}
62
/// Compact description of one failed action, as reported to a [`ReplanCallback`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the failed action.
    pub action_id: String,
    /// Tool of the matching action in the executed proposal, if the action was found
    /// and names a tool.
    pub tool: Option<String>,
    /// Error text from the action result; empty when the result carried no error.
    pub error: String,
    /// Parameters of the matching action; empty when the action was not found.
    pub parameters: HashMap<String, Value>,
}
71
/// Settings that control the replan loop run after a failed execution.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts; `0` means a failed result is returned
    /// without consulting the callback.
    pub max_replans: u32,
    /// Delay in milliseconds slept before invoking the callback; `0` skips the sleep.
    pub delay_ms: u64,
    /// When `true`, a replanned proposal is checked with `car_verify` before it is
    /// executed, and an invalid proposal is rejected.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87 fn default() -> Self {
88 Self {
89 max_replans: 0,
90 delay_ms: 0,
91 verify_before_execute: true,
92 }
93 }
94}
95
/// Backend that actually runs tool calls on behalf of the runtime.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Runs `tool` with the JSON `params`, returning its JSON output or an error string.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;

    /// Variant of [`ToolExecutor::execute`] that additionally receives the action id
    /// and an optional timeout in milliseconds.
    ///
    /// The default implementation ignores both extra arguments and forwards to
    /// `execute`; implementors may override it to make use of them.
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        _action_id: &str,
        _timeout_ms: Option<u64>,
    ) -> Result<Value, String> {
        self.execute(tool, params).await
    }
}
129
130fn idempotency_key(action: &Action) -> String {
132 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
133 let params = serde_json::to_string(&sorted).unwrap_or_default();
134 format!(
135 "{}:{}:{}",
136 serde_json::to_string(&action.action_type).unwrap_or_default(),
137 action.tool.as_deref().unwrap_or(""),
138 params
139 )
140}
141
142fn rejected_result(action_id: &str, error: String) -> ActionResult {
143 ActionResult {
144 action_id: action_id.to_string(),
145 status: ActionStatus::Rejected,
146 output: None,
147 error: Some(error),
148 state_changes: HashMap::new(),
149 duration_ms: None,
150 timestamp: chrono::Utc::now(),
151 }
152}
153
154fn snapshot_relevant_keys(
158 state: &car_state::StateStore,
159 action: &Action,
160) -> HashMap<String, Value> {
161 let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
162 for dep in &action.state_dependencies {
163 keys.insert(dep.as_str());
164 }
165 for key in action.expected_effects.keys() {
166 keys.insert(key.as_str());
167 }
168 if action.action_type == ActionType::StateWrite {
170 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
171 keys.insert(key);
172 }
173 }
174
175 if keys.is_empty() {
176 return HashMap::new();
178 }
179
180 keys.iter()
181 .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
182 .collect()
183}
184
185fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
186 ActionResult {
187 action_id: action_id.to_string(),
188 status: ActionStatus::Skipped,
189 output: None,
190 error: Some(reason.to_string()),
191 state_changes: HashMap::new(),
192 duration_ms: None,
193 timestamp: chrono::Utc::now(),
194 }
195}
196
/// Prefix prepended to the reason stored in the `error` field of results created for
/// canceled actions. Such results use the `Skipped` status, so this prefix is what
/// distinguishes a cancellation from other skips.
pub const CANCELED_PREFIX: &str = "canceled: ";
204
205fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
210 ActionResult {
211 action_id: action_id.to_string(),
212 status: ActionStatus::Skipped,
213 output: None,
214 error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
215 state_changes: HashMap::new(),
216 duration_ms: None,
217 timestamp: chrono::Utc::now(),
218 }
219}
220
221pub fn format_tool_result(result: &ActionResult) -> String {
223 match result.status {
224 ActionStatus::Succeeded => match &result.output {
225 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
226 None => String::new(),
227 },
228 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
229 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
230 _ => format!(
231 "[{:?}] {}",
232 result.status,
233 result.error.as_deref().unwrap_or("")
234 ),
235 }
236}
237
/// Optional limits applied while a proposal executes; `None` leaves a limit unset.
///
/// Once a limit is reached, the remaining actions are recorded as skipped with the
/// reason "cost budget exceeded".
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Upper bound on the number of tool-call actions counted during the execution.
    pub max_tool_calls: Option<u32>,
    /// Upper bound on the summed action durations, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Upper bound on the number of actions processed (skipped actions are not counted).
    pub max_actions: Option<u32>,
}
245
/// Executes action proposals against shared state, tools, policies and an event log.
pub struct Runtime {
    /// State store; snapshotted before a proposal runs and restored on abort.
    pub state: Arc<StateStore>,
    /// Registered tool schemas keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Policy engine shared by the whole runtime.
    /// NOTE(review): policy evaluation is not visible in this section.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Per-session policy engines keyed by the id returned from `open_session`.
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log receiving spans and events for every execution.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits, configured at registration or via `set_rate_limit`.
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool result cache, enabled at registration or via `enable_tool_cache`.
    pub result_cache: Arc<ResultCache>,
    /// Executor used to run tool calls; `None` until one is installed.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Cached results keyed by `idempotency_key`; entries of idempotent actions are
    /// removed again when their proposal is rolled back.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional cost limits applied to each execution.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability set checked before a proposal starts executing.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Optional inference engine installed through `with_inference`.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memgine engine installed through `with_learning` / `with_memgine`.
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether trace events are built for the memgine after each execution.
    auto_distill: bool,
    /// Optional store to which a trajectory is appended per finished execution.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Callback asked for a replacement proposal after a failed execution.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Settings for the replan loop.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Registry of tool entries; populated by `register_tool_entry`.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
295
296impl Runtime {
297 pub fn new() -> Self {
298 Self {
299 state: Arc::new(StateStore::new()),
300 tools: Arc::new(TokioRwLock::new(HashMap::new())),
301 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
302 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
303 log: Arc::new(TokioMutex::new(EventLog::new())),
304 rate_limiter: Arc::new(RateLimiter::new()),
305 result_cache: Arc::new(ResultCache::new()),
306 tool_executor: TokioMutex::new(None),
307 idempotency_cache: TokioMutex::new(HashMap::new()),
308 cost_budget: TokioRwLock::new(None),
309 capabilities: TokioRwLock::new(None),
310 inference_engine: None,
311 memgine: None,
312 auto_distill: false,
313 trajectory_store: None,
314 replan_callback: TokioMutex::new(None),
315 replan_config: TokioRwLock::new(ReplanConfig::default()),
316 registry: Arc::new(crate::registry::ToolRegistry::new()),
317 }
318 }
319
320 pub fn with_shared(
323 state: Arc<StateStore>,
324 log: Arc<TokioMutex<EventLog>>,
325 policies: Arc<TokioRwLock<PolicyEngine>>,
326 ) -> Self {
327 Self {
328 state,
329 tools: Arc::new(TokioRwLock::new(HashMap::new())),
330 policies,
331 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
336 log,
337 rate_limiter: Arc::new(RateLimiter::new()),
338 result_cache: Arc::new(ResultCache::new()),
339 tool_executor: TokioMutex::new(None),
340 idempotency_cache: TokioMutex::new(HashMap::new()),
341 cost_budget: TokioRwLock::new(None),
342 capabilities: TokioRwLock::new(None),
343 inference_engine: None,
344 memgine: None,
345 auto_distill: false,
346 trajectory_store: None,
347 replan_callback: TokioMutex::new(None),
348 replan_config: TokioRwLock::new(ReplanConfig::default()),
349 registry: Arc::new(crate::registry::ToolRegistry::new()),
350 }
351 }
352
353 pub async fn open_session(&self) -> String {
371 let id = Uuid::new_v4().to_string();
372 let mut sessions = self.session_policies.write().await;
373 sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
374 id
375 }
376
377 pub async fn close_session(&self, session_id: &str) -> bool {
382 let mut sessions = self.session_policies.write().await;
383 sessions.remove(session_id).is_some()
384 }
385
386 pub async fn register_policy_in_session(
395 &self,
396 session_id: &str,
397 name: &str,
398 check: car_policy::PolicyCheck,
399 description: &str,
400 ) -> Result<(), String> {
401 let engine = {
402 let sessions = self.session_policies.read().await;
403 sessions
404 .get(session_id)
405 .cloned()
406 .ok_or_else(|| format!("unknown session id '{session_id}'"))?
407 };
408 let mut engine = engine.write().await;
409 engine.register(name, check, description);
410 Ok(())
411 }
412
413 pub async fn session_exists(&self, session_id: &str) -> bool {
417 self.session_policies.read().await.contains_key(session_id)
418 }
419
420 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
423 self.inference_engine = Some(engine);
424 if let Ok(mut tools) = self.tools.try_write() {
426 for schema in car_inference::service::all_schemas() {
427 tools.insert(schema.name.clone(), schema);
428 }
429 }
430 self
431 }
432
433 pub fn with_learning(
437 mut self,
438 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
439 auto_distill: bool,
440 ) -> Self {
441 self.memgine = Some(memgine);
442 self.auto_distill = auto_distill;
443 self
444 }
445
446 pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
448 self.with_learning(memgine, true)
449 }
450
451 pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
453 self.trajectory_store = Some(store);
454 self
455 }
456
457 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
458 if let Ok(mut guard) = self.tool_executor.try_lock() {
460 *guard = Some(executor);
461 }
462 self
463 }
464
465 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
468 *self.tool_executor.lock().await = Some(executor);
469 }
470
471 pub fn with_event_log(mut self, log: EventLog) -> Self {
472 self.log = Arc::new(TokioMutex::new(log));
473 self
474 }
475
476 pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
478 if let Ok(mut guard) = self.replan_callback.try_lock() {
479 *guard = Some(callback);
480 }
481 if let Ok(mut guard) = self.replan_config.try_write() {
482 *guard = config;
483 }
484 self
485 }
486
487 pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
489 *self.replan_callback.lock().await = Some(callback);
490 }
491
492 pub async fn set_replan_config(&self, config: ReplanConfig) {
494 *self.replan_config.write().await = config;
495 }
496
497 pub async fn register_tool(&self, name: &str) {
499 let schema = ToolSchema {
500 name: name.to_string(),
501 description: String::new(),
502 parameters: serde_json::Value::Object(Default::default()),
503 returns: None,
504 idempotent: false,
505 cache_ttl_secs: None,
506 rate_limit: None,
507 };
508 self.register_tool_schema(schema).await;
509 }
510
511 pub async fn register_tool_schema(&self, schema: ToolSchema) {
513 if let Some(ttl) = schema.cache_ttl_secs {
515 self.result_cache.enable_caching(&schema.name, ttl).await;
516 }
517 if let Some(ref rl) = schema.rate_limit {
519 self.rate_limiter
520 .set_limit(
521 &schema.name,
522 RateLimit {
523 max_calls: rl.max_calls,
524 interval_secs: rl.interval_secs,
525 },
526 )
527 .await;
528 }
529 self.tools.write().await.insert(schema.name.clone(), schema);
530 }
531
532 pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
536 let schema = entry.schema.clone();
537 self.registry.register(entry).await;
538 self.register_tool_schema(schema).await;
539 }
540
541 pub async fn register_agent_basics(&self) {
546 for entry in crate::agent_basics::entries() {
547 self.register_tool_entry(entry).await;
548 }
549 }
550
551 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
553 self.tools.read().await.values().cloned().collect()
554 }
555
556 pub async fn set_cost_budget(&self, budget: CostBudget) {
558 *self.cost_budget.write().await = Some(budget);
559 }
560
561 pub async fn set_capabilities(&self, caps: CapabilitySet) {
563 *self.capabilities.write().await = Some(caps);
564 }
565
566 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
572 self.rate_limiter
573 .set_limit(
574 tool,
575 RateLimit {
576 max_calls,
577 interval_secs,
578 },
579 )
580 .await;
581 }
582
583 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
585 self.result_cache.enable_caching(tool, ttl_secs).await;
586 }
587
588 #[instrument(
598 name = "proposal.execute",
599 skip_all,
600 fields(
601 proposal_id = %proposal.id,
602 action_count = proposal.actions.len(),
603 )
604 )]
605 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
606 let token = tokio_util::sync::CancellationToken::new();
609 self.execute_with_cancel(proposal, &token).await
610 }
611
612 pub async fn execute_with_session(
624 &self,
625 proposal: &ActionProposal,
626 session_id: &str,
627 ) -> ProposalResult {
628 let token = tokio_util::sync::CancellationToken::new();
629 self.execute_with_session_and_cancel(proposal, session_id, &token)
630 .await
631 }
632
633 pub async fn execute_with_session_and_cancel(
637 &self,
638 proposal: &ActionProposal,
639 session_id: &str,
640 cancel: &tokio_util::sync::CancellationToken,
641 ) -> ProposalResult {
642 self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
643 .await
644 }
645
646 pub async fn execute_with_cancel(
674 &self,
675 proposal: &ActionProposal,
676 cancel: &tokio_util::sync::CancellationToken,
677 ) -> ProposalResult {
678 self.execute_with_optional_session(proposal, None, None, cancel)
679 .await
680 }
681
682 pub async fn execute_scoped(
699 &self,
700 proposal: &ActionProposal,
701 scope: &crate::scope::RuntimeScope,
702 ) -> ProposalResult {
703 let token = tokio_util::sync::CancellationToken::new();
704 self.execute_scoped_with_cancel(proposal, scope, &token)
705 .await
706 }
707
708 pub async fn execute_scoped_with_cancel(
713 &self,
714 proposal: &ActionProposal,
715 scope: &crate::scope::RuntimeScope,
716 cancel: &tokio_util::sync::CancellationToken,
717 ) -> ProposalResult {
718 self.execute_with_optional_session(proposal, None, Some(scope), cancel)
719 .await
720 }
721
722 async fn execute_with_optional_session(
728 &self,
729 proposal: &ActionProposal,
730 session_id: Option<&str>,
731 scope: Option<&crate::scope::RuntimeScope>,
732 cancel: &tokio_util::sync::CancellationToken,
733 ) -> ProposalResult {
734 let config = self.replan_config.read().await.clone();
735 let mut current_proposal = proposal.clone();
736 let mut attempt: u32 = 0;
737
738 if let Some(s) = scope {
744 if !s.is_unscoped() {
745 let mut props: HashMap<String, Value> = HashMap::new();
746 if let Some(cid) = &s.caller_id {
747 props.insert("caller_id".to_string(), Value::from(cid.as_str()));
748 }
749 if let Some(tid) = &s.tenant_id {
750 props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
751 }
752 if !s.claims.is_empty() {
753 if let Ok(claims_json) = serde_json::to_value(&s.claims) {
754 props.insert("claims".to_string(), claims_json);
755 }
756 }
757 let mut log = self.log.lock().await;
758 log.append(EventKind::SessionScope, None, Some(&proposal.id), props);
759 }
760 }
761
762 loop {
763 let (result, state_before_map) = self
764 .execute_inner_with_cancel(¤t_proposal, Some(cancel), session_id, scope)
765 .await;
766
767 let aborted = result
769 .results
770 .iter()
771 .any(|r| r.status == ActionStatus::Failed);
772 if !aborted || attempt >= config.max_replans {
773 if aborted && attempt > 0 {
774 let mut log = self.log.lock().await;
776 log.append(
777 EventKind::ReplanExhausted,
778 None,
779 Some(&proposal.id),
780 [("attempts".to_string(), Value::from(attempt))].into(),
781 );
782 }
783
784 let outcome = if !aborted {
786 if attempt > 0 {
787 car_memgine::TrajectoryOutcome::ReplanSuccess
788 } else {
789 car_memgine::TrajectoryOutcome::Success
790 }
791 } else if attempt > 0 {
792 car_memgine::TrajectoryOutcome::ReplanExhausted
793 } else {
794 car_memgine::TrajectoryOutcome::Failed
795 };
796 if let Some(err) = self.persist_trajectory(
797 proposal,
798 ¤t_proposal,
799 &result,
800 outcome,
801 attempt,
802 &state_before_map,
803 ) {
804 let mut log = self.log.lock().await;
805 log.append(
806 EventKind::ActionFailed,
807 None,
808 Some(&proposal.id),
809 [(
810 "trajectory_persist_error".to_string(),
811 Value::from(err.as_str()),
812 )]
813 .into(),
814 );
815 }
816
817 return result;
818 }
819
820 let callback = {
822 let guard = self.replan_callback.lock().await;
823 guard.clone()
824 };
825 let Some(callback) = callback else {
826 if let Some(err) = self.persist_trajectory(
828 proposal,
829 ¤t_proposal,
830 &result,
831 car_memgine::TrajectoryOutcome::Failed,
832 attempt,
833 &state_before_map,
834 ) {
835 let mut log = self.log.lock().await;
836 log.append(
837 EventKind::ActionFailed,
838 None,
839 Some(&proposal.id),
840 [(
841 "trajectory_persist_error".to_string(),
842 Value::from(err.as_str()),
843 )]
844 .into(),
845 );
846 }
847 return result;
848 };
849
850 let failed_actions: Vec<FailedActionSummary> = result
852 .results
853 .iter()
854 .filter(|r| r.status == ActionStatus::Failed)
855 .map(|r| {
856 let action = current_proposal
857 .actions
858 .iter()
859 .find(|a| a.id == r.action_id);
860 FailedActionSummary {
861 action_id: r.action_id.clone(),
862 tool: action.and_then(|a| a.tool.clone()),
863 error: r.error.clone().unwrap_or_default(),
864 parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
865 }
866 })
867 .collect();
868
869 let completed_action_ids: Vec<String> = result
870 .results
871 .iter()
872 .filter(|r| r.status == ActionStatus::Succeeded)
873 .map(|r| r.action_id.clone())
874 .collect();
875
876 let ctx = ReplanContext {
877 proposal_id: proposal.id.clone(),
878 attempt: attempt + 1,
879 failed_actions,
880 completed_action_ids,
881 state_snapshot: self.state.snapshot(),
882 replans_remaining: config.max_replans.saturating_sub(attempt + 1),
883 original_source: proposal.source.clone(),
884 original_action_count: proposal.actions.len(),
885 original_context: proposal.context.clone(),
886 };
887
888 if config.delay_ms > 0 {
890 tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
891 }
892
893 {
895 let mut log = self.log.lock().await;
896 log.append(
897 EventKind::ReplanAttempted,
898 None,
899 Some(&proposal.id),
900 [
901 ("attempt".to_string(), Value::from(attempt + 1)),
902 (
903 "failed_count".to_string(),
904 Value::from(ctx.failed_actions.len()),
905 ),
906 ]
907 .into(),
908 );
909 }
910
911 match callback.replan(&ctx).await {
913 Ok(new_proposal) => {
914 if config.verify_before_execute {
916 let current_state = self.state.snapshot();
917 let tools_guard = self.tools.read().await;
924 let vr = car_verify::verify_with_schemas(
925 &new_proposal,
926 Some(¤t_state),
927 Some(&tools_guard),
928 100,
929 );
930 drop(tools_guard);
931 if !vr.valid {
932 let error_msgs: Vec<String> = vr
933 .issues
934 .iter()
935 .filter(|i| i.severity == "error")
936 .map(|i| i.message.clone())
937 .collect();
938 let mut log = self.log.lock().await;
939 log.append(
940 EventKind::ReplanRejected,
941 None,
942 Some(&proposal.id),
943 [
944 ("errors".to_string(), Value::from(error_msgs.join("; "))),
945 ("attempt".to_string(), Value::from(attempt + 1)),
946 ]
947 .into(),
948 );
949 attempt += 1;
951 continue;
952 }
953 }
954
955 {
957 let mut log = self.log.lock().await;
958 log.append(
959 EventKind::ReplanProposalReceived,
960 None,
961 Some(&proposal.id),
962 [
963 ("attempt".to_string(), Value::from(attempt + 1)),
964 (
965 "new_action_count".to_string(),
966 Value::from(new_proposal.actions.len()),
967 ),
968 ]
969 .into(),
970 );
971 }
972 current_proposal = new_proposal;
973 attempt += 1;
974 }
975 Err(e) => {
976 let mut log = self.log.lock().await;
978 log.append(
979 EventKind::ReplanExhausted,
980 None,
981 Some(&proposal.id),
982 [
983 ("reason".to_string(), Value::from("callback_error")),
984 ("error".to_string(), Value::from(e.as_str())),
985 ("attempt".to_string(), Value::from(attempt + 1)),
986 ]
987 .into(),
988 );
989 if let Some(err) = self.persist_trajectory(
990 proposal,
991 ¤t_proposal,
992 &result,
993 car_memgine::TrajectoryOutcome::Failed,
994 attempt,
995 &state_before_map,
996 ) {
997 log.append(
998 EventKind::ActionFailed,
999 None,
1000 Some(&proposal.id),
1001 [(
1002 "trajectory_persist_error".to_string(),
1003 Value::from(err.as_str()),
1004 )]
1005 .into(),
1006 );
1007 }
1008 return result;
1009 }
1010 }
1011 }
1012 }
1013
1014 fn persist_trajectory(
1016 &self,
1017 proposal: &ActionProposal,
1018 current_proposal: &ActionProposal,
1019 result: &ProposalResult,
1020 outcome: car_memgine::TrajectoryOutcome,
1021 attempt: u32,
1022 state_before_map: &HashMap<String, HashMap<String, Value>>,
1023 ) -> Option<String> {
1024 let store = self.trajectory_store.as_ref()?;
1025
1026 let trace_events: Vec<car_memgine::TraceEvent> = result
1027 .results
1028 .iter()
1029 .map(|r| {
1030 let kind = match r.status {
1031 ActionStatus::Succeeded => "action_succeeded",
1032 ActionStatus::Failed => "action_failed",
1033 ActionStatus::Rejected => "action_rejected",
1034 ActionStatus::Skipped => "action_skipped",
1035 _ => "unknown",
1036 };
1037 let tool = current_proposal
1038 .actions
1039 .iter()
1040 .find(|a| a.id == r.action_id)
1041 .and_then(|a| a.tool.clone());
1042 let reward = match r.status {
1043 ActionStatus::Succeeded => Some(1.0),
1044 ActionStatus::Failed => Some(0.0),
1045 ActionStatus::Rejected => Some(0.0),
1046 ActionStatus::Skipped => None,
1047 _ => None,
1048 };
1049 car_memgine::TraceEvent {
1050 kind: kind.to_string(),
1051 action_id: Some(r.action_id.clone()),
1052 tool,
1053 data: r
1054 .error
1055 .as_ref()
1056 .map(|e| serde_json::json!({"error": e}))
1057 .unwrap_or(serde_json::json!({})),
1058 duration_ms: r.duration_ms,
1059 state_before: state_before_map.get(&r.action_id).cloned(),
1060 state_after: if !r.state_changes.is_empty() {
1061 Some(r.state_changes.clone())
1062 } else {
1063 None
1064 },
1065 reward,
1066 }
1067 })
1068 .collect();
1069
1070 let trajectory = car_memgine::Trajectory {
1071 proposal_id: proposal.id.clone(),
1072 source: proposal.source.clone(),
1073 action_count: current_proposal.actions.len(),
1074 events: trace_events,
1075 outcome,
1076 timestamp: chrono::Utc::now(),
1077 duration_ms: result.cost.total_duration_ms,
1078 replan_attempts: attempt,
1079 };
1080
1081 match store.append(&trajectory) {
1082 Ok(()) => None,
1083 Err(e) => Some(e.to_string()),
1084 }
1085 }
1086
1087 pub async fn plan_and_execute(
1093 &self,
1094 candidates: &[ActionProposal],
1095 planner_config: Option<car_planner::PlannerConfig>,
1096 feedback: Option<&car_planner::ToolFeedback>,
1097 ) -> ProposalResult {
1098 if candidates.is_empty() {
1099 return ProposalResult {
1100 proposal_id: "empty".to_string(),
1101 results: vec![],
1102 cost: car_ir::CostSummary::default(),
1103 };
1104 }
1105
1106 let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1108 let tools_guard = self.tools.read().await;
1109 let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1110 drop(tools_guard);
1111
1112 let pre_plan_snapshot = self.state.snapshot();
1113 let pre_plan_transitions = self.state.transition_count();
1114 let ranked = planner.rank_with_feedback(
1115 candidates,
1116 Some(&pre_plan_snapshot),
1117 Some(&tool_names),
1118 feedback,
1119 );
1120
1121 let mut first_failure: Option<ProposalResult> = None;
1123 for scored in &ranked {
1124 if !scored.valid {
1125 continue;
1126 }
1127
1128 self.state
1131 .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1132
1133 let proposal = &candidates[scored.index];
1134 let result = self.execute(proposal).await;
1135
1136 if result.all_succeeded() {
1137 return result;
1138 }
1139
1140 tracing::info!(
1141 proposal_id = %proposal.id,
1142 score = scored.score,
1143 "plan_and_execute: proposal failed, trying next candidate"
1144 );
1145
1146 if first_failure.is_none() {
1147 first_failure = Some(result);
1148 }
1149 }
1150
1151 first_failure.unwrap_or_else(|| ProposalResult {
1153 proposal_id: candidates[0].id.clone(),
1154 results: vec![],
1155 cost: car_ir::CostSummary::default(),
1156 })
1157 }
1158
1159 async fn execute_inner_with_cancel(
1167 &self,
1168 proposal: &ActionProposal,
1169 cancel: Option<&tokio_util::sync::CancellationToken>,
1170 session_id: Option<&str>,
1171 scope: Option<&crate::scope::RuntimeScope>,
1172 ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1173 let trace_id = Uuid::new_v4().to_string();
1175
1176 let root_span_id = {
1178 let mut log = self.log.lock().await;
1179 log.begin_span(
1180 "proposal.execute",
1181 &trace_id,
1182 None,
1183 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1184 )
1185 };
1186
1187 {
1189 let mut log = self.log.lock().await;
1190 log.append(
1191 EventKind::ProposalReceived,
1192 None,
1193 Some(&proposal.id),
1194 [
1195 ("source".to_string(), Value::from(proposal.source.as_str())),
1196 (
1197 "action_count".to_string(),
1198 Value::from(proposal.actions.len()),
1199 ),
1200 ]
1201 .into(),
1202 );
1203 }
1204
1205 {
1207 let caps = self.capabilities.read().await;
1208 if let Some(ref cap) = *caps {
1209 if !cap.actions_within_budget(proposal.actions.len() as u32) {
1210 let mut action_results = Vec::new();
1211 for action in &proposal.actions {
1212 action_results.push(rejected_result(
1213 &action.id,
1214 format!(
1215 "capability denied: proposal has {} actions, max allowed is {:?}",
1216 proposal.actions.len(),
1217 cap.max_actions
1218 ),
1219 ));
1220 }
1221 return (
1222 ProposalResult {
1223 proposal_id: proposal.id.clone(),
1224 results: action_results,
1225 cost: CostSummary::default(),
1226 },
1227 HashMap::new(),
1228 );
1229 }
1230 }
1231 }
1232
1233 let snapshot = self.state.snapshot();
1235 let transition_count = self.state.transition_count();
1236
1237 let mut results: Vec<ActionResult> = Vec::new();
1238 let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1240 let mut aborted = false;
1241 let mut budget_exceeded = false;
1242 let mut total_retries: u32 = 0;
1243
1244 let mut running_tool_calls: u32 = 0;
1246 let mut running_actions: u32 = 0;
1247 let mut running_duration_ms: f64 = 0.0;
1248
1249 let budget = self.cost_budget.read().await.clone();
1251
1252 let levels = build_dag(&proposal.actions);
1254
1255 let mut canceled = false;
1256 for level in &levels {
1257 if !canceled {
1263 if let Some(token) = cancel {
1264 if token.is_cancelled() {
1265 canceled = true;
1266 }
1267 }
1268 }
1269 if canceled {
1270 for &idx in level {
1271 results.push(canceled_result(
1272 &proposal.actions[idx].id,
1273 "cancellation requested by caller",
1274 ));
1275 }
1276 continue;
1277 }
1278 if aborted || budget_exceeded {
1279 let skip_reason = if budget_exceeded {
1280 "cost budget exceeded"
1281 } else {
1282 "skipped due to earlier abort"
1283 };
1284 for &idx in level {
1285 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1286 }
1287 continue;
1288 }
1289
1290 let has_abort = level
1292 .iter()
1293 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1294
1295 if level.len() == 1 || has_abort {
1296 for &idx in level {
1298 if aborted || budget_exceeded {
1299 let skip_reason = if budget_exceeded {
1300 "cost budget exceeded"
1301 } else {
1302 "skipped due to abort"
1303 };
1304 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1305 continue;
1306 }
1307
1308 if let Some(ref b) = budget {
1310 if let Some(max) = b.max_actions {
1311 if running_actions >= max {
1312 budget_exceeded = true;
1313 results.push(skipped_result(
1314 &proposal.actions[idx].id,
1315 "cost budget exceeded",
1316 ));
1317 continue;
1318 }
1319 }
1320 if let Some(max) = b.max_tool_calls {
1321 if proposal.actions[idx].action_type == ActionType::ToolCall
1322 && running_tool_calls >= max
1323 {
1324 budget_exceeded = true;
1325 results.push(skipped_result(
1326 &proposal.actions[idx].id,
1327 "cost budget exceeded",
1328 ));
1329 continue;
1330 }
1331 }
1332 if let Some(max) = b.max_duration_ms {
1333 if running_duration_ms >= max {
1334 budget_exceeded = true;
1335 results.push(skipped_result(
1336 &proposal.actions[idx].id,
1337 "cost budget exceeded",
1338 ));
1339 continue;
1340 }
1341 }
1342 }
1343
1344 state_before_map.insert(
1345 proposal.actions[idx].id.clone(),
1346 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1347 );
1348 let (ar, action_retries) = self
1349 .process_action(
1350 &proposal.actions[idx],
1351 &proposal.id,
1352 &trace_id,
1353 &root_span_id,
1354 session_id,
1355 scope,
1356 )
1357 .await;
1358 total_retries += action_retries;
1359
1360 if ar.status == ActionStatus::Succeeded
1362 && proposal.actions[idx].action_type == ActionType::ToolCall
1363 {
1364 running_tool_calls += 1;
1365 }
1366 if ar.status != ActionStatus::Skipped {
1367 running_actions += 1;
1368 }
1369 if let Some(d) = ar.duration_ms {
1370 running_duration_ms += d;
1371 }
1372
1373 if ar.status == ActionStatus::Failed
1374 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1375 {
1376 aborted = true;
1377 }
1378 results.push(ar);
1379 }
1380 } else {
1381 for &idx in level {
1384 state_before_map.insert(
1385 proposal.actions[idx].id.clone(),
1386 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1387 );
1388 }
1389 let futs: Vec<_> = level
1390 .iter()
1391 .map(|&idx| {
1392 self.process_action(
1393 &proposal.actions[idx],
1394 &proposal.id,
1395 &trace_id,
1396 &root_span_id,
1397 session_id,
1398 scope,
1399 )
1400 })
1401 .collect();
1402 let level_results = futures::future::join_all(futs).await;
1403
1404 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1405 let idx = level[i];
1406 total_retries += action_retries;
1407 if ar.status == ActionStatus::Succeeded
1408 && proposal.actions[idx].action_type == ActionType::ToolCall
1409 {
1410 running_tool_calls += 1;
1411 }
1412 if ar.status != ActionStatus::Skipped {
1413 running_actions += 1;
1414 }
1415 if let Some(d) = ar.duration_ms {
1416 running_duration_ms += d;
1417 }
1418 results.push(ar);
1419 }
1420 }
1421 }
1422
1423 if aborted {
1425 self.state.restore(snapshot.clone(), transition_count);
1426
1427 let mut log = self.log.lock().await;
1428 log.append(
1429 EventKind::StateSnapshot,
1430 None,
1431 Some(&proposal.id),
1432 [(
1433 "state".to_string(),
1434 serde_json::to_value(&snapshot).unwrap_or_default(),
1435 )]
1436 .into(),
1437 );
1438 log.append(
1439 EventKind::StateRollback,
1440 None,
1441 Some(&proposal.id),
1442 [(
1443 "rolled_back_to".to_string(),
1444 Value::from("pre-proposal snapshot"),
1445 )]
1446 .into(),
1447 );
1448
1449 let mut cache = self.idempotency_cache.lock().await;
1451 for r in &results {
1452 if r.status == ActionStatus::Succeeded {
1453 for action in &proposal.actions {
1454 if action.id == r.action_id && action.idempotent {
1455 cache.remove(&idempotency_key(action));
1456 }
1457 }
1458 }
1459 }
1460 }
1461
1462 let action_order: HashMap<String, usize> = proposal
1464 .actions
1465 .iter()
1466 .enumerate()
1467 .map(|(i, a)| (a.id.clone(), i))
1468 .collect();
1469 results.sort_by_key(|r| {
1470 action_order
1471 .get(&r.action_id)
1472 .copied()
1473 .unwrap_or(usize::MAX)
1474 });
1475
1476 let mut cost = CostSummary::default();
1478 for r in &results {
1479 let action = action_order
1480 .get(&r.action_id)
1481 .and_then(|&i| proposal.actions.get(i));
1482 match r.status {
1483 ActionStatus::Succeeded => {
1484 cost.actions_executed += 1;
1485 if let Some(a) = action {
1486 if a.action_type == ActionType::ToolCall {
1487 cost.tool_calls += 1;
1488 }
1489 }
1490 }
1491 ActionStatus::Failed | ActionStatus::Rejected => {
1492 cost.actions_executed += 1;
1493 }
1494 ActionStatus::Skipped => {
1495 cost.actions_skipped += 1;
1496 }
1497 _ => {}
1498 }
1499 if let Some(d) = r.duration_ms {
1500 cost.total_duration_ms += d;
1501 }
1502 }
1503
1504 cost.retries = total_retries;
1506
1507 {
1509 let span_status = if aborted {
1510 SpanStatus::Error
1511 } else {
1512 SpanStatus::Ok
1513 };
1514 let mut log = self.log.lock().await;
1515 log.end_span(&root_span_id, span_status);
1516 }
1517
1518 let proposal_result = ProposalResult {
1519 proposal_id: proposal.id.clone(),
1520 results,
1521 cost,
1522 };
1523
1524 if self.auto_distill {
1526 if let Some(ref memgine) = self.memgine {
1527 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1529 .results
1530 .iter()
1531 .map(|r| {
1532 let kind = match r.status {
1533 ActionStatus::Succeeded => "action_succeeded",
1534 ActionStatus::Failed => "action_failed",
1535 ActionStatus::Rejected => "action_rejected",
1536 ActionStatus::Skipped => "action_skipped",
1537 _ => "unknown",
1538 };
1539 let tool = proposal
1541 .actions
1542 .iter()
1543 .find(|a| a.id == r.action_id)
1544 .and_then(|a| a.tool.clone());
1545 let mut data = serde_json::Map::new();
1546 if let Some(ref e) = r.error {
1547 data.insert("error".into(), Value::from(e.as_str()));
1548 }
1549 if let Some(ref o) = r.output {
1550 data.insert("output".into(), o.clone());
1551 }
1552 car_memgine::TraceEvent {
1553 kind: kind.to_string(),
1554 action_id: Some(r.action_id.clone()),
1555 tool,
1556 data: Value::Object(data),
1557 duration_ms: r.duration_ms,
1558 reward: match r.status {
1559 ActionStatus::Succeeded => Some(1.0),
1560 ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1561 _ => None,
1562 },
1563 ..Default::default()
1564 }
1565 })
1566 .collect();
1567
1568 let mut engine = memgine.lock().await;
1569 let skills = engine.distill_skills(&trace_events).await;
1570 if !skills.is_empty() {
1571 let count = skills.len();
1572 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1579 let provisional = engine.ingest_provisional_candidates(&skills, tenant);
1580
1581 let mut log = self.log.lock().await;
1583 log.append(
1584 EventKind::SkillDistilled,
1585 None,
1586 Some(&proposal_result.proposal_id),
1587 [
1588 ("skills_count".to_string(), Value::from(count)),
1589 (
1590 "provisional_ingested".to_string(),
1591 Value::from(provisional),
1592 ),
1593 (
1594 "skill_names".to_string(),
1595 Value::from(
1596 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1597 ),
1598 ),
1599 ]
1600 .into(),
1601 );
1602
1603 let threshold = engine.evolution_threshold();
1605 let domains = engine.domains_needing_evolution(threshold);
1606 for domain in &domains {
1607 let failed: Vec<car_memgine::TraceEvent> = trace_events
1609 .iter()
1610 .filter(|e| {
1611 matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1612 })
1613 .cloned()
1614 .collect();
1615 if !failed.is_empty() {
1616 let evolved = engine.evolve_skills(&failed, domain).await;
1617 if !evolved.is_empty() {
1618 log.append(
1619 EventKind::EvolutionTriggered,
1620 None,
1621 Some(&proposal_result.proposal_id),
1622 [
1623 ("domain".to_string(), Value::from(domain.as_str())),
1624 ("new_skills".to_string(), Value::from(evolved.len())),
1625 ]
1626 .into(),
1627 );
1628 }
1629 }
1630 }
1631 }
1632 }
1633 }
1634
1635 (proposal_result, state_before_map)
1636 }
1637
1638 async fn process_action(
1641 &self,
1642 action: &Action,
1643 proposal_id: &str,
1644 trace_id: &str,
1645 parent_span_id: &str,
1646 session_id: Option<&str>,
1647 scope: Option<&crate::scope::RuntimeScope>,
1648 ) -> (ActionResult, u32) {
1649 let action_type_name = serde_json::to_string(&action.action_type)
1651 .unwrap_or_default()
1652 .trim_matches('"')
1653 .to_string();
1654 let span_name = format!("action.{}", action_type_name);
1655
1656 let action_span_id = {
1658 let mut attrs: HashMap<String, Value> = HashMap::new();
1659 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1660 if let Some(ref tool) = action.tool {
1661 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1662 }
1663 let mut log = self.log.lock().await;
1664 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1665 };
1666
1667 let (result, retries) = self
1669 .process_action_inner(action, proposal_id, session_id, scope)
1670 .await;
1671
1672 let span_status = match result.status {
1674 ActionStatus::Succeeded => SpanStatus::Ok,
1675 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1676 _ => SpanStatus::Unset,
1677 };
1678 {
1679 let mut log = self.log.lock().await;
1680 log.end_span(&action_span_id, span_status);
1681 }
1682
1683 (result, retries)
1684 }
1685
1686 #[instrument(
1689 name = "action.process",
1690 skip_all,
1691 fields(
1692 action_id = %action.id,
1693 action_type = ?action.action_type,
1694 tool = action.tool.as_deref().unwrap_or("none"),
1695 )
1696 )]
1697 async fn process_action_inner(
1698 &self,
1699 action: &Action,
1700 proposal_id: &str,
1701 session_id: Option<&str>,
1702 scope: Option<&crate::scope::RuntimeScope>,
1703 ) -> (ActionResult, u32) {
1704 if action.idempotent {
1706 let key = idempotency_key(action);
1707 let cache = self.idempotency_cache.lock().await;
1708 if let Some(cached) = cache.get(&key) {
1709 let mut log = self.log.lock().await;
1710 log.append(
1711 EventKind::ActionDeduplicated,
1712 Some(&action.id),
1713 Some(proposal_id),
1714 [(
1715 "cached_action_id".to_string(),
1716 Value::from(cached.action_id.as_str()),
1717 )]
1718 .into(),
1719 );
1720 return (
1721 ActionResult {
1722 action_id: action.id.clone(),
1723 status: cached.status.clone(),
1724 output: cached.output.clone(),
1725 error: cached.error.clone(),
1726 state_changes: cached.state_changes.clone(),
1727 duration_ms: Some(0.0),
1728 timestamp: chrono::Utc::now(),
1729 },
1730 0,
1731 );
1732 }
1733 }
1734
1735 {
1737 let caps = self.capabilities.read().await;
1738 if let Some(ref cap) = *caps {
1739 if action.action_type == ActionType::ToolCall {
1741 if let Some(ref tool_name) = action.tool {
1742 if !cap.tool_allowed(tool_name) {
1743 let mut log = self.log.lock().await;
1744 log.append(
1745 EventKind::ActionRejected,
1746 Some(&action.id),
1747 Some(proposal_id),
1748 HashMap::new(),
1749 );
1750 return (
1751 rejected_result(
1752 &action.id,
1753 format!("capability denied: tool '{}' not allowed", tool_name),
1754 ),
1755 0,
1756 );
1757 }
1758 }
1759 }
1760
1761 if action.action_type == ActionType::StateWrite
1763 || action.action_type == ActionType::StateRead
1764 {
1765 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1766 if !cap.state_key_allowed(key) {
1767 let mut log = self.log.lock().await;
1768 log.append(
1769 EventKind::ActionRejected,
1770 Some(&action.id),
1771 Some(proposal_id),
1772 HashMap::new(),
1773 );
1774 return (
1775 rejected_result(
1776 &action.id,
1777 format!("capability denied: state key '{}' not allowed", key),
1778 ),
1779 0,
1780 );
1781 }
1782 }
1783 }
1784 }
1785 }
1786
1787 let tools = self.tools.read().await;
1789 let validation = validate_action(action, &self.state, &tools);
1790 drop(tools);
1791
1792 if !validation.valid() {
1793 let error = validation
1794 .errors
1795 .iter()
1796 .map(|e| e.reason.as_str())
1797 .collect::<Vec<_>>()
1798 .join("; ");
1799 let mut log = self.log.lock().await;
1800 log.append(
1801 EventKind::ActionRejected,
1802 Some(&action.id),
1803 Some(proposal_id),
1804 HashMap::new(),
1805 );
1806 return (rejected_result(&action.id, error), 0);
1807 }
1808
1809 {
1815 let mut violations = {
1816 let policies = self.policies.read().await;
1817 policies.check(action, &self.state)
1818 };
1819 if let Some(sid) = session_id {
1820 let session_engine = {
1825 let sessions = self.session_policies.read().await;
1826 sessions.get(sid).cloned()
1827 };
1828 if let Some(engine) = session_engine {
1829 let engine = engine.read().await;
1830 violations.extend(engine.check(action, &self.state));
1831 } else {
1832 let mut log = self.log.lock().await;
1838 log.append(
1839 EventKind::PolicyViolation,
1840 Some(&action.id),
1841 Some(proposal_id),
1842 HashMap::new(),
1843 );
1844 return (
1845 rejected_result(
1846 &action.id,
1847 format!(
1848 "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1849 ),
1850 ),
1851 0,
1852 );
1853 }
1854 }
1855 if !violations.is_empty() {
1856 let error = violations
1857 .iter()
1858 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1859 .collect::<Vec<_>>()
1860 .join("; ");
1861 let mut log = self.log.lock().await;
1862 log.append(
1863 EventKind::PolicyViolation,
1864 Some(&action.id),
1865 Some(proposal_id),
1866 HashMap::new(),
1867 );
1868 return (rejected_result(&action.id, error), 0);
1869 }
1870 }
1871
1872 {
1874 let mut log = self.log.lock().await;
1875 log.append(
1876 EventKind::ActionValidated,
1877 Some(&action.id),
1878 Some(proposal_id),
1879 HashMap::new(),
1880 );
1881 }
1882
1883 let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1885
1886 if action.idempotent && result.status == ActionStatus::Succeeded {
1888 let mut cache = self.idempotency_cache.lock().await;
1889 cache.insert(idempotency_key(action), result.clone());
1890 }
1891
1892 tracing::info!(
1893 status = ?result.status,
1894 duration_ms = result.duration_ms,
1895 "action completed"
1896 );
1897
1898 (result, retries)
1899 }
1900
1901 async fn execute_with_retry(
1904 &self,
1905 action: &Action,
1906 proposal_id: &str,
1907 scope: Option<&crate::scope::RuntimeScope>,
1908 ) -> (ActionResult, u32) {
1909 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1910 action.max_retries + 1
1911 } else {
1912 1
1913 };
1914
1915 let mut last_error: Option<String> = None;
1916 let mut retries: u32 = 0;
1917
1918 for attempt in 0..max_attempts {
1919 if attempt > 0 {
1920 retries += 1;
1921 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1922 tokio::time::sleep(Duration::from_millis(delay)).await;
1923 let mut log = self.log.lock().await;
1924 log.append(
1925 EventKind::ActionRetrying,
1926 Some(&action.id),
1927 Some(proposal_id),
1928 [("attempt".to_string(), Value::from(attempt + 1))].into(),
1929 );
1930 }
1931
1932 {
1933 let mut log = self.log.lock().await;
1934 log.append(
1935 EventKind::ActionExecuting,
1936 Some(&action.id),
1937 Some(proposal_id),
1938 HashMap::new(),
1939 );
1940 }
1941
1942 let start = std::time::Instant::now();
1943 let transitions_before = self.state.transition_count();
1944
1945 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1947 match timeout(
1948 Duration::from_millis(timeout_ms),
1949 self.dispatch(action, scope),
1950 )
1951 .await
1952 {
1953 Ok(r) => r,
1954 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1955 }
1956 } else {
1957 self.dispatch(action, scope).await
1958 };
1959
1960 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1961
1962 match exec_result {
1963 Ok(output) => {
1964 let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1970 let scoped_state = self.state.scoped(tenant_for_effects);
1971 let mut state_changes: HashMap<String, Value> = HashMap::new();
1972 for (key, value) in &action.expected_effects {
1973 scoped_state.set(key, value.clone(), &action.id);
1974 state_changes.insert(key.clone(), value.clone());
1975 }
1976
1977 for t in self.state.transitions_since(transitions_before) {
1979 if !state_changes.contains_key(&t.key) {
1980 if let Some(v) = t.new_value {
1981 state_changes.insert(t.key.clone(), v);
1982 }
1983 }
1984 }
1985
1986 let mut log = self.log.lock().await;
1987 log.append(
1988 EventKind::ActionSucceeded,
1989 Some(&action.id),
1990 Some(proposal_id),
1991 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1992 );
1993
1994 if !state_changes.is_empty() {
1995 log.append(
1996 EventKind::StateChanged,
1997 Some(&action.id),
1998 Some(proposal_id),
1999 [(
2000 "changes".to_string(),
2001 serde_json::to_value(&state_changes).unwrap_or_default(),
2002 )]
2003 .into(),
2004 );
2005 }
2006
2007 return (
2008 ActionResult {
2009 action_id: action.id.clone(),
2010 status: ActionStatus::Succeeded,
2011 output: Some(output),
2012 error: None,
2013 state_changes,
2014 duration_ms: Some(duration_ms),
2015 timestamp: chrono::Utc::now(),
2016 },
2017 retries,
2018 );
2019 }
2020 Err(e) => {
2021 last_error = Some(e.clone());
2022 let mut log = self.log.lock().await;
2023 log.append(
2024 EventKind::ActionFailed,
2025 Some(&action.id),
2026 Some(proposal_id),
2027 [
2028 ("error".to_string(), Value::from(e.as_str())),
2029 ("attempt".to_string(), Value::from(attempt + 1)),
2030 ]
2031 .into(),
2032 );
2033 }
2034 }
2035 }
2036
2037 if action.failure_behavior == FailureBehavior::Skip {
2039 return (
2040 skipped_result(
2041 &action.id,
2042 last_error.as_deref().unwrap_or("all attempts exhausted"),
2043 ),
2044 retries,
2045 );
2046 }
2047
2048 (
2049 ActionResult {
2050 action_id: action.id.clone(),
2051 status: ActionStatus::Failed,
2052 output: None,
2053 error: last_error,
2054 state_changes: HashMap::new(),
2055 duration_ms: None,
2056 timestamp: chrono::Utc::now(),
2057 },
2058 retries,
2059 )
2060 }
2061
2062 async fn dispatch(
2072 &self,
2073 action: &Action,
2074 scope: Option<&crate::scope::RuntimeScope>,
2075 ) -> Result<Value, String> {
2076 match action.action_type {
2077 ActionType::ToolCall => {
2078 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2079 let params = Value::Object(
2080 action
2081 .parameters
2082 .iter()
2083 .map(|(k, v)| (k.clone(), v.clone()))
2084 .collect(),
2085 );
2086
2087 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
2089 return Ok(cached);
2090 }
2091
2092 self.rate_limiter.acquire(tool_name).await;
2094
2095 if matches!(
2097 tool_name,
2098 "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2099 ) {
2100 if let Some(ref engine) = self.inference_engine {
2101 let params = {
2104 let should_ground =
2105 tool_name == "infer.grounded" || tool_name == "infer";
2106 if should_ground {
2107 if let Some(ref memgine) = self.memgine {
2108 if let Some(prompt) =
2109 params.get("prompt").and_then(|v| v.as_str())
2110 {
2111 let ctx = {
2112 let mut m = memgine.lock().await;
2113 m.build_context(prompt)
2114 };
2115 if !ctx.is_empty() {
2116 let mut p = params.clone();
2117 if let Some(obj) = p.as_object_mut() {
2118 obj.insert("context".to_string(), Value::from(ctx));
2119 }
2120 p
2121 } else {
2122 params
2123 }
2124 } else {
2125 params
2126 }
2127 } else {
2128 params
2129 }
2130 } else {
2131 params
2132 }
2133 };
2134
2135 let effective_tool = if tool_name == "infer.grounded" {
2137 "infer"
2138 } else {
2139 tool_name
2140 };
2141 let result =
2142 car_inference::service::execute_tool(engine, effective_tool, ¶ms)
2143 .await
2144 .map_err(|e| e.to_string());
2145
2146 if let Ok(ref value) = result {
2147 self.result_cache
2148 .put(tool_name, ¶ms, value.clone())
2149 .await;
2150 }
2151
2152 return result;
2153 }
2154 }
2155
2156 if tool_name == "memory.consolidate" {
2158 if let Some(ref memgine) = self.memgine {
2159 let report = {
2160 let mut m = memgine.lock().await;
2161 m.consolidate().await
2162 };
2163 {
2165 let mut log = self.log.lock().await;
2166 log.append(
2167 EventKind::Consolidated,
2168 None,
2169 None,
2170 [
2171 (
2172 "expired_pruned".to_string(),
2173 Value::from(report.expired_pruned),
2174 ),
2175 (
2176 "superseded_gc".to_string(),
2177 Value::from(report.superseded_gc),
2178 ),
2179 (
2180 "stale_embeddings_removed".to_string(),
2181 Value::from(report.stale_embeddings_removed),
2182 ),
2183 (
2184 "nodes_embedded".to_string(),
2185 Value::from(report.nodes_embedded),
2186 ),
2187 (
2188 "domains_evolved".to_string(),
2189 Value::from(report.domains_evolved.clone()),
2190 ),
2191 ("total_nodes".to_string(), Value::from(report.total_nodes)),
2192 ("total_edges".to_string(), Value::from(report.total_edges)),
2193 ]
2194 .into(),
2195 );
2196 for key in &report.candidates_promoted {
2200 log.append(
2201 EventKind::CandidatePromoted,
2202 None,
2203 None,
2204 [("candidate".to_string(), Value::from(key.as_str()))].into(),
2205 );
2206 }
2207 for key in &report.candidates_rejected {
2208 log.append(
2209 EventKind::CandidateRejected,
2210 None,
2211 None,
2212 [("candidate".to_string(), Value::from(key.as_str()))].into(),
2213 );
2214 }
2215 }
2216 return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2217 } else {
2218 return Err(
2219 "memory.consolidate requires memgine (attach with with_learning)"
2220 .into(),
2221 );
2222 }
2223 }
2224
2225 let configured = {
2231 let guard = self.tool_executor.lock().await;
2232 guard.as_ref().cloned()
2233 };
2234
2235 if let Some(ref executor) = configured {
2236 let result = executor
2237 .execute_with_action(tool_name, ¶ms, &action.id, action.timeout_ms)
2238 .await;
2239 let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2240 if !fall_through {
2241 if let Ok(ref value) = result {
2242 self.result_cache
2243 .put(tool_name, ¶ms, value.clone())
2244 .await;
2245 }
2246 return result;
2247 }
2248 }
2249
2250 if let Some(result) = crate::agent_basics::execute(tool_name, ¶ms).await {
2251 if let Ok(ref value) = result {
2252 self.result_cache
2253 .put(tool_name, ¶ms, value.clone())
2254 .await;
2255 }
2256 return result;
2257 }
2258
2259 Err(format!("no handler for tool '{}'", tool_name))
2260 }
2261 ActionType::StateWrite => {
2262 let key = action
2263 .parameters
2264 .get("key")
2265 .and_then(|v| v.as_str())
2266 .ok_or("state_write requires 'key' parameter")?;
2267 let value = action
2268 .parameters
2269 .get("value")
2270 .cloned()
2271 .unwrap_or(Value::Null);
2272 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2273 self.state.scoped(tenant).set(key, value, &action.id);
2274 Ok(Value::from(format!("written: {}", key)))
2275 }
2276 ActionType::StateRead => {
2277 let key = action
2278 .parameters
2279 .get("key")
2280 .and_then(|v| v.as_str())
2281 .ok_or("state_read requires 'key' parameter")?;
2282 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2283 Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2284 }
2285 ActionType::Assertion => {
2286 let key = action
2287 .parameters
2288 .get("key")
2289 .and_then(|v| v.as_str())
2290 .ok_or("assertion requires 'key' parameter")?;
2291 let expected = action
2292 .parameters
2293 .get("expected")
2294 .cloned()
2295 .unwrap_or(Value::Null);
2296 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2297 let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2298 if actual != expected {
2299 Err(format!(
2300 "assertion failed: state['{}'] = {:?}, expected {:?}",
2301 key, actual, expected
2302 ))
2303 } else {
2304 Ok(serde_json::json!({"asserted": key, "value": actual}))
2305 }
2306 }
2307 }
2308 }
2309
2310 pub async fn save_checkpoint(&self) -> Checkpoint {
2314 let state = self.state.snapshot();
2315 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2316 let log = self.log.lock().await;
2317 let events: Vec<Value> = log
2318 .events()
2319 .iter()
2320 .map(|e| serde_json::to_value(e).unwrap_or_default())
2321 .collect();
2322
2323 Checkpoint {
2324 checkpoint_id: Uuid::new_v4().to_string(),
2325 created_at: chrono::Utc::now(),
2326 state,
2327 events,
2328 tools,
2329 metadata: HashMap::new(),
2330 }
2331 }
2332
2333 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2335 let checkpoint = self.save_checkpoint().await;
2336 let json = serde_json::to_string_pretty(&checkpoint)
2337 .map_err(|e| format!("serialize error: {}", e))?;
2338 tokio::fs::write(path, json)
2339 .await
2340 .map_err(|e| format!("write error: {}", e))?;
2341 Ok(())
2342 }
2343
2344 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2346 let json = tokio::fs::read_to_string(path)
2347 .await
2348 .map_err(|e| format!("read error: {}", e))?;
2349 let checkpoint: Checkpoint =
2350 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2351 self.restore_checkpoint(&checkpoint).await;
2352 Ok(checkpoint)
2353 }
2354
2355 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2357 self.state.replace_all(checkpoint.state.clone());
2359 self.idempotency_cache.lock().await.clear();
2362 let mut tools = self.tools.write().await;
2364 tools.clear();
2365 for tool_name in &checkpoint.tools {
2366 let schema = ToolSchema {
2367 name: tool_name.clone(),
2368 description: String::new(),
2369 parameters: serde_json::Value::Object(Default::default()),
2370 returns: None,
2371 idempotent: false,
2372 cache_ttl_secs: None,
2373 rate_limit: None,
2374 };
2375 tools.insert(tool_name.clone(), schema);
2376 }
2377 }
2378
2379 pub async fn register_subprocess_tool(
2384 &self,
2385 name: &str,
2386 tool: crate::subprocess::SubprocessTool,
2387 ) {
2388 use crate::subprocess::SubprocessToolExecutor;
2389
2390 let schema = ToolSchema {
2391 name: name.to_string(),
2392 description: format!("Subprocess tool: {}", tool.command),
2393 parameters: serde_json::Value::Object(Default::default()),
2394 returns: None,
2395 idempotent: false,
2396 cache_ttl_secs: None,
2397 rate_limit: None,
2398 };
2399 self.register_tool_schema(schema).await;
2400
2401 let mut guard = self.tool_executor.lock().await;
2402 let mut executor = match guard.take() {
2403 Some(existing) => {
2404 let mut sub = SubprocessToolExecutor::new();
2405 sub = sub.with_fallback(existing);
2406 sub
2407 }
2408 None => SubprocessToolExecutor::new(),
2409 };
2410 executor.register(name, tool);
2411 *guard = Some(std::sync::Arc::new(executor));
2412 }
2413}
2414
2415impl Default for Runtime {
2416 fn default() -> Self {
2417 Self::new()
2418 }
2419}