1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base retry delay in milliseconds.
/// NOTE(review): the retry loop that reads this constant is not in the visible part of the
/// file — presumably this is the wait before the first retry; confirm against the consumer.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Backoff multiplier for retries.
/// NOTE(review): presumably the delay is multiplied by this factor on each further retry
/// (exponential backoff); confirm against the consumer.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
/// Hook the runtime calls to obtain a replacement proposal after an execution attempt
/// ended with at least one `Failed` action.
///
/// The runtime only invokes it while fewer than `ReplanConfig::max_replans` attempts
/// have been used.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Produces the proposal to run next, given a description of what failed.
    ///
    /// Returning `Err` ends the replan loop: the runtime logs a `ReplanExhausted` event with
    /// the error text and returns the result of the failed attempt to its caller.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Everything handed to a [`ReplanCallback`] so it can build a replacement proposal.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Id of the proposal originally submitted (not of any intermediate replanned proposal).
    pub proposal_id: String,
    /// 1-based number of the replan attempt this context is for.
    pub attempt: u32,
    /// One entry per action whose result was `Failed` in the attempt that just ran.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Ids of the actions whose result was `Succeeded` in the attempt that just ran.
    pub completed_action_ids: Vec<String>,
    /// Snapshot of the state store taken when this context was built.
    pub state_snapshot: HashMap<String, Value>,
    /// Replans still allowed after this one (`max_replans - attempt`, saturating at 0).
    pub replans_remaining: u32,
    /// `source` of the originally submitted proposal.
    pub original_source: String,
    /// Number of actions in the originally submitted proposal.
    pub original_action_count: usize,
    /// `context` map of the originally submitted proposal.
    pub original_context: HashMap<String, Value>,
}
62
/// Description of a single failed action, as reported to a [`ReplanCallback`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the failed action.
    pub action_id: String,
    /// Tool named by the action, if the action could be found in the executed proposal and
    /// names one.
    pub tool: Option<String>,
    /// Error text from the action result; empty when the result carried no error.
    pub error: String,
    /// Parameters of the action; empty when the action could not be found in the executed
    /// proposal.
    pub parameters: HashMap<String, Value>,
}
71
/// Settings for the replan loop that runs after a proposal has failed actions.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts per execution; `0` disables replanning.
    pub max_replans: u32,
    /// Pause, in milliseconds, before the replan callback is invoked; `0` means no pause.
    pub delay_ms: u64,
    /// When `true`, a replanned proposal is checked with `car_verify::verify` against the
    /// current state and registered tool names before it is executed; an invalid proposal is
    /// rejected and still counts as an attempt.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87 fn default() -> Self {
88 Self {
89 max_replans: 0,
90 delay_ms: 0,
91 verify_before_execute: true,
92 }
93 }
94}
95
/// Backend that actually runs a tool call on behalf of the runtime.
///
/// NOTE(review): the call site is outside the visible part of the file; the contract below is
/// read off the signature only.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Runs the tool named `tool` with the JSON `params`, returning its JSON output or an
    /// error message.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
}
104
105fn idempotency_key(action: &Action) -> String {
107 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
108 let params = serde_json::to_string(&sorted).unwrap_or_default();
109 format!(
110 "{}:{}:{}",
111 serde_json::to_string(&action.action_type).unwrap_or_default(),
112 action.tool.as_deref().unwrap_or(""),
113 params
114 )
115}
116
117fn rejected_result(action_id: &str, error: String) -> ActionResult {
118 ActionResult {
119 action_id: action_id.to_string(),
120 status: ActionStatus::Rejected,
121 output: None,
122 error: Some(error),
123 state_changes: HashMap::new(),
124 duration_ms: None,
125 timestamp: chrono::Utc::now(),
126 }
127}
128
129fn snapshot_relevant_keys(
133 state: &car_state::StateStore,
134 action: &Action,
135) -> HashMap<String, Value> {
136 let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
137 for dep in &action.state_dependencies {
138 keys.insert(dep.as_str());
139 }
140 for key in action.expected_effects.keys() {
141 keys.insert(key.as_str());
142 }
143 if action.action_type == ActionType::StateWrite {
145 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
146 keys.insert(key);
147 }
148 }
149
150 if keys.is_empty() {
151 return HashMap::new();
153 }
154
155 keys.iter()
156 .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
157 .collect()
158}
159
160fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
161 ActionResult {
162 action_id: action_id.to_string(),
163 status: ActionStatus::Skipped,
164 output: None,
165 error: Some(reason.to_string()),
166 state_changes: HashMap::new(),
167 duration_ms: None,
168 timestamp: chrono::Utc::now(),
169 }
170}
171
/// Prefix that `canceled_result` puts in front of the reason in an action result's error
/// text. Canceled actions are reported with `Skipped` status, so this prefix is what tells a
/// cancellation apart from any other skip.
pub const CANCELED_PREFIX: &str = "canceled: ";
179
180fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
185 ActionResult {
186 action_id: action_id.to_string(),
187 status: ActionStatus::Skipped,
188 output: None,
189 error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
190 state_changes: HashMap::new(),
191 duration_ms: None,
192 timestamp: chrono::Utc::now(),
193 }
194}
195
196pub fn format_tool_result(result: &ActionResult) -> String {
198 match result.status {
199 ActionStatus::Succeeded => match &result.output {
200 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
201 None => String::new(),
202 },
203 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
204 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
205 _ => format!(
206 "[{:?}] {}",
207 result.status,
208 result.error.as_deref().unwrap_or("")
209 ),
210 }
211}
212
/// Optional per-proposal limits checked before an action is started; `None` means no limit
/// for that dimension. An action that would start after a limit is reached is skipped with
/// the reason "cost budget exceeded".
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Limit on tool calls; compared against the number of `ToolCall` actions that succeeded
    /// so far in the proposal.
    pub max_tool_calls: Option<u32>,
    /// Limit on accumulated action duration in milliseconds, summed over the `duration_ms`
    /// of the results produced so far.
    pub max_duration_ms: Option<f64>,
    /// Limit on actions; compared against the number of results so far whose status is not
    /// `Skipped`.
    pub max_actions: Option<u32>,
}
220
/// Executes action proposals against shared state, with tool registration, budgets,
/// optional replanning and trajectory recording.
pub struct Runtime {
    /// State store; snapshotted before a proposal runs and restored on abort.
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Runtime-wide policy engine.
    /// NOTE(review): where it is consulted is outside the visible part of the file.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// One policy engine per open session, keyed by the session id returned from
    /// `open_session`.
    pub session_policies: Arc<TokioRwLock<HashMap<String, Arc<TokioRwLock<PolicyEngine>>>>>,
    /// Event log that receives proposal, replan, rollback and span events.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits.
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool result cache, enabled per tool with a TTL.
    pub result_cache: Arc<ResultCache>,
    /// Executor used to run tool calls; `None` until one is installed.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Cached action results keyed by `idempotency_key`; entries for succeeded idempotent
    /// actions are removed again when their proposal is rolled back.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional cost limits applied to each proposal.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability set; a proposal with more actions than it allows is rejected
    /// wholesale.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Optional inference engine installed through `with_inference`.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Optional memory engine installed through `with_learning` / `with_memgine`.
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// When `true` (and `memgine` is set), trace events are distilled into skills after a
    /// proposal has run.
    auto_distill: bool,
    /// Optional store to which one trajectory per top-level execution is appended.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Callback asked for a replacement proposal after failures; `None` disables replanning.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Settings for the replan loop.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Registry of tool entries added through `register_tool_entry`.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
270
271impl Runtime {
272 pub fn new() -> Self {
273 Self {
274 state: Arc::new(StateStore::new()),
275 tools: Arc::new(TokioRwLock::new(HashMap::new())),
276 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
277 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
278 log: Arc::new(TokioMutex::new(EventLog::new())),
279 rate_limiter: Arc::new(RateLimiter::new()),
280 result_cache: Arc::new(ResultCache::new()),
281 tool_executor: TokioMutex::new(None),
282 idempotency_cache: TokioMutex::new(HashMap::new()),
283 cost_budget: TokioRwLock::new(None),
284 capabilities: TokioRwLock::new(None),
285 inference_engine: None,
286 memgine: None,
287 auto_distill: false,
288 trajectory_store: None,
289 replan_callback: TokioMutex::new(None),
290 replan_config: TokioRwLock::new(ReplanConfig::default()),
291 registry: Arc::new(crate::registry::ToolRegistry::new()),
292 }
293 }
294
295 pub fn with_shared(
298 state: Arc<StateStore>,
299 log: Arc<TokioMutex<EventLog>>,
300 policies: Arc<TokioRwLock<PolicyEngine>>,
301 ) -> Self {
302 Self {
303 state,
304 tools: Arc::new(TokioRwLock::new(HashMap::new())),
305 policies,
306 session_policies: Arc::new(TokioRwLock::new(HashMap::new())),
311 log,
312 rate_limiter: Arc::new(RateLimiter::new()),
313 result_cache: Arc::new(ResultCache::new()),
314 tool_executor: TokioMutex::new(None),
315 idempotency_cache: TokioMutex::new(HashMap::new()),
316 cost_budget: TokioRwLock::new(None),
317 capabilities: TokioRwLock::new(None),
318 inference_engine: None,
319 memgine: None,
320 auto_distill: false,
321 trajectory_store: None,
322 replan_callback: TokioMutex::new(None),
323 replan_config: TokioRwLock::new(ReplanConfig::default()),
324 registry: Arc::new(crate::registry::ToolRegistry::new()),
325 }
326 }
327
328 pub async fn open_session(&self) -> String {
346 let id = Uuid::new_v4().to_string();
347 let mut sessions = self.session_policies.write().await;
348 sessions.insert(id.clone(), Arc::new(TokioRwLock::new(PolicyEngine::new())));
349 id
350 }
351
352 pub async fn close_session(&self, session_id: &str) -> bool {
357 let mut sessions = self.session_policies.write().await;
358 sessions.remove(session_id).is_some()
359 }
360
361 pub async fn register_policy_in_session(
370 &self,
371 session_id: &str,
372 name: &str,
373 check: car_policy::PolicyCheck,
374 description: &str,
375 ) -> Result<(), String> {
376 let engine = {
377 let sessions = self.session_policies.read().await;
378 sessions
379 .get(session_id)
380 .cloned()
381 .ok_or_else(|| format!("unknown session id '{session_id}'"))?
382 };
383 let mut engine = engine.write().await;
384 engine.register(name, check, description);
385 Ok(())
386 }
387
388 pub async fn session_exists(&self, session_id: &str) -> bool {
392 self.session_policies.read().await.contains_key(session_id)
393 }
394
395 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
398 self.inference_engine = Some(engine);
399 if let Ok(mut tools) = self.tools.try_write() {
401 for schema in car_inference::service::all_schemas() {
402 tools.insert(schema.name.clone(), schema);
403 }
404 }
405 self
406 }
407
408 pub fn with_learning(
412 mut self,
413 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
414 auto_distill: bool,
415 ) -> Self {
416 self.memgine = Some(memgine);
417 self.auto_distill = auto_distill;
418 self
419 }
420
421 pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
423 self.with_learning(memgine, true)
424 }
425
426 pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
428 self.trajectory_store = Some(store);
429 self
430 }
431
432 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
433 if let Ok(mut guard) = self.tool_executor.try_lock() {
435 *guard = Some(executor);
436 }
437 self
438 }
439
440 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
443 *self.tool_executor.lock().await = Some(executor);
444 }
445
446 pub fn with_event_log(mut self, log: EventLog) -> Self {
447 self.log = Arc::new(TokioMutex::new(log));
448 self
449 }
450
451 pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
453 if let Ok(mut guard) = self.replan_callback.try_lock() {
454 *guard = Some(callback);
455 }
456 if let Ok(mut guard) = self.replan_config.try_write() {
457 *guard = config;
458 }
459 self
460 }
461
462 pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
464 *self.replan_callback.lock().await = Some(callback);
465 }
466
467 pub async fn set_replan_config(&self, config: ReplanConfig) {
469 *self.replan_config.write().await = config;
470 }
471
472 pub async fn register_tool(&self, name: &str) {
474 let schema = ToolSchema {
475 name: name.to_string(),
476 description: String::new(),
477 parameters: serde_json::Value::Object(Default::default()),
478 returns: None,
479 idempotent: false,
480 cache_ttl_secs: None,
481 rate_limit: None,
482 };
483 self.register_tool_schema(schema).await;
484 }
485
486 pub async fn register_tool_schema(&self, schema: ToolSchema) {
488 if let Some(ttl) = schema.cache_ttl_secs {
490 self.result_cache.enable_caching(&schema.name, ttl).await;
491 }
492 if let Some(ref rl) = schema.rate_limit {
494 self.rate_limiter
495 .set_limit(
496 &schema.name,
497 RateLimit {
498 max_calls: rl.max_calls,
499 interval_secs: rl.interval_secs,
500 },
501 )
502 .await;
503 }
504 self.tools.write().await.insert(schema.name.clone(), schema);
505 }
506
507 pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
511 let schema = entry.schema.clone();
512 self.registry.register(entry).await;
513 self.register_tool_schema(schema).await;
514 }
515
516 pub async fn register_agent_basics(&self) {
521 for entry in crate::agent_basics::entries() {
522 self.register_tool_entry(entry).await;
523 }
524 }
525
526 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
528 self.tools.read().await.values().cloned().collect()
529 }
530
531 pub async fn set_cost_budget(&self, budget: CostBudget) {
533 *self.cost_budget.write().await = Some(budget);
534 }
535
536 pub async fn set_capabilities(&self, caps: CapabilitySet) {
538 *self.capabilities.write().await = Some(caps);
539 }
540
541 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
547 self.rate_limiter
548 .set_limit(
549 tool,
550 RateLimit {
551 max_calls,
552 interval_secs,
553 },
554 )
555 .await;
556 }
557
558 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
560 self.result_cache.enable_caching(tool, ttl_secs).await;
561 }
562
563 #[instrument(
573 name = "proposal.execute",
574 skip_all,
575 fields(
576 proposal_id = %proposal.id,
577 action_count = proposal.actions.len(),
578 )
579 )]
580 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
581 let token = tokio_util::sync::CancellationToken::new();
584 self.execute_with_cancel(proposal, &token).await
585 }
586
587 pub async fn execute_with_session(
599 &self,
600 proposal: &ActionProposal,
601 session_id: &str,
602 ) -> ProposalResult {
603 let token = tokio_util::sync::CancellationToken::new();
604 self.execute_with_session_and_cancel(proposal, session_id, &token).await
605 }
606
607 pub async fn execute_with_session_and_cancel(
611 &self,
612 proposal: &ActionProposal,
613 session_id: &str,
614 cancel: &tokio_util::sync::CancellationToken,
615 ) -> ProposalResult {
616 self.execute_with_optional_session(proposal, Some(session_id), None, cancel)
617 .await
618 }
619
620 pub async fn execute_with_cancel(
648 &self,
649 proposal: &ActionProposal,
650 cancel: &tokio_util::sync::CancellationToken,
651 ) -> ProposalResult {
652 self.execute_with_optional_session(proposal, None, None, cancel)
653 .await
654 }
655
656 pub async fn execute_scoped(
673 &self,
674 proposal: &ActionProposal,
675 scope: &crate::scope::RuntimeScope,
676 ) -> ProposalResult {
677 let token = tokio_util::sync::CancellationToken::new();
678 self.execute_scoped_with_cancel(proposal, scope, &token).await
679 }
680
681 pub async fn execute_scoped_with_cancel(
686 &self,
687 proposal: &ActionProposal,
688 scope: &crate::scope::RuntimeScope,
689 cancel: &tokio_util::sync::CancellationToken,
690 ) -> ProposalResult {
691 self.execute_with_optional_session(proposal, None, Some(scope), cancel)
692 .await
693 }
694
695 async fn execute_with_optional_session(
701 &self,
702 proposal: &ActionProposal,
703 session_id: Option<&str>,
704 scope: Option<&crate::scope::RuntimeScope>,
705 cancel: &tokio_util::sync::CancellationToken,
706 ) -> ProposalResult {
707 let config = self.replan_config.read().await.clone();
708 let mut current_proposal = proposal.clone();
709 let mut attempt: u32 = 0;
710
711 if let Some(s) = scope {
717 if !s.is_unscoped() {
718 let mut props: HashMap<String, Value> = HashMap::new();
719 if let Some(cid) = &s.caller_id {
720 props.insert("caller_id".to_string(), Value::from(cid.as_str()));
721 }
722 if let Some(tid) = &s.tenant_id {
723 props.insert("tenant_id".to_string(), Value::from(tid.as_str()));
724 }
725 if !s.claims.is_empty() {
726 if let Ok(claims_json) = serde_json::to_value(&s.claims) {
727 props.insert("claims".to_string(), claims_json);
728 }
729 }
730 let mut log = self.log.lock().await;
731 log.append(
732 EventKind::SessionScope,
733 None,
734 Some(&proposal.id),
735 props,
736 );
737 }
738 }
739
740 loop {
741 let (result, state_before_map) = self
742 .execute_inner_with_cancel(¤t_proposal, Some(cancel), session_id, scope)
743 .await;
744
745 let aborted = result
747 .results
748 .iter()
749 .any(|r| r.status == ActionStatus::Failed);
750 if !aborted || attempt >= config.max_replans {
751 if aborted && attempt > 0 {
752 let mut log = self.log.lock().await;
754 log.append(
755 EventKind::ReplanExhausted,
756 None,
757 Some(&proposal.id),
758 [("attempts".to_string(), Value::from(attempt))].into(),
759 );
760 }
761
762 let outcome = if !aborted {
764 if attempt > 0 {
765 car_memgine::TrajectoryOutcome::ReplanSuccess
766 } else {
767 car_memgine::TrajectoryOutcome::Success
768 }
769 } else if attempt > 0 {
770 car_memgine::TrajectoryOutcome::ReplanExhausted
771 } else {
772 car_memgine::TrajectoryOutcome::Failed
773 };
774 if let Some(err) = self.persist_trajectory(
775 proposal,
776 ¤t_proposal,
777 &result,
778 outcome,
779 attempt,
780 &state_before_map,
781 ) {
782 let mut log = self.log.lock().await;
783 log.append(
784 EventKind::ActionFailed,
785 None,
786 Some(&proposal.id),
787 [(
788 "trajectory_persist_error".to_string(),
789 Value::from(err.as_str()),
790 )]
791 .into(),
792 );
793 }
794
795 return result;
796 }
797
798 let callback = {
800 let guard = self.replan_callback.lock().await;
801 guard.clone()
802 };
803 let Some(callback) = callback else {
804 if let Some(err) = self.persist_trajectory(
806 proposal,
807 ¤t_proposal,
808 &result,
809 car_memgine::TrajectoryOutcome::Failed,
810 attempt,
811 &state_before_map,
812 ) {
813 let mut log = self.log.lock().await;
814 log.append(
815 EventKind::ActionFailed,
816 None,
817 Some(&proposal.id),
818 [(
819 "trajectory_persist_error".to_string(),
820 Value::from(err.as_str()),
821 )]
822 .into(),
823 );
824 }
825 return result;
826 };
827
828 let failed_actions: Vec<FailedActionSummary> = result
830 .results
831 .iter()
832 .filter(|r| r.status == ActionStatus::Failed)
833 .map(|r| {
834 let action = current_proposal
835 .actions
836 .iter()
837 .find(|a| a.id == r.action_id);
838 FailedActionSummary {
839 action_id: r.action_id.clone(),
840 tool: action.and_then(|a| a.tool.clone()),
841 error: r.error.clone().unwrap_or_default(),
842 parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
843 }
844 })
845 .collect();
846
847 let completed_action_ids: Vec<String> = result
848 .results
849 .iter()
850 .filter(|r| r.status == ActionStatus::Succeeded)
851 .map(|r| r.action_id.clone())
852 .collect();
853
854 let ctx = ReplanContext {
855 proposal_id: proposal.id.clone(),
856 attempt: attempt + 1,
857 failed_actions,
858 completed_action_ids,
859 state_snapshot: self.state.snapshot(),
860 replans_remaining: config.max_replans.saturating_sub(attempt + 1),
861 original_source: proposal.source.clone(),
862 original_action_count: proposal.actions.len(),
863 original_context: proposal.context.clone(),
864 };
865
866 if config.delay_ms > 0 {
868 tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
869 }
870
871 {
873 let mut log = self.log.lock().await;
874 log.append(
875 EventKind::ReplanAttempted,
876 None,
877 Some(&proposal.id),
878 [
879 ("attempt".to_string(), Value::from(attempt + 1)),
880 (
881 "failed_count".to_string(),
882 Value::from(ctx.failed_actions.len()),
883 ),
884 ]
885 .into(),
886 );
887 }
888
889 match callback.replan(&ctx).await {
891 Ok(new_proposal) => {
892 if config.verify_before_execute {
894 let tools_guard = self.tools.read().await;
895 let tool_names: std::collections::HashSet<String> =
896 tools_guard.keys().cloned().collect();
897 drop(tools_guard);
898
899 let current_state = self.state.snapshot();
900 let vr = car_verify::verify(
901 &new_proposal,
902 Some(¤t_state),
903 Some(&tool_names),
904 100,
905 );
906 if !vr.valid {
907 let error_msgs: Vec<String> = vr
908 .issues
909 .iter()
910 .filter(|i| i.severity == "error")
911 .map(|i| i.message.clone())
912 .collect();
913 let mut log = self.log.lock().await;
914 log.append(
915 EventKind::ReplanRejected,
916 None,
917 Some(&proposal.id),
918 [
919 ("errors".to_string(), Value::from(error_msgs.join("; "))),
920 ("attempt".to_string(), Value::from(attempt + 1)),
921 ]
922 .into(),
923 );
924 attempt += 1;
926 continue;
927 }
928 }
929
930 {
932 let mut log = self.log.lock().await;
933 log.append(
934 EventKind::ReplanProposalReceived,
935 None,
936 Some(&proposal.id),
937 [
938 ("attempt".to_string(), Value::from(attempt + 1)),
939 (
940 "new_action_count".to_string(),
941 Value::from(new_proposal.actions.len()),
942 ),
943 ]
944 .into(),
945 );
946 }
947 current_proposal = new_proposal;
948 attempt += 1;
949 }
950 Err(e) => {
951 let mut log = self.log.lock().await;
953 log.append(
954 EventKind::ReplanExhausted,
955 None,
956 Some(&proposal.id),
957 [
958 ("reason".to_string(), Value::from("callback_error")),
959 ("error".to_string(), Value::from(e.as_str())),
960 ("attempt".to_string(), Value::from(attempt + 1)),
961 ]
962 .into(),
963 );
964 if let Some(err) = self.persist_trajectory(
965 proposal,
966 ¤t_proposal,
967 &result,
968 car_memgine::TrajectoryOutcome::Failed,
969 attempt,
970 &state_before_map,
971 ) {
972 log.append(
973 EventKind::ActionFailed,
974 None,
975 Some(&proposal.id),
976 [(
977 "trajectory_persist_error".to_string(),
978 Value::from(err.as_str()),
979 )]
980 .into(),
981 );
982 }
983 return result;
984 }
985 }
986 }
987 }
988
989 fn persist_trajectory(
991 &self,
992 proposal: &ActionProposal,
993 current_proposal: &ActionProposal,
994 result: &ProposalResult,
995 outcome: car_memgine::TrajectoryOutcome,
996 attempt: u32,
997 state_before_map: &HashMap<String, HashMap<String, Value>>,
998 ) -> Option<String> {
999 let store = self.trajectory_store.as_ref()?;
1000
1001 let trace_events: Vec<car_memgine::TraceEvent> = result
1002 .results
1003 .iter()
1004 .map(|r| {
1005 let kind = match r.status {
1006 ActionStatus::Succeeded => "action_succeeded",
1007 ActionStatus::Failed => "action_failed",
1008 ActionStatus::Rejected => "action_rejected",
1009 ActionStatus::Skipped => "action_skipped",
1010 _ => "unknown",
1011 };
1012 let tool = current_proposal
1013 .actions
1014 .iter()
1015 .find(|a| a.id == r.action_id)
1016 .and_then(|a| a.tool.clone());
1017 let reward = match r.status {
1018 ActionStatus::Succeeded => Some(1.0),
1019 ActionStatus::Failed => Some(0.0),
1020 ActionStatus::Rejected => Some(0.0),
1021 ActionStatus::Skipped => None,
1022 _ => None,
1023 };
1024 car_memgine::TraceEvent {
1025 kind: kind.to_string(),
1026 action_id: Some(r.action_id.clone()),
1027 tool,
1028 data: r
1029 .error
1030 .as_ref()
1031 .map(|e| serde_json::json!({"error": e}))
1032 .unwrap_or(serde_json::json!({})),
1033 duration_ms: r.duration_ms,
1034 state_before: state_before_map.get(&r.action_id).cloned(),
1035 state_after: if !r.state_changes.is_empty() {
1036 Some(r.state_changes.clone())
1037 } else {
1038 None
1039 },
1040 reward,
1041 }
1042 })
1043 .collect();
1044
1045 let trajectory = car_memgine::Trajectory {
1046 proposal_id: proposal.id.clone(),
1047 source: proposal.source.clone(),
1048 action_count: current_proposal.actions.len(),
1049 events: trace_events,
1050 outcome,
1051 timestamp: chrono::Utc::now(),
1052 duration_ms: result.cost.total_duration_ms,
1053 replan_attempts: attempt,
1054 };
1055
1056 match store.append(&trajectory) {
1057 Ok(()) => None,
1058 Err(e) => Some(e.to_string()),
1059 }
1060 }
1061
1062 pub async fn plan_and_execute(
1068 &self,
1069 candidates: &[ActionProposal],
1070 planner_config: Option<car_planner::PlannerConfig>,
1071 feedback: Option<&car_planner::ToolFeedback>,
1072 ) -> ProposalResult {
1073 if candidates.is_empty() {
1074 return ProposalResult {
1075 proposal_id: "empty".to_string(),
1076 results: vec![],
1077 cost: car_ir::CostSummary::default(),
1078 };
1079 }
1080
1081 let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
1083 let tools_guard = self.tools.read().await;
1084 let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
1085 drop(tools_guard);
1086
1087 let pre_plan_snapshot = self.state.snapshot();
1088 let pre_plan_transitions = self.state.transition_count();
1089 let ranked = planner.rank_with_feedback(
1090 candidates,
1091 Some(&pre_plan_snapshot),
1092 Some(&tool_names),
1093 feedback,
1094 );
1095
1096 let mut first_failure: Option<ProposalResult> = None;
1098 for scored in &ranked {
1099 if !scored.valid {
1100 continue;
1101 }
1102
1103 self.state
1106 .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
1107
1108 let proposal = &candidates[scored.index];
1109 let result = self.execute(proposal).await;
1110
1111 if result.all_succeeded() {
1112 return result;
1113 }
1114
1115 tracing::info!(
1116 proposal_id = %proposal.id,
1117 score = scored.score,
1118 "plan_and_execute: proposal failed, trying next candidate"
1119 );
1120
1121 if first_failure.is_none() {
1122 first_failure = Some(result);
1123 }
1124 }
1125
1126 first_failure.unwrap_or_else(|| ProposalResult {
1128 proposal_id: candidates[0].id.clone(),
1129 results: vec![],
1130 cost: car_ir::CostSummary::default(),
1131 })
1132 }
1133
1134 async fn execute_inner_with_cancel(
1142 &self,
1143 proposal: &ActionProposal,
1144 cancel: Option<&tokio_util::sync::CancellationToken>,
1145 session_id: Option<&str>,
1146 scope: Option<&crate::scope::RuntimeScope>,
1147 ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
1148 let trace_id = Uuid::new_v4().to_string();
1150
1151 let root_span_id = {
1153 let mut log = self.log.lock().await;
1154 log.begin_span(
1155 "proposal.execute",
1156 &trace_id,
1157 None,
1158 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
1159 )
1160 };
1161
1162 {
1164 let mut log = self.log.lock().await;
1165 log.append(
1166 EventKind::ProposalReceived,
1167 None,
1168 Some(&proposal.id),
1169 [
1170 ("source".to_string(), Value::from(proposal.source.as_str())),
1171 (
1172 "action_count".to_string(),
1173 Value::from(proposal.actions.len()),
1174 ),
1175 ]
1176 .into(),
1177 );
1178 }
1179
1180 {
1182 let caps = self.capabilities.read().await;
1183 if let Some(ref cap) = *caps {
1184 if !cap.actions_within_budget(proposal.actions.len() as u32) {
1185 let mut action_results = Vec::new();
1186 for action in &proposal.actions {
1187 action_results.push(rejected_result(
1188 &action.id,
1189 format!(
1190 "capability denied: proposal has {} actions, max allowed is {:?}",
1191 proposal.actions.len(),
1192 cap.max_actions
1193 ),
1194 ));
1195 }
1196 return (
1197 ProposalResult {
1198 proposal_id: proposal.id.clone(),
1199 results: action_results,
1200 cost: CostSummary::default(),
1201 },
1202 HashMap::new(),
1203 );
1204 }
1205 }
1206 }
1207
1208 let snapshot = self.state.snapshot();
1210 let transition_count = self.state.transition_count();
1211
1212 let mut results: Vec<ActionResult> = Vec::new();
1213 let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1215 let mut aborted = false;
1216 let mut budget_exceeded = false;
1217 let mut total_retries: u32 = 0;
1218
1219 let mut running_tool_calls: u32 = 0;
1221 let mut running_actions: u32 = 0;
1222 let mut running_duration_ms: f64 = 0.0;
1223
1224 let budget = self.cost_budget.read().await.clone();
1226
1227 let levels = build_dag(&proposal.actions);
1229
1230 let mut canceled = false;
1231 for level in &levels {
1232 if !canceled {
1238 if let Some(token) = cancel {
1239 if token.is_cancelled() {
1240 canceled = true;
1241 }
1242 }
1243 }
1244 if canceled {
1245 for &idx in level {
1246 results.push(canceled_result(
1247 &proposal.actions[idx].id,
1248 "cancellation requested by caller",
1249 ));
1250 }
1251 continue;
1252 }
1253 if aborted || budget_exceeded {
1254 let skip_reason = if budget_exceeded {
1255 "cost budget exceeded"
1256 } else {
1257 "skipped due to earlier abort"
1258 };
1259 for &idx in level {
1260 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1261 }
1262 continue;
1263 }
1264
1265 let has_abort = level
1267 .iter()
1268 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1269
1270 if level.len() == 1 || has_abort {
1271 for &idx in level {
1273 if aborted || budget_exceeded {
1274 let skip_reason = if budget_exceeded {
1275 "cost budget exceeded"
1276 } else {
1277 "skipped due to abort"
1278 };
1279 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1280 continue;
1281 }
1282
1283 if let Some(ref b) = budget {
1285 if let Some(max) = b.max_actions {
1286 if running_actions >= max {
1287 budget_exceeded = true;
1288 results.push(skipped_result(
1289 &proposal.actions[idx].id,
1290 "cost budget exceeded",
1291 ));
1292 continue;
1293 }
1294 }
1295 if let Some(max) = b.max_tool_calls {
1296 if proposal.actions[idx].action_type == ActionType::ToolCall
1297 && running_tool_calls >= max
1298 {
1299 budget_exceeded = true;
1300 results.push(skipped_result(
1301 &proposal.actions[idx].id,
1302 "cost budget exceeded",
1303 ));
1304 continue;
1305 }
1306 }
1307 if let Some(max) = b.max_duration_ms {
1308 if running_duration_ms >= max {
1309 budget_exceeded = true;
1310 results.push(skipped_result(
1311 &proposal.actions[idx].id,
1312 "cost budget exceeded",
1313 ));
1314 continue;
1315 }
1316 }
1317 }
1318
1319 state_before_map.insert(
1320 proposal.actions[idx].id.clone(),
1321 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1322 );
1323 let (ar, action_retries) = self
1324 .process_action(
1325 &proposal.actions[idx],
1326 &proposal.id,
1327 &trace_id,
1328 &root_span_id,
1329 session_id,
1330 scope,
1331 )
1332 .await;
1333 total_retries += action_retries;
1334
1335 if ar.status == ActionStatus::Succeeded
1337 && proposal.actions[idx].action_type == ActionType::ToolCall
1338 {
1339 running_tool_calls += 1;
1340 }
1341 if ar.status != ActionStatus::Skipped {
1342 running_actions += 1;
1343 }
1344 if let Some(d) = ar.duration_ms {
1345 running_duration_ms += d;
1346 }
1347
1348 if ar.status == ActionStatus::Failed
1349 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1350 {
1351 aborted = true;
1352 }
1353 results.push(ar);
1354 }
1355 } else {
1356 for &idx in level {
1359 state_before_map.insert(
1360 proposal.actions[idx].id.clone(),
1361 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1362 );
1363 }
1364 let futs: Vec<_> = level
1365 .iter()
1366 .map(|&idx| {
1367 self.process_action(
1368 &proposal.actions[idx],
1369 &proposal.id,
1370 &trace_id,
1371 &root_span_id,
1372 session_id,
1373 scope,
1374 )
1375 })
1376 .collect();
1377 let level_results = futures::future::join_all(futs).await;
1378
1379 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1380 let idx = level[i];
1381 total_retries += action_retries;
1382 if ar.status == ActionStatus::Succeeded
1383 && proposal.actions[idx].action_type == ActionType::ToolCall
1384 {
1385 running_tool_calls += 1;
1386 }
1387 if ar.status != ActionStatus::Skipped {
1388 running_actions += 1;
1389 }
1390 if let Some(d) = ar.duration_ms {
1391 running_duration_ms += d;
1392 }
1393 results.push(ar);
1394 }
1395 }
1396 }
1397
1398 if aborted {
1400 self.state.restore(snapshot.clone(), transition_count);
1401
1402 let mut log = self.log.lock().await;
1403 log.append(
1404 EventKind::StateSnapshot,
1405 None,
1406 Some(&proposal.id),
1407 [(
1408 "state".to_string(),
1409 serde_json::to_value(&snapshot).unwrap_or_default(),
1410 )]
1411 .into(),
1412 );
1413 log.append(
1414 EventKind::StateRollback,
1415 None,
1416 Some(&proposal.id),
1417 [(
1418 "rolled_back_to".to_string(),
1419 Value::from("pre-proposal snapshot"),
1420 )]
1421 .into(),
1422 );
1423
1424 let mut cache = self.idempotency_cache.lock().await;
1426 for r in &results {
1427 if r.status == ActionStatus::Succeeded {
1428 for action in &proposal.actions {
1429 if action.id == r.action_id && action.idempotent {
1430 cache.remove(&idempotency_key(action));
1431 }
1432 }
1433 }
1434 }
1435 }
1436
1437 let action_order: HashMap<String, usize> = proposal
1439 .actions
1440 .iter()
1441 .enumerate()
1442 .map(|(i, a)| (a.id.clone(), i))
1443 .collect();
1444 results.sort_by_key(|r| {
1445 action_order
1446 .get(&r.action_id)
1447 .copied()
1448 .unwrap_or(usize::MAX)
1449 });
1450
1451 let mut cost = CostSummary::default();
1453 for r in &results {
1454 let action = action_order
1455 .get(&r.action_id)
1456 .and_then(|&i| proposal.actions.get(i));
1457 match r.status {
1458 ActionStatus::Succeeded => {
1459 cost.actions_executed += 1;
1460 if let Some(a) = action {
1461 if a.action_type == ActionType::ToolCall {
1462 cost.tool_calls += 1;
1463 }
1464 }
1465 }
1466 ActionStatus::Failed | ActionStatus::Rejected => {
1467 cost.actions_executed += 1;
1468 }
1469 ActionStatus::Skipped => {
1470 cost.actions_skipped += 1;
1471 }
1472 _ => {}
1473 }
1474 if let Some(d) = r.duration_ms {
1475 cost.total_duration_ms += d;
1476 }
1477 }
1478
1479 cost.retries = total_retries;
1481
1482 {
1484 let span_status = if aborted {
1485 SpanStatus::Error
1486 } else {
1487 SpanStatus::Ok
1488 };
1489 let mut log = self.log.lock().await;
1490 log.end_span(&root_span_id, span_status);
1491 }
1492
1493 let proposal_result = ProposalResult {
1494 proposal_id: proposal.id.clone(),
1495 results,
1496 cost,
1497 };
1498
1499 if self.auto_distill {
1501 if let Some(ref memgine) = self.memgine {
1502 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1504 .results
1505 .iter()
1506 .map(|r| {
1507 let kind = match r.status {
1508 ActionStatus::Succeeded => "action_succeeded",
1509 ActionStatus::Failed => "action_failed",
1510 ActionStatus::Rejected => "action_rejected",
1511 ActionStatus::Skipped => "action_skipped",
1512 _ => "unknown",
1513 };
1514 let tool = proposal
1516 .actions
1517 .iter()
1518 .find(|a| a.id == r.action_id)
1519 .and_then(|a| a.tool.clone());
1520 let mut data = serde_json::Map::new();
1521 if let Some(ref e) = r.error {
1522 data.insert("error".into(), Value::from(e.as_str()));
1523 }
1524 if let Some(ref o) = r.output {
1525 data.insert("output".into(), o.clone());
1526 }
1527 car_memgine::TraceEvent {
1528 kind: kind.to_string(),
1529 action_id: Some(r.action_id.clone()),
1530 tool,
1531 data: Value::Object(data),
1532 duration_ms: r.duration_ms,
1533 reward: match r.status {
1534 ActionStatus::Succeeded => Some(1.0),
1535 ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1536 _ => None,
1537 },
1538 ..Default::default()
1539 }
1540 })
1541 .collect();
1542
1543 let mut engine = memgine.lock().await;
1544 let skills = engine.distill_skills(&trace_events).await;
1545 if !skills.is_empty() {
1546 let count = skills.len();
1547 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
1553 engine.scoped(tenant).ingest_distilled_skills(&skills);
1554
1555 let mut log = self.log.lock().await;
1557 log.append(
1558 EventKind::SkillDistilled,
1559 None,
1560 Some(&proposal_result.proposal_id),
1561 [
1562 ("skills_count".to_string(), Value::from(count)),
1563 (
1564 "skill_names".to_string(),
1565 Value::from(
1566 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1567 ),
1568 ),
1569 ]
1570 .into(),
1571 );
1572
1573 let threshold = engine.evolution_threshold();
1575 let domains = engine.domains_needing_evolution(threshold);
1576 for domain in &domains {
1577 let failed: Vec<car_memgine::TraceEvent> = trace_events
1579 .iter()
1580 .filter(|e| {
1581 matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1582 })
1583 .cloned()
1584 .collect();
1585 if !failed.is_empty() {
1586 let evolved = engine.evolve_skills(&failed, domain).await;
1587 if !evolved.is_empty() {
1588 log.append(
1589 EventKind::EvolutionTriggered,
1590 None,
1591 Some(&proposal_result.proposal_id),
1592 [
1593 ("domain".to_string(), Value::from(domain.as_str())),
1594 ("new_skills".to_string(), Value::from(evolved.len())),
1595 ]
1596 .into(),
1597 );
1598 }
1599 }
1600 }
1601 }
1602 }
1603 }
1604
1605 (proposal_result, state_before_map)
1606 }
1607
1608 async fn process_action(
1611 &self,
1612 action: &Action,
1613 proposal_id: &str,
1614 trace_id: &str,
1615 parent_span_id: &str,
1616 session_id: Option<&str>,
1617 scope: Option<&crate::scope::RuntimeScope>,
1618 ) -> (ActionResult, u32) {
1619 let action_type_name = serde_json::to_string(&action.action_type)
1621 .unwrap_or_default()
1622 .trim_matches('"')
1623 .to_string();
1624 let span_name = format!("action.{}", action_type_name);
1625
1626 let action_span_id = {
1628 let mut attrs: HashMap<String, Value> = HashMap::new();
1629 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1630 if let Some(ref tool) = action.tool {
1631 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1632 }
1633 let mut log = self.log.lock().await;
1634 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1635 };
1636
1637 let (result, retries) = self
1639 .process_action_inner(action, proposal_id, session_id, scope)
1640 .await;
1641
1642 let span_status = match result.status {
1644 ActionStatus::Succeeded => SpanStatus::Ok,
1645 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1646 _ => SpanStatus::Unset,
1647 };
1648 {
1649 let mut log = self.log.lock().await;
1650 log.end_span(&action_span_id, span_status);
1651 }
1652
1653 (result, retries)
1654 }
1655
1656 #[instrument(
1659 name = "action.process",
1660 skip_all,
1661 fields(
1662 action_id = %action.id,
1663 action_type = ?action.action_type,
1664 tool = action.tool.as_deref().unwrap_or("none"),
1665 )
1666 )]
1667 async fn process_action_inner(
1668 &self,
1669 action: &Action,
1670 proposal_id: &str,
1671 session_id: Option<&str>,
1672 scope: Option<&crate::scope::RuntimeScope>,
1673 ) -> (ActionResult, u32) {
1674 if action.idempotent {
1676 let key = idempotency_key(action);
1677 let cache = self.idempotency_cache.lock().await;
1678 if let Some(cached) = cache.get(&key) {
1679 let mut log = self.log.lock().await;
1680 log.append(
1681 EventKind::ActionDeduplicated,
1682 Some(&action.id),
1683 Some(proposal_id),
1684 [(
1685 "cached_action_id".to_string(),
1686 Value::from(cached.action_id.as_str()),
1687 )]
1688 .into(),
1689 );
1690 return (
1691 ActionResult {
1692 action_id: action.id.clone(),
1693 status: cached.status.clone(),
1694 output: cached.output.clone(),
1695 error: cached.error.clone(),
1696 state_changes: cached.state_changes.clone(),
1697 duration_ms: Some(0.0),
1698 timestamp: chrono::Utc::now(),
1699 },
1700 0,
1701 );
1702 }
1703 }
1704
1705 {
1707 let caps = self.capabilities.read().await;
1708 if let Some(ref cap) = *caps {
1709 if action.action_type == ActionType::ToolCall {
1711 if let Some(ref tool_name) = action.tool {
1712 if !cap.tool_allowed(tool_name) {
1713 let mut log = self.log.lock().await;
1714 log.append(
1715 EventKind::ActionRejected,
1716 Some(&action.id),
1717 Some(proposal_id),
1718 HashMap::new(),
1719 );
1720 return (
1721 rejected_result(
1722 &action.id,
1723 format!("capability denied: tool '{}' not allowed", tool_name),
1724 ),
1725 0,
1726 );
1727 }
1728 }
1729 }
1730
1731 if action.action_type == ActionType::StateWrite
1733 || action.action_type == ActionType::StateRead
1734 {
1735 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1736 if !cap.state_key_allowed(key) {
1737 let mut log = self.log.lock().await;
1738 log.append(
1739 EventKind::ActionRejected,
1740 Some(&action.id),
1741 Some(proposal_id),
1742 HashMap::new(),
1743 );
1744 return (
1745 rejected_result(
1746 &action.id,
1747 format!("capability denied: state key '{}' not allowed", key),
1748 ),
1749 0,
1750 );
1751 }
1752 }
1753 }
1754 }
1755 }
1756
1757 let tools = self.tools.read().await;
1759 let validation = validate_action(action, &self.state, &tools);
1760 drop(tools);
1761
1762 if !validation.valid() {
1763 let error = validation
1764 .errors
1765 .iter()
1766 .map(|e| e.reason.as_str())
1767 .collect::<Vec<_>>()
1768 .join("; ");
1769 let mut log = self.log.lock().await;
1770 log.append(
1771 EventKind::ActionRejected,
1772 Some(&action.id),
1773 Some(proposal_id),
1774 HashMap::new(),
1775 );
1776 return (rejected_result(&action.id, error), 0);
1777 }
1778
1779 {
1785 let mut violations = {
1786 let policies = self.policies.read().await;
1787 policies.check(action, &self.state)
1788 };
1789 if let Some(sid) = session_id {
1790 let session_engine = {
1795 let sessions = self.session_policies.read().await;
1796 sessions.get(sid).cloned()
1797 };
1798 if let Some(engine) = session_engine {
1799 let engine = engine.read().await;
1800 violations.extend(engine.check(action, &self.state));
1801 } else {
1802 let mut log = self.log.lock().await;
1808 log.append(
1809 EventKind::PolicyViolation,
1810 Some(&action.id),
1811 Some(proposal_id),
1812 HashMap::new(),
1813 );
1814 return (
1815 rejected_result(
1816 &action.id,
1817 format!(
1818 "unknown session id '{sid}' — open one via Runtime::open_session before executing under a session"
1819 ),
1820 ),
1821 0,
1822 );
1823 }
1824 }
1825 if !violations.is_empty() {
1826 let error = violations
1827 .iter()
1828 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1829 .collect::<Vec<_>>()
1830 .join("; ");
1831 let mut log = self.log.lock().await;
1832 log.append(
1833 EventKind::PolicyViolation,
1834 Some(&action.id),
1835 Some(proposal_id),
1836 HashMap::new(),
1837 );
1838 return (rejected_result(&action.id, error), 0);
1839 }
1840 }
1841
1842 {
1844 let mut log = self.log.lock().await;
1845 log.append(
1846 EventKind::ActionValidated,
1847 Some(&action.id),
1848 Some(proposal_id),
1849 HashMap::new(),
1850 );
1851 }
1852
1853 let (result, retries) = self.execute_with_retry(action, proposal_id, scope).await;
1855
1856 if action.idempotent && result.status == ActionStatus::Succeeded {
1858 let mut cache = self.idempotency_cache.lock().await;
1859 cache.insert(idempotency_key(action), result.clone());
1860 }
1861
1862 tracing::info!(
1863 status = ?result.status,
1864 duration_ms = result.duration_ms,
1865 "action completed"
1866 );
1867
1868 (result, retries)
1869 }
1870
1871 async fn execute_with_retry(
1874 &self,
1875 action: &Action,
1876 proposal_id: &str,
1877 scope: Option<&crate::scope::RuntimeScope>,
1878 ) -> (ActionResult, u32) {
1879 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1880 action.max_retries + 1
1881 } else {
1882 1
1883 };
1884
1885 let mut last_error: Option<String> = None;
1886 let mut retries: u32 = 0;
1887
1888 for attempt in 0..max_attempts {
1889 if attempt > 0 {
1890 retries += 1;
1891 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1892 tokio::time::sleep(Duration::from_millis(delay)).await;
1893 let mut log = self.log.lock().await;
1894 log.append(
1895 EventKind::ActionRetrying,
1896 Some(&action.id),
1897 Some(proposal_id),
1898 [("attempt".to_string(), Value::from(attempt + 1))].into(),
1899 );
1900 }
1901
1902 {
1903 let mut log = self.log.lock().await;
1904 log.append(
1905 EventKind::ActionExecuting,
1906 Some(&action.id),
1907 Some(proposal_id),
1908 HashMap::new(),
1909 );
1910 }
1911
1912 let start = std::time::Instant::now();
1913 let transitions_before = self.state.transition_count();
1914
1915 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1917 match timeout(Duration::from_millis(timeout_ms), self.dispatch(action, scope)).await {
1918 Ok(r) => r,
1919 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1920 }
1921 } else {
1922 self.dispatch(action, scope).await
1923 };
1924
1925 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1926
1927 match exec_result {
1928 Ok(output) => {
1929 let tenant_for_effects = scope.and_then(|s| s.tenant_id.as_deref());
1935 let scoped_state = self.state.scoped(tenant_for_effects);
1936 let mut state_changes: HashMap<String, Value> = HashMap::new();
1937 for (key, value) in &action.expected_effects {
1938 scoped_state.set(key, value.clone(), &action.id);
1939 state_changes.insert(key.clone(), value.clone());
1940 }
1941
1942 for t in self.state.transitions_since(transitions_before) {
1944 if !state_changes.contains_key(&t.key) {
1945 if let Some(v) = t.new_value {
1946 state_changes.insert(t.key.clone(), v);
1947 }
1948 }
1949 }
1950
1951 let mut log = self.log.lock().await;
1952 log.append(
1953 EventKind::ActionSucceeded,
1954 Some(&action.id),
1955 Some(proposal_id),
1956 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1957 );
1958
1959 if !state_changes.is_empty() {
1960 log.append(
1961 EventKind::StateChanged,
1962 Some(&action.id),
1963 Some(proposal_id),
1964 [(
1965 "changes".to_string(),
1966 serde_json::to_value(&state_changes).unwrap_or_default(),
1967 )]
1968 .into(),
1969 );
1970 }
1971
1972 return (
1973 ActionResult {
1974 action_id: action.id.clone(),
1975 status: ActionStatus::Succeeded,
1976 output: Some(output),
1977 error: None,
1978 state_changes,
1979 duration_ms: Some(duration_ms),
1980 timestamp: chrono::Utc::now(),
1981 },
1982 retries,
1983 );
1984 }
1985 Err(e) => {
1986 last_error = Some(e.clone());
1987 let mut log = self.log.lock().await;
1988 log.append(
1989 EventKind::ActionFailed,
1990 Some(&action.id),
1991 Some(proposal_id),
1992 [
1993 ("error".to_string(), Value::from(e.as_str())),
1994 ("attempt".to_string(), Value::from(attempt + 1)),
1995 ]
1996 .into(),
1997 );
1998 }
1999 }
2000 }
2001
2002 if action.failure_behavior == FailureBehavior::Skip {
2004 return (
2005 skipped_result(
2006 &action.id,
2007 last_error.as_deref().unwrap_or("all attempts exhausted"),
2008 ),
2009 retries,
2010 );
2011 }
2012
2013 (
2014 ActionResult {
2015 action_id: action.id.clone(),
2016 status: ActionStatus::Failed,
2017 output: None,
2018 error: last_error,
2019 state_changes: HashMap::new(),
2020 duration_ms: None,
2021 timestamp: chrono::Utc::now(),
2022 },
2023 retries,
2024 )
2025 }
2026
2027 async fn dispatch(
2037 &self,
2038 action: &Action,
2039 scope: Option<&crate::scope::RuntimeScope>,
2040 ) -> Result<Value, String> {
2041 match action.action_type {
2042 ActionType::ToolCall => {
2043 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
2044 let params = Value::Object(
2045 action
2046 .parameters
2047 .iter()
2048 .map(|(k, v)| (k.clone(), v.clone()))
2049 .collect(),
2050 );
2051
2052 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
2054 return Ok(cached);
2055 }
2056
2057 self.rate_limiter.acquire(tool_name).await;
2059
2060 if matches!(
2062 tool_name,
2063 "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
2064 ) {
2065 if let Some(ref engine) = self.inference_engine {
2066 let params = {
2069 let should_ground =
2070 tool_name == "infer.grounded" || tool_name == "infer";
2071 if should_ground {
2072 if let Some(ref memgine) = self.memgine {
2073 if let Some(prompt) =
2074 params.get("prompt").and_then(|v| v.as_str())
2075 {
2076 let ctx = {
2077 let mut m = memgine.lock().await;
2078 m.build_context(prompt)
2079 };
2080 if !ctx.is_empty() {
2081 let mut p = params.clone();
2082 if let Some(obj) = p.as_object_mut() {
2083 obj.insert("context".to_string(), Value::from(ctx));
2084 }
2085 p
2086 } else {
2087 params
2088 }
2089 } else {
2090 params
2091 }
2092 } else {
2093 params
2094 }
2095 } else {
2096 params
2097 }
2098 };
2099
2100 let effective_tool = if tool_name == "infer.grounded" {
2102 "infer"
2103 } else {
2104 tool_name
2105 };
2106 let result =
2107 car_inference::service::execute_tool(engine, effective_tool, ¶ms)
2108 .await
2109 .map_err(|e| e.to_string());
2110
2111 if let Ok(ref value) = result {
2112 self.result_cache
2113 .put(tool_name, ¶ms, value.clone())
2114 .await;
2115 }
2116
2117 return result;
2118 }
2119 }
2120
2121 if tool_name == "memory.consolidate" {
2123 if let Some(ref memgine) = self.memgine {
2124 let report = {
2125 let mut m = memgine.lock().await;
2126 m.consolidate().await
2127 };
2128 {
2130 let mut log = self.log.lock().await;
2131 log.append(
2132 EventKind::Consolidated,
2133 None,
2134 None,
2135 [
2136 (
2137 "expired_pruned".to_string(),
2138 Value::from(report.expired_pruned),
2139 ),
2140 (
2141 "superseded_gc".to_string(),
2142 Value::from(report.superseded_gc),
2143 ),
2144 (
2145 "stale_embeddings_removed".to_string(),
2146 Value::from(report.stale_embeddings_removed),
2147 ),
2148 (
2149 "nodes_embedded".to_string(),
2150 Value::from(report.nodes_embedded),
2151 ),
2152 (
2153 "domains_evolved".to_string(),
2154 Value::from(report.domains_evolved.clone()),
2155 ),
2156 ("total_nodes".to_string(), Value::from(report.total_nodes)),
2157 ("total_edges".to_string(), Value::from(report.total_edges)),
2158 ]
2159 .into(),
2160 );
2161 }
2162 return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
2163 } else {
2164 return Err(
2165 "memory.consolidate requires memgine (attach with with_learning)"
2166 .into(),
2167 );
2168 }
2169 }
2170
2171 let configured = {
2177 let guard = self.tool_executor.lock().await;
2178 guard.as_ref().cloned()
2179 };
2180
2181 if let Some(ref executor) = configured {
2182 let result = executor.execute(tool_name, ¶ms).await;
2183 let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
2184 if !fall_through {
2185 if let Ok(ref value) = result {
2186 self.result_cache
2187 .put(tool_name, ¶ms, value.clone())
2188 .await;
2189 }
2190 return result;
2191 }
2192 }
2193
2194 if let Some(result) = crate::agent_basics::execute(tool_name, ¶ms).await {
2195 if let Ok(ref value) = result {
2196 self.result_cache
2197 .put(tool_name, ¶ms, value.clone())
2198 .await;
2199 }
2200 return result;
2201 }
2202
2203 Err(format!("no handler for tool '{}'", tool_name))
2204 }
2205 ActionType::StateWrite => {
2206 let key = action
2207 .parameters
2208 .get("key")
2209 .and_then(|v| v.as_str())
2210 .ok_or("state_write requires 'key' parameter")?;
2211 let value = action
2212 .parameters
2213 .get("value")
2214 .cloned()
2215 .unwrap_or(Value::Null);
2216 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2217 self.state.scoped(tenant).set(key, value, &action.id);
2218 Ok(Value::from(format!("written: {}", key)))
2219 }
2220 ActionType::StateRead => {
2221 let key = action
2222 .parameters
2223 .get("key")
2224 .and_then(|v| v.as_str())
2225 .ok_or("state_read requires 'key' parameter")?;
2226 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2227 Ok(self.state.scoped(tenant).get(key).unwrap_or(Value::Null))
2228 }
2229 ActionType::Assertion => {
2230 let key = action
2231 .parameters
2232 .get("key")
2233 .and_then(|v| v.as_str())
2234 .ok_or("assertion requires 'key' parameter")?;
2235 let expected = action
2236 .parameters
2237 .get("expected")
2238 .cloned()
2239 .unwrap_or(Value::Null);
2240 let tenant = scope.and_then(|s| s.tenant_id.as_deref());
2241 let actual = self.state.scoped(tenant).get(key).unwrap_or(Value::Null);
2242 if actual != expected {
2243 Err(format!(
2244 "assertion failed: state['{}'] = {:?}, expected {:?}",
2245 key, actual, expected
2246 ))
2247 } else {
2248 Ok(serde_json::json!({"asserted": key, "value": actual}))
2249 }
2250 }
2251 }
2252 }
2253
2254 pub async fn save_checkpoint(&self) -> Checkpoint {
2258 let state = self.state.snapshot();
2259 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
2260 let log = self.log.lock().await;
2261 let events: Vec<Value> = log
2262 .events()
2263 .iter()
2264 .map(|e| serde_json::to_value(e).unwrap_or_default())
2265 .collect();
2266
2267 Checkpoint {
2268 checkpoint_id: Uuid::new_v4().to_string(),
2269 created_at: chrono::Utc::now(),
2270 state,
2271 events,
2272 tools,
2273 metadata: HashMap::new(),
2274 }
2275 }
2276
2277 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
2279 let checkpoint = self.save_checkpoint().await;
2280 let json = serde_json::to_string_pretty(&checkpoint)
2281 .map_err(|e| format!("serialize error: {}", e))?;
2282 tokio::fs::write(path, json)
2283 .await
2284 .map_err(|e| format!("write error: {}", e))?;
2285 Ok(())
2286 }
2287
2288 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
2290 let json = tokio::fs::read_to_string(path)
2291 .await
2292 .map_err(|e| format!("read error: {}", e))?;
2293 let checkpoint: Checkpoint =
2294 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2295 self.restore_checkpoint(&checkpoint).await;
2296 Ok(checkpoint)
2297 }
2298
2299 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2301 self.state.replace_all(checkpoint.state.clone());
2303 self.idempotency_cache.lock().await.clear();
2306 let mut tools = self.tools.write().await;
2308 tools.clear();
2309 for tool_name in &checkpoint.tools {
2310 let schema = ToolSchema {
2311 name: tool_name.clone(),
2312 description: String::new(),
2313 parameters: serde_json::Value::Object(Default::default()),
2314 returns: None,
2315 idempotent: false,
2316 cache_ttl_secs: None,
2317 rate_limit: None,
2318 };
2319 tools.insert(tool_name.clone(), schema);
2320 }
2321 }
2322
2323 pub async fn register_subprocess_tool(
2328 &self,
2329 name: &str,
2330 tool: crate::subprocess::SubprocessTool,
2331 ) {
2332 use crate::subprocess::SubprocessToolExecutor;
2333
2334 let schema = ToolSchema {
2335 name: name.to_string(),
2336 description: format!("Subprocess tool: {}", tool.command),
2337 parameters: serde_json::Value::Object(Default::default()),
2338 returns: None,
2339 idempotent: false,
2340 cache_ttl_secs: None,
2341 rate_limit: None,
2342 };
2343 self.register_tool_schema(schema).await;
2344
2345 let mut guard = self.tool_executor.lock().await;
2346 let mut executor = match guard.take() {
2347 Some(existing) => {
2348 let mut sub = SubprocessToolExecutor::new();
2349 sub = sub.with_fallback(existing);
2350 sub
2351 }
2352 None => SubprocessToolExecutor::new(),
2353 };
2354 executor.register(name, tool);
2355 *guard = Some(std::sync::Arc::new(executor));
2356 }
2357}
2358
2359impl Default for Runtime {
2360 fn default() -> Self {
2361 Self::new()
2362 }
2363}