1use crate::cache::ResultCache;
4use crate::capabilities::CapabilitySet;
5use crate::checkpoint::Checkpoint;
6use crate::rate_limit::{RateLimit, RateLimiter};
7use car_eventlog::{EventKind, EventLog, SpanStatus};
8use car_ir::{
9 build_dag, Action, ActionProposal, ActionResult, ActionStatus, ActionType, CostSummary,
10 FailureBehavior, ProposalResult, ToolSchema,
11};
12use car_policy::PolicyEngine;
13use car_state::StateStore;
14use car_validator::validate_action;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
20use tokio::time::timeout;
21use tracing::instrument;
22use uuid::Uuid;
23
/// Base delay between retries, in milliseconds.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the
/// first back-off step of the per-action retry loop — confirm at the use site.
const RETRY_BASE_DELAY_MS: u64 = 100;
/// Multiplier applied to the retry delay.
///
/// NOTE(review): presumably the exponential back-off factor applied after each
/// failed attempt — confirm at the use site.
const RETRY_BACKOFF_FACTOR: u64 = 2;
27
/// Callback that produces a replacement proposal after a failed execution.
///
/// `Runtime::execute_with_cancel` calls `replan` when a run contains failed
/// actions, a callback is installed, and the replan limit has not been reached.
#[async_trait::async_trait]
pub trait ReplanCallback: Send + Sync {
    /// Builds a new proposal from the failure context `ctx`.
    ///
    /// Returning `Err` makes the runtime stop replanning and return the failed
    /// result of the last execution.
    async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String>;
}
39
/// Everything a [`ReplanCallback`] is told about a failed execution.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplanContext {
    /// Id of the original proposal passed to the runtime.
    pub proposal_id: String,
    /// 1-based number of the replan being requested.
    pub attempt: u32,
    /// One summary per action whose result status was `Failed`.
    pub failed_actions: Vec<FailedActionSummary>,
    /// Ids of actions whose result status was `Succeeded`.
    pub completed_action_ids: Vec<String>,
    /// State store snapshot taken when the context was built.
    pub state_snapshot: HashMap<String, Value>,
    /// Replans still available after this one (saturating at zero).
    pub replans_remaining: u32,
    /// `source` of the original proposal.
    pub original_source: String,
    /// Number of actions in the original proposal.
    pub original_action_count: usize,
    /// `context` of the original proposal.
    pub original_context: HashMap<String, Value>,
}
62
/// Condensed description of one failed action, handed to the replan callback.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailedActionSummary {
    /// Id of the failed action.
    pub action_id: String,
    /// Tool of the matching action, if the action was found and names a tool.
    pub tool: Option<String>,
    /// Error text of the result; empty when the result carried no error.
    pub error: String,
    /// Parameters of the matching action; empty when the action was not found.
    pub parameters: HashMap<String, Value>,
}
71
/// Tuning knobs for the replan loop in `Runtime::execute_with_cancel`.
#[derive(Debug, Clone)]
pub struct ReplanConfig {
    /// Maximum number of replan attempts; `0` disables replanning.
    pub max_replans: u32,
    /// Delay in milliseconds before the callback is invoked; `0` means no delay.
    pub delay_ms: u64,
    /// When `true`, a replanned proposal is checked with `car_verify::verify`
    /// before it is executed.
    pub verify_before_execute: bool,
}
85
86impl Default for ReplanConfig {
87 fn default() -> Self {
88 Self {
89 max_replans: 0,
90 delay_ms: 0,
91 verify_before_execute: true,
92 }
93 }
94}
95
/// Backend that actually performs tool calls.
///
/// An implementation is installed on a `Runtime` with `with_executor` or
/// `set_executor`.
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Runs `tool` with the JSON `params`, returning its JSON output or an
    /// error message.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String>;
}
104
105fn idempotency_key(action: &Action) -> String {
107 let sorted: std::collections::BTreeMap<_, _> = action.parameters.iter().collect();
108 let params = serde_json::to_string(&sorted).unwrap_or_default();
109 format!(
110 "{}:{}:{}",
111 serde_json::to_string(&action.action_type).unwrap_or_default(),
112 action.tool.as_deref().unwrap_or(""),
113 params
114 )
115}
116
117fn rejected_result(action_id: &str, error: String) -> ActionResult {
118 ActionResult {
119 action_id: action_id.to_string(),
120 status: ActionStatus::Rejected,
121 output: None,
122 error: Some(error),
123 state_changes: HashMap::new(),
124 duration_ms: None,
125 timestamp: chrono::Utc::now(),
126 }
127}
128
129fn snapshot_relevant_keys(
133 state: &car_state::StateStore,
134 action: &Action,
135) -> HashMap<String, Value> {
136 let mut keys: std::collections::HashSet<&str> = std::collections::HashSet::new();
137 for dep in &action.state_dependencies {
138 keys.insert(dep.as_str());
139 }
140 for key in action.expected_effects.keys() {
141 keys.insert(key.as_str());
142 }
143 if action.action_type == ActionType::StateWrite {
145 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
146 keys.insert(key);
147 }
148 }
149
150 if keys.is_empty() {
151 return HashMap::new();
153 }
154
155 keys.iter()
156 .filter_map(|&k| state.get(k).map(|v| (k.to_string(), v)))
157 .collect()
158}
159
160fn skipped_result(action_id: &str, reason: &str) -> ActionResult {
161 ActionResult {
162 action_id: action_id.to_string(),
163 status: ActionStatus::Skipped,
164 output: None,
165 error: Some(reason.to_string()),
166 state_changes: HashMap::new(),
167 duration_ms: None,
168 timestamp: chrono::Utc::now(),
169 }
170}
171
/// Prefix placed in front of the error text of results that were skipped because
/// the caller requested cancellation; it distinguishes them from other skips,
/// which use the same `Skipped` status.
pub const CANCELED_PREFIX: &str = "canceled: ";
179
180fn canceled_result(action_id: &str, reason: &str) -> ActionResult {
185 ActionResult {
186 action_id: action_id.to_string(),
187 status: ActionStatus::Skipped,
188 output: None,
189 error: Some(format!("{}{}", CANCELED_PREFIX, reason)),
190 state_changes: HashMap::new(),
191 duration_ms: None,
192 timestamp: chrono::Utc::now(),
193 }
194}
195
196pub fn format_tool_result(result: &ActionResult) -> String {
198 match result.status {
199 ActionStatus::Succeeded => match &result.output {
200 Some(v) => serde_json::to_string(v).unwrap_or_else(|_| v.to_string()),
201 None => String::new(),
202 },
203 ActionStatus::Rejected => format!("[REJECTED] {}", result.error.as_deref().unwrap_or("")),
204 ActionStatus::Failed => format!("[FAILED] {}", result.error.as_deref().unwrap_or("")),
205 _ => format!(
206 "[{:?}] {}",
207 result.status,
208 result.error.as_deref().unwrap_or("")
209 ),
210 }
211}
212
/// Optional per-proposal limits; a `None` field means "no limit".
///
/// Once a limit is reached during execution, remaining actions are skipped with
/// the reason `"cost budget exceeded"`.
#[derive(Debug, Clone)]
pub struct CostBudget {
    /// Maximum number of successful tool calls.
    pub max_tool_calls: Option<u32>,
    /// Maximum accumulated action duration, in milliseconds.
    pub max_duration_ms: Option<f64>,
    /// Maximum number of processed (non-skipped) actions.
    pub max_actions: Option<u32>,
}
220
/// Executes action proposals against shared state, recording events and spans
/// in an event log.
pub struct Runtime {
    /// State store; snapshotted before a proposal runs and restored on abort.
    pub state: Arc<StateStore>,
    /// Registered tool schemas, keyed by tool name.
    pub tools: Arc<TokioRwLock<HashMap<String, ToolSchema>>>,
    /// Policy engine; may be shared between runtimes via `with_shared`.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Event log that receives events and spans.
    pub log: Arc<TokioMutex<EventLog>>,
    /// Per-tool rate limits.
    pub rate_limiter: Arc<RateLimiter>,
    /// Per-tool result cache.
    pub result_cache: Arc<ResultCache>,
    /// Executor used for tool calls, if one has been installed.
    tool_executor: TokioMutex<Option<Arc<dyn ToolExecutor>>>,
    /// Results of idempotent actions, keyed by `idempotency_key`.
    idempotency_cache: TokioMutex<HashMap<String, ActionResult>>,
    /// Optional execution limits applied to each proposal.
    cost_budget: TokioRwLock<Option<CostBudget>>,
    /// Optional capability restrictions.
    capabilities: TokioRwLock<Option<CapabilitySet>>,
    /// Inference engine installed by `with_inference`.
    inference_engine: Option<Arc<car_inference::InferenceEngine>>,
    /// Learning engine used for skill distillation.
    memgine: Option<Arc<TokioMutex<car_memgine::MemgineEngine>>>,
    /// Whether skills are distilled automatically after each proposal.
    auto_distill: bool,
    /// Destination for persisted execution trajectories.
    trajectory_store: Option<Arc<car_memgine::TrajectoryStore>>,
    /// Callback asked for a new proposal after a failed run.
    replan_callback: TokioMutex<Option<Arc<dyn ReplanCallback>>>,
    /// Settings of the replan loop.
    replan_config: TokioRwLock<ReplanConfig>,
    /// Registry of tool entries added through `register_tool_entry`.
    pub registry: Arc<crate::registry::ToolRegistry>,
}
258
259impl Runtime {
260 pub fn new() -> Self {
261 Self {
262 state: Arc::new(StateStore::new()),
263 tools: Arc::new(TokioRwLock::new(HashMap::new())),
264 policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
265 log: Arc::new(TokioMutex::new(EventLog::new())),
266 rate_limiter: Arc::new(RateLimiter::new()),
267 result_cache: Arc::new(ResultCache::new()),
268 tool_executor: TokioMutex::new(None),
269 idempotency_cache: TokioMutex::new(HashMap::new()),
270 cost_budget: TokioRwLock::new(None),
271 capabilities: TokioRwLock::new(None),
272 inference_engine: None,
273 memgine: None,
274 auto_distill: false,
275 trajectory_store: None,
276 replan_callback: TokioMutex::new(None),
277 replan_config: TokioRwLock::new(ReplanConfig::default()),
278 registry: Arc::new(crate::registry::ToolRegistry::new()),
279 }
280 }
281
282 pub fn with_shared(
285 state: Arc<StateStore>,
286 log: Arc<TokioMutex<EventLog>>,
287 policies: Arc<TokioRwLock<PolicyEngine>>,
288 ) -> Self {
289 Self {
290 state,
291 tools: Arc::new(TokioRwLock::new(HashMap::new())),
292 policies,
293 log,
294 rate_limiter: Arc::new(RateLimiter::new()),
295 result_cache: Arc::new(ResultCache::new()),
296 tool_executor: TokioMutex::new(None),
297 idempotency_cache: TokioMutex::new(HashMap::new()),
298 cost_budget: TokioRwLock::new(None),
299 capabilities: TokioRwLock::new(None),
300 inference_engine: None,
301 memgine: None,
302 auto_distill: false,
303 trajectory_store: None,
304 replan_callback: TokioMutex::new(None),
305 replan_config: TokioRwLock::new(ReplanConfig::default()),
306 registry: Arc::new(crate::registry::ToolRegistry::new()),
307 }
308 }
309
310 pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
313 self.inference_engine = Some(engine);
314 if let Ok(mut tools) = self.tools.try_write() {
316 for schema in car_inference::service::all_schemas() {
317 tools.insert(schema.name.clone(), schema);
318 }
319 }
320 self
321 }
322
323 pub fn with_learning(
327 mut self,
328 memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>,
329 auto_distill: bool,
330 ) -> Self {
331 self.memgine = Some(memgine);
332 self.auto_distill = auto_distill;
333 self
334 }
335
336 pub fn with_memgine(self, memgine: Arc<TokioMutex<car_memgine::MemgineEngine>>) -> Self {
338 self.with_learning(memgine, true)
339 }
340
341 pub fn with_trajectory_store(mut self, store: Arc<car_memgine::TrajectoryStore>) -> Self {
343 self.trajectory_store = Some(store);
344 self
345 }
346
347 pub fn with_executor(self, executor: Arc<dyn ToolExecutor>) -> Self {
348 if let Ok(mut guard) = self.tool_executor.try_lock() {
350 *guard = Some(executor);
351 }
352 self
353 }
354
355 pub async fn set_executor(&self, executor: Arc<dyn ToolExecutor>) {
358 *self.tool_executor.lock().await = Some(executor);
359 }
360
361 pub fn with_event_log(mut self, log: EventLog) -> Self {
362 self.log = Arc::new(TokioMutex::new(log));
363 self
364 }
365
366 pub fn with_replan(self, callback: Arc<dyn ReplanCallback>, config: ReplanConfig) -> Self {
368 if let Ok(mut guard) = self.replan_callback.try_lock() {
369 *guard = Some(callback);
370 }
371 if let Ok(mut guard) = self.replan_config.try_write() {
372 *guard = config;
373 }
374 self
375 }
376
377 pub async fn set_replan_callback(&self, callback: Arc<dyn ReplanCallback>) {
379 *self.replan_callback.lock().await = Some(callback);
380 }
381
382 pub async fn set_replan_config(&self, config: ReplanConfig) {
384 *self.replan_config.write().await = config;
385 }
386
387 pub async fn register_tool(&self, name: &str) {
389 let schema = ToolSchema {
390 name: name.to_string(),
391 description: String::new(),
392 parameters: serde_json::Value::Object(Default::default()),
393 returns: None,
394 idempotent: false,
395 cache_ttl_secs: None,
396 rate_limit: None,
397 };
398 self.register_tool_schema(schema).await;
399 }
400
401 pub async fn register_tool_schema(&self, schema: ToolSchema) {
403 if let Some(ttl) = schema.cache_ttl_secs {
405 self.result_cache.enable_caching(&schema.name, ttl).await;
406 }
407 if let Some(ref rl) = schema.rate_limit {
409 self.rate_limiter
410 .set_limit(
411 &schema.name,
412 RateLimit {
413 max_calls: rl.max_calls,
414 interval_secs: rl.interval_secs,
415 },
416 )
417 .await;
418 }
419 self.tools.write().await.insert(schema.name.clone(), schema);
420 }
421
422 pub async fn register_tool_entry(&self, entry: crate::registry::ToolEntry) {
426 let schema = entry.schema.clone();
427 self.registry.register(entry).await;
428 self.register_tool_schema(schema).await;
429 }
430
431 pub async fn register_agent_basics(&self) {
436 for entry in crate::agent_basics::entries() {
437 self.register_tool_entry(entry).await;
438 }
439 }
440
441 pub async fn tool_schemas(&self) -> Vec<ToolSchema> {
443 self.tools.read().await.values().cloned().collect()
444 }
445
446 pub async fn set_cost_budget(&self, budget: CostBudget) {
448 *self.cost_budget.write().await = Some(budget);
449 }
450
451 pub async fn set_capabilities(&self, caps: CapabilitySet) {
453 *self.capabilities.write().await = Some(caps);
454 }
455
456 pub async fn set_rate_limit(&self, tool: &str, max_calls: u32, interval_secs: f64) {
462 self.rate_limiter
463 .set_limit(
464 tool,
465 RateLimit {
466 max_calls,
467 interval_secs,
468 },
469 )
470 .await;
471 }
472
473 pub async fn enable_tool_cache(&self, tool: &str, ttl_secs: u64) {
475 self.result_cache.enable_caching(tool, ttl_secs).await;
476 }
477
478 #[instrument(
488 name = "proposal.execute",
489 skip_all,
490 fields(
491 proposal_id = %proposal.id,
492 action_count = proposal.actions.len(),
493 )
494 )]
495 pub async fn execute(&self, proposal: &ActionProposal) -> ProposalResult {
496 let token = tokio_util::sync::CancellationToken::new();
499 self.execute_with_cancel(proposal, &token).await
500 }
501
502 pub async fn execute_with_cancel(
530 &self,
531 proposal: &ActionProposal,
532 cancel: &tokio_util::sync::CancellationToken,
533 ) -> ProposalResult {
534 let config = self.replan_config.read().await.clone();
535 let mut current_proposal = proposal.clone();
536 let mut attempt: u32 = 0;
537
538 loop {
539 let (result, state_before_map) =
540 self.execute_inner_with_cancel(¤t_proposal, Some(cancel)).await;
541
542 let aborted = result
544 .results
545 .iter()
546 .any(|r| r.status == ActionStatus::Failed);
547 if !aborted || attempt >= config.max_replans {
548 if aborted && attempt > 0 {
549 let mut log = self.log.lock().await;
551 log.append(
552 EventKind::ReplanExhausted,
553 None,
554 Some(&proposal.id),
555 [("attempts".to_string(), Value::from(attempt))].into(),
556 );
557 }
558
559 let outcome = if !aborted {
561 if attempt > 0 {
562 car_memgine::TrajectoryOutcome::ReplanSuccess
563 } else {
564 car_memgine::TrajectoryOutcome::Success
565 }
566 } else if attempt > 0 {
567 car_memgine::TrajectoryOutcome::ReplanExhausted
568 } else {
569 car_memgine::TrajectoryOutcome::Failed
570 };
571 if let Some(err) = self.persist_trajectory(
572 proposal,
573 ¤t_proposal,
574 &result,
575 outcome,
576 attempt,
577 &state_before_map,
578 ) {
579 let mut log = self.log.lock().await;
580 log.append(
581 EventKind::ActionFailed,
582 None,
583 Some(&proposal.id),
584 [(
585 "trajectory_persist_error".to_string(),
586 Value::from(err.as_str()),
587 )]
588 .into(),
589 );
590 }
591
592 return result;
593 }
594
595 let callback = {
597 let guard = self.replan_callback.lock().await;
598 guard.clone()
599 };
600 let Some(callback) = callback else {
601 if let Some(err) = self.persist_trajectory(
603 proposal,
604 ¤t_proposal,
605 &result,
606 car_memgine::TrajectoryOutcome::Failed,
607 attempt,
608 &state_before_map,
609 ) {
610 let mut log = self.log.lock().await;
611 log.append(
612 EventKind::ActionFailed,
613 None,
614 Some(&proposal.id),
615 [(
616 "trajectory_persist_error".to_string(),
617 Value::from(err.as_str()),
618 )]
619 .into(),
620 );
621 }
622 return result;
623 };
624
625 let failed_actions: Vec<FailedActionSummary> = result
627 .results
628 .iter()
629 .filter(|r| r.status == ActionStatus::Failed)
630 .map(|r| {
631 let action = current_proposal
632 .actions
633 .iter()
634 .find(|a| a.id == r.action_id);
635 FailedActionSummary {
636 action_id: r.action_id.clone(),
637 tool: action.and_then(|a| a.tool.clone()),
638 error: r.error.clone().unwrap_or_default(),
639 parameters: action.map(|a| a.parameters.clone()).unwrap_or_default(),
640 }
641 })
642 .collect();
643
644 let completed_action_ids: Vec<String> = result
645 .results
646 .iter()
647 .filter(|r| r.status == ActionStatus::Succeeded)
648 .map(|r| r.action_id.clone())
649 .collect();
650
651 let ctx = ReplanContext {
652 proposal_id: proposal.id.clone(),
653 attempt: attempt + 1,
654 failed_actions,
655 completed_action_ids,
656 state_snapshot: self.state.snapshot(),
657 replans_remaining: config.max_replans.saturating_sub(attempt + 1),
658 original_source: proposal.source.clone(),
659 original_action_count: proposal.actions.len(),
660 original_context: proposal.context.clone(),
661 };
662
663 if config.delay_ms > 0 {
665 tokio::time::sleep(Duration::from_millis(config.delay_ms)).await;
666 }
667
668 {
670 let mut log = self.log.lock().await;
671 log.append(
672 EventKind::ReplanAttempted,
673 None,
674 Some(&proposal.id),
675 [
676 ("attempt".to_string(), Value::from(attempt + 1)),
677 (
678 "failed_count".to_string(),
679 Value::from(ctx.failed_actions.len()),
680 ),
681 ]
682 .into(),
683 );
684 }
685
686 match callback.replan(&ctx).await {
688 Ok(new_proposal) => {
689 if config.verify_before_execute {
691 let tools_guard = self.tools.read().await;
692 let tool_names: std::collections::HashSet<String> =
693 tools_guard.keys().cloned().collect();
694 drop(tools_guard);
695
696 let current_state = self.state.snapshot();
697 let vr = car_verify::verify(
698 &new_proposal,
699 Some(¤t_state),
700 Some(&tool_names),
701 100,
702 );
703 if !vr.valid {
704 let error_msgs: Vec<String> = vr
705 .issues
706 .iter()
707 .filter(|i| i.severity == "error")
708 .map(|i| i.message.clone())
709 .collect();
710 let mut log = self.log.lock().await;
711 log.append(
712 EventKind::ReplanRejected,
713 None,
714 Some(&proposal.id),
715 [
716 ("errors".to_string(), Value::from(error_msgs.join("; "))),
717 ("attempt".to_string(), Value::from(attempt + 1)),
718 ]
719 .into(),
720 );
721 attempt += 1;
723 continue;
724 }
725 }
726
727 {
729 let mut log = self.log.lock().await;
730 log.append(
731 EventKind::ReplanProposalReceived,
732 None,
733 Some(&proposal.id),
734 [
735 ("attempt".to_string(), Value::from(attempt + 1)),
736 (
737 "new_action_count".to_string(),
738 Value::from(new_proposal.actions.len()),
739 ),
740 ]
741 .into(),
742 );
743 }
744 current_proposal = new_proposal;
745 attempt += 1;
746 }
747 Err(e) => {
748 let mut log = self.log.lock().await;
750 log.append(
751 EventKind::ReplanExhausted,
752 None,
753 Some(&proposal.id),
754 [
755 ("reason".to_string(), Value::from("callback_error")),
756 ("error".to_string(), Value::from(e.as_str())),
757 ("attempt".to_string(), Value::from(attempt + 1)),
758 ]
759 .into(),
760 );
761 if let Some(err) = self.persist_trajectory(
762 proposal,
763 ¤t_proposal,
764 &result,
765 car_memgine::TrajectoryOutcome::Failed,
766 attempt,
767 &state_before_map,
768 ) {
769 log.append(
770 EventKind::ActionFailed,
771 None,
772 Some(&proposal.id),
773 [(
774 "trajectory_persist_error".to_string(),
775 Value::from(err.as_str()),
776 )]
777 .into(),
778 );
779 }
780 return result;
781 }
782 }
783 }
784 }
785
786 fn persist_trajectory(
788 &self,
789 proposal: &ActionProposal,
790 current_proposal: &ActionProposal,
791 result: &ProposalResult,
792 outcome: car_memgine::TrajectoryOutcome,
793 attempt: u32,
794 state_before_map: &HashMap<String, HashMap<String, Value>>,
795 ) -> Option<String> {
796 let store = self.trajectory_store.as_ref()?;
797
798 let trace_events: Vec<car_memgine::TraceEvent> = result
799 .results
800 .iter()
801 .map(|r| {
802 let kind = match r.status {
803 ActionStatus::Succeeded => "action_succeeded",
804 ActionStatus::Failed => "action_failed",
805 ActionStatus::Rejected => "action_rejected",
806 ActionStatus::Skipped => "action_skipped",
807 _ => "unknown",
808 };
809 let tool = current_proposal
810 .actions
811 .iter()
812 .find(|a| a.id == r.action_id)
813 .and_then(|a| a.tool.clone());
814 let reward = match r.status {
815 ActionStatus::Succeeded => Some(1.0),
816 ActionStatus::Failed => Some(0.0),
817 ActionStatus::Rejected => Some(0.0),
818 ActionStatus::Skipped => None,
819 _ => None,
820 };
821 car_memgine::TraceEvent {
822 kind: kind.to_string(),
823 action_id: Some(r.action_id.clone()),
824 tool,
825 data: r
826 .error
827 .as_ref()
828 .map(|e| serde_json::json!({"error": e}))
829 .unwrap_or(serde_json::json!({})),
830 duration_ms: r.duration_ms,
831 state_before: state_before_map.get(&r.action_id).cloned(),
832 state_after: if !r.state_changes.is_empty() {
833 Some(r.state_changes.clone())
834 } else {
835 None
836 },
837 reward,
838 }
839 })
840 .collect();
841
842 let trajectory = car_memgine::Trajectory {
843 proposal_id: proposal.id.clone(),
844 source: proposal.source.clone(),
845 action_count: current_proposal.actions.len(),
846 events: trace_events,
847 outcome,
848 timestamp: chrono::Utc::now(),
849 duration_ms: result.cost.total_duration_ms,
850 replan_attempts: attempt,
851 };
852
853 match store.append(&trajectory) {
854 Ok(()) => None,
855 Err(e) => Some(e.to_string()),
856 }
857 }
858
859 pub async fn plan_and_execute(
865 &self,
866 candidates: &[ActionProposal],
867 planner_config: Option<car_planner::PlannerConfig>,
868 feedback: Option<&car_planner::ToolFeedback>,
869 ) -> ProposalResult {
870 if candidates.is_empty() {
871 return ProposalResult {
872 proposal_id: "empty".to_string(),
873 results: vec![],
874 cost: car_ir::CostSummary::default(),
875 };
876 }
877
878 let planner = car_planner::Planner::new(planner_config.unwrap_or_default());
880 let tools_guard = self.tools.read().await;
881 let tool_names: std::collections::HashSet<String> = tools_guard.keys().cloned().collect();
882 drop(tools_guard);
883
884 let pre_plan_snapshot = self.state.snapshot();
885 let pre_plan_transitions = self.state.transition_count();
886 let ranked = planner.rank_with_feedback(
887 candidates,
888 Some(&pre_plan_snapshot),
889 Some(&tool_names),
890 feedback,
891 );
892
893 let mut first_failure: Option<ProposalResult> = None;
895 for scored in &ranked {
896 if !scored.valid {
897 continue;
898 }
899
900 self.state
903 .restore(pre_plan_snapshot.clone(), pre_plan_transitions);
904
905 let proposal = &candidates[scored.index];
906 let result = self.execute(proposal).await;
907
908 if result.all_succeeded() {
909 return result;
910 }
911
912 tracing::info!(
913 proposal_id = %proposal.id,
914 score = scored.score,
915 "plan_and_execute: proposal failed, trying next candidate"
916 );
917
918 if first_failure.is_none() {
919 first_failure = Some(result);
920 }
921 }
922
923 first_failure.unwrap_or_else(|| ProposalResult {
925 proposal_id: candidates[0].id.clone(),
926 results: vec![],
927 cost: car_ir::CostSummary::default(),
928 })
929 }
930
931 async fn execute_inner_with_cancel(
934 &self,
935 proposal: &ActionProposal,
936 cancel: Option<&tokio_util::sync::CancellationToken>,
937 ) -> (ProposalResult, HashMap<String, HashMap<String, Value>>) {
938 let trace_id = Uuid::new_v4().to_string();
940
941 let root_span_id = {
943 let mut log = self.log.lock().await;
944 log.begin_span(
945 "proposal.execute",
946 &trace_id,
947 None,
948 [("proposal_id".to_string(), Value::from(proposal.id.as_str()))].into(),
949 )
950 };
951
952 {
954 let mut log = self.log.lock().await;
955 log.append(
956 EventKind::ProposalReceived,
957 None,
958 Some(&proposal.id),
959 [
960 ("source".to_string(), Value::from(proposal.source.as_str())),
961 (
962 "action_count".to_string(),
963 Value::from(proposal.actions.len()),
964 ),
965 ]
966 .into(),
967 );
968 }
969
970 {
972 let caps = self.capabilities.read().await;
973 if let Some(ref cap) = *caps {
974 if !cap.actions_within_budget(proposal.actions.len() as u32) {
975 let mut action_results = Vec::new();
976 for action in &proposal.actions {
977 action_results.push(rejected_result(
978 &action.id,
979 format!(
980 "capability denied: proposal has {} actions, max allowed is {:?}",
981 proposal.actions.len(),
982 cap.max_actions
983 ),
984 ));
985 }
986 return (
987 ProposalResult {
988 proposal_id: proposal.id.clone(),
989 results: action_results,
990 cost: CostSummary::default(),
991 },
992 HashMap::new(),
993 );
994 }
995 }
996 }
997
998 let snapshot = self.state.snapshot();
1000 let transition_count = self.state.transition_count();
1001
1002 let mut results: Vec<ActionResult> = Vec::new();
1003 let mut state_before_map: HashMap<String, HashMap<String, Value>> = HashMap::new();
1005 let mut aborted = false;
1006 let mut budget_exceeded = false;
1007 let mut total_retries: u32 = 0;
1008
1009 let mut running_tool_calls: u32 = 0;
1011 let mut running_actions: u32 = 0;
1012 let mut running_duration_ms: f64 = 0.0;
1013
1014 let budget = self.cost_budget.read().await.clone();
1016
1017 let levels = build_dag(&proposal.actions);
1019
1020 let mut canceled = false;
1021 for level in &levels {
1022 if !canceled {
1028 if let Some(token) = cancel {
1029 if token.is_cancelled() {
1030 canceled = true;
1031 }
1032 }
1033 }
1034 if canceled {
1035 for &idx in level {
1036 results.push(canceled_result(
1037 &proposal.actions[idx].id,
1038 "cancellation requested by caller",
1039 ));
1040 }
1041 continue;
1042 }
1043 if aborted || budget_exceeded {
1044 let skip_reason = if budget_exceeded {
1045 "cost budget exceeded"
1046 } else {
1047 "skipped due to earlier abort"
1048 };
1049 for &idx in level {
1050 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1051 }
1052 continue;
1053 }
1054
1055 let has_abort = level
1057 .iter()
1058 .any(|&i| proposal.actions[i].failure_behavior == FailureBehavior::Abort);
1059
1060 if level.len() == 1 || has_abort {
1061 for &idx in level {
1063 if aborted || budget_exceeded {
1064 let skip_reason = if budget_exceeded {
1065 "cost budget exceeded"
1066 } else {
1067 "skipped due to abort"
1068 };
1069 results.push(skipped_result(&proposal.actions[idx].id, skip_reason));
1070 continue;
1071 }
1072
1073 if let Some(ref b) = budget {
1075 if let Some(max) = b.max_actions {
1076 if running_actions >= max {
1077 budget_exceeded = true;
1078 results.push(skipped_result(
1079 &proposal.actions[idx].id,
1080 "cost budget exceeded",
1081 ));
1082 continue;
1083 }
1084 }
1085 if let Some(max) = b.max_tool_calls {
1086 if proposal.actions[idx].action_type == ActionType::ToolCall
1087 && running_tool_calls >= max
1088 {
1089 budget_exceeded = true;
1090 results.push(skipped_result(
1091 &proposal.actions[idx].id,
1092 "cost budget exceeded",
1093 ));
1094 continue;
1095 }
1096 }
1097 if let Some(max) = b.max_duration_ms {
1098 if running_duration_ms >= max {
1099 budget_exceeded = true;
1100 results.push(skipped_result(
1101 &proposal.actions[idx].id,
1102 "cost budget exceeded",
1103 ));
1104 continue;
1105 }
1106 }
1107 }
1108
1109 state_before_map.insert(
1110 proposal.actions[idx].id.clone(),
1111 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1112 );
1113 let (ar, action_retries) = self
1114 .process_action(
1115 &proposal.actions[idx],
1116 &proposal.id,
1117 &trace_id,
1118 &root_span_id,
1119 )
1120 .await;
1121 total_retries += action_retries;
1122
1123 if ar.status == ActionStatus::Succeeded
1125 && proposal.actions[idx].action_type == ActionType::ToolCall
1126 {
1127 running_tool_calls += 1;
1128 }
1129 if ar.status != ActionStatus::Skipped {
1130 running_actions += 1;
1131 }
1132 if let Some(d) = ar.duration_ms {
1133 running_duration_ms += d;
1134 }
1135
1136 if ar.status == ActionStatus::Failed
1137 && proposal.actions[idx].failure_behavior == FailureBehavior::Abort
1138 {
1139 aborted = true;
1140 }
1141 results.push(ar);
1142 }
1143 } else {
1144 for &idx in level {
1147 state_before_map.insert(
1148 proposal.actions[idx].id.clone(),
1149 snapshot_relevant_keys(&self.state, &proposal.actions[idx]),
1150 );
1151 }
1152 let futs: Vec<_> = level
1153 .iter()
1154 .map(|&idx| {
1155 self.process_action(
1156 &proposal.actions[idx],
1157 &proposal.id,
1158 &trace_id,
1159 &root_span_id,
1160 )
1161 })
1162 .collect();
1163 let level_results = futures::future::join_all(futs).await;
1164
1165 for (i, (ar, action_retries)) in level_results.into_iter().enumerate() {
1166 let idx = level[i];
1167 total_retries += action_retries;
1168 if ar.status == ActionStatus::Succeeded
1169 && proposal.actions[idx].action_type == ActionType::ToolCall
1170 {
1171 running_tool_calls += 1;
1172 }
1173 if ar.status != ActionStatus::Skipped {
1174 running_actions += 1;
1175 }
1176 if let Some(d) = ar.duration_ms {
1177 running_duration_ms += d;
1178 }
1179 results.push(ar);
1180 }
1181 }
1182 }
1183
1184 if aborted {
1186 self.state.restore(snapshot.clone(), transition_count);
1187
1188 let mut log = self.log.lock().await;
1189 log.append(
1190 EventKind::StateSnapshot,
1191 None,
1192 Some(&proposal.id),
1193 [(
1194 "state".to_string(),
1195 serde_json::to_value(&snapshot).unwrap_or_default(),
1196 )]
1197 .into(),
1198 );
1199 log.append(
1200 EventKind::StateRollback,
1201 None,
1202 Some(&proposal.id),
1203 [(
1204 "rolled_back_to".to_string(),
1205 Value::from("pre-proposal snapshot"),
1206 )]
1207 .into(),
1208 );
1209
1210 let mut cache = self.idempotency_cache.lock().await;
1212 for r in &results {
1213 if r.status == ActionStatus::Succeeded {
1214 for action in &proposal.actions {
1215 if action.id == r.action_id && action.idempotent {
1216 cache.remove(&idempotency_key(action));
1217 }
1218 }
1219 }
1220 }
1221 }
1222
1223 let action_order: HashMap<String, usize> = proposal
1225 .actions
1226 .iter()
1227 .enumerate()
1228 .map(|(i, a)| (a.id.clone(), i))
1229 .collect();
1230 results.sort_by_key(|r| {
1231 action_order
1232 .get(&r.action_id)
1233 .copied()
1234 .unwrap_or(usize::MAX)
1235 });
1236
1237 let mut cost = CostSummary::default();
1239 for r in &results {
1240 let action = action_order
1241 .get(&r.action_id)
1242 .and_then(|&i| proposal.actions.get(i));
1243 match r.status {
1244 ActionStatus::Succeeded => {
1245 cost.actions_executed += 1;
1246 if let Some(a) = action {
1247 if a.action_type == ActionType::ToolCall {
1248 cost.tool_calls += 1;
1249 }
1250 }
1251 }
1252 ActionStatus::Failed | ActionStatus::Rejected => {
1253 cost.actions_executed += 1;
1254 }
1255 ActionStatus::Skipped => {
1256 cost.actions_skipped += 1;
1257 }
1258 _ => {}
1259 }
1260 if let Some(d) = r.duration_ms {
1261 cost.total_duration_ms += d;
1262 }
1263 }
1264
1265 cost.retries = total_retries;
1267
1268 {
1270 let span_status = if aborted {
1271 SpanStatus::Error
1272 } else {
1273 SpanStatus::Ok
1274 };
1275 let mut log = self.log.lock().await;
1276 log.end_span(&root_span_id, span_status);
1277 }
1278
1279 let proposal_result = ProposalResult {
1280 proposal_id: proposal.id.clone(),
1281 results,
1282 cost,
1283 };
1284
1285 if self.auto_distill {
1287 if let Some(ref memgine) = self.memgine {
1288 let trace_events: Vec<car_memgine::TraceEvent> = proposal_result
1290 .results
1291 .iter()
1292 .map(|r| {
1293 let kind = match r.status {
1294 ActionStatus::Succeeded => "action_succeeded",
1295 ActionStatus::Failed => "action_failed",
1296 ActionStatus::Rejected => "action_rejected",
1297 ActionStatus::Skipped => "action_skipped",
1298 _ => "unknown",
1299 };
1300 let tool = proposal
1302 .actions
1303 .iter()
1304 .find(|a| a.id == r.action_id)
1305 .and_then(|a| a.tool.clone());
1306 let mut data = serde_json::Map::new();
1307 if let Some(ref e) = r.error {
1308 data.insert("error".into(), Value::from(e.as_str()));
1309 }
1310 if let Some(ref o) = r.output {
1311 data.insert("output".into(), o.clone());
1312 }
1313 car_memgine::TraceEvent {
1314 kind: kind.to_string(),
1315 action_id: Some(r.action_id.clone()),
1316 tool,
1317 data: Value::Object(data),
1318 duration_ms: r.duration_ms,
1319 reward: match r.status {
1320 ActionStatus::Succeeded => Some(1.0),
1321 ActionStatus::Failed | ActionStatus::Rejected => Some(0.0),
1322 _ => None,
1323 },
1324 ..Default::default()
1325 }
1326 })
1327 .collect();
1328
1329 let mut engine = memgine.lock().await;
1330 let skills = engine.distill_skills(&trace_events).await;
1331 if !skills.is_empty() {
1332 let count = skills.len();
1333 engine.ingest_distilled_skills(&skills);
1334
1335 let mut log = self.log.lock().await;
1337 log.append(
1338 EventKind::SkillDistilled,
1339 None,
1340 Some(&proposal_result.proposal_id),
1341 [
1342 ("skills_count".to_string(), Value::from(count)),
1343 (
1344 "skill_names".to_string(),
1345 Value::from(
1346 skills.iter().map(|s| s.name.as_str()).collect::<Vec<_>>(),
1347 ),
1348 ),
1349 ]
1350 .into(),
1351 );
1352
1353 let threshold = engine.evolution_threshold();
1355 let domains = engine.domains_needing_evolution(threshold);
1356 for domain in &domains {
1357 let failed: Vec<car_memgine::TraceEvent> = trace_events
1359 .iter()
1360 .filter(|e| {
1361 matches!(e.kind.as_str(), "action_failed" | "action_rejected")
1362 })
1363 .cloned()
1364 .collect();
1365 if !failed.is_empty() {
1366 let evolved = engine.evolve_skills(&failed, domain).await;
1367 if !evolved.is_empty() {
1368 log.append(
1369 EventKind::EvolutionTriggered,
1370 None,
1371 Some(&proposal_result.proposal_id),
1372 [
1373 ("domain".to_string(), Value::from(domain.as_str())),
1374 ("new_skills".to_string(), Value::from(evolved.len())),
1375 ]
1376 .into(),
1377 );
1378 }
1379 }
1380 }
1381 }
1382 }
1383 }
1384
1385 (proposal_result, state_before_map)
1386 }
1387
1388 async fn process_action(
1391 &self,
1392 action: &Action,
1393 proposal_id: &str,
1394 trace_id: &str,
1395 parent_span_id: &str,
1396 ) -> (ActionResult, u32) {
1397 let action_type_name = serde_json::to_string(&action.action_type)
1399 .unwrap_or_default()
1400 .trim_matches('"')
1401 .to_string();
1402 let span_name = format!("action.{}", action_type_name);
1403
1404 let action_span_id = {
1406 let mut attrs: HashMap<String, Value> = HashMap::new();
1407 attrs.insert("action_id".to_string(), Value::from(action.id.as_str()));
1408 if let Some(ref tool) = action.tool {
1409 attrs.insert("tool".to_string(), Value::from(tool.as_str()));
1410 }
1411 let mut log = self.log.lock().await;
1412 log.begin_span(&span_name, trace_id, Some(parent_span_id), attrs)
1413 };
1414
1415 let (result, retries) = self.process_action_inner(action, proposal_id).await;
1417
1418 let span_status = match result.status {
1420 ActionStatus::Succeeded => SpanStatus::Ok,
1421 ActionStatus::Failed | ActionStatus::Rejected => SpanStatus::Error,
1422 _ => SpanStatus::Unset,
1423 };
1424 {
1425 let mut log = self.log.lock().await;
1426 log.end_span(&action_span_id, span_status);
1427 }
1428
1429 (result, retries)
1430 }
1431
1432 #[instrument(
1435 name = "action.process",
1436 skip_all,
1437 fields(
1438 action_id = %action.id,
1439 action_type = ?action.action_type,
1440 tool = action.tool.as_deref().unwrap_or("none"),
1441 )
1442 )]
1443 async fn process_action_inner(
1444 &self,
1445 action: &Action,
1446 proposal_id: &str,
1447 ) -> (ActionResult, u32) {
1448 if action.idempotent {
1450 let key = idempotency_key(action);
1451 let cache = self.idempotency_cache.lock().await;
1452 if let Some(cached) = cache.get(&key) {
1453 let mut log = self.log.lock().await;
1454 log.append(
1455 EventKind::ActionDeduplicated,
1456 Some(&action.id),
1457 Some(proposal_id),
1458 [(
1459 "cached_action_id".to_string(),
1460 Value::from(cached.action_id.as_str()),
1461 )]
1462 .into(),
1463 );
1464 return (
1465 ActionResult {
1466 action_id: action.id.clone(),
1467 status: cached.status.clone(),
1468 output: cached.output.clone(),
1469 error: cached.error.clone(),
1470 state_changes: cached.state_changes.clone(),
1471 duration_ms: Some(0.0),
1472 timestamp: chrono::Utc::now(),
1473 },
1474 0,
1475 );
1476 }
1477 }
1478
1479 {
1481 let caps = self.capabilities.read().await;
1482 if let Some(ref cap) = *caps {
1483 if action.action_type == ActionType::ToolCall {
1485 if let Some(ref tool_name) = action.tool {
1486 if !cap.tool_allowed(tool_name) {
1487 let mut log = self.log.lock().await;
1488 log.append(
1489 EventKind::ActionRejected,
1490 Some(&action.id),
1491 Some(proposal_id),
1492 HashMap::new(),
1493 );
1494 return (
1495 rejected_result(
1496 &action.id,
1497 format!("capability denied: tool '{}' not allowed", tool_name),
1498 ),
1499 0,
1500 );
1501 }
1502 }
1503 }
1504
1505 if action.action_type == ActionType::StateWrite
1507 || action.action_type == ActionType::StateRead
1508 {
1509 if let Some(key) = action.parameters.get("key").and_then(|v| v.as_str()) {
1510 if !cap.state_key_allowed(key) {
1511 let mut log = self.log.lock().await;
1512 log.append(
1513 EventKind::ActionRejected,
1514 Some(&action.id),
1515 Some(proposal_id),
1516 HashMap::new(),
1517 );
1518 return (
1519 rejected_result(
1520 &action.id,
1521 format!("capability denied: state key '{}' not allowed", key),
1522 ),
1523 0,
1524 );
1525 }
1526 }
1527 }
1528 }
1529 }
1530
1531 let tools = self.tools.read().await;
1533 let validation = validate_action(action, &self.state, &tools);
1534 drop(tools);
1535
1536 if !validation.valid() {
1537 let error = validation
1538 .errors
1539 .iter()
1540 .map(|e| e.reason.as_str())
1541 .collect::<Vec<_>>()
1542 .join("; ");
1543 let mut log = self.log.lock().await;
1544 log.append(
1545 EventKind::ActionRejected,
1546 Some(&action.id),
1547 Some(proposal_id),
1548 HashMap::new(),
1549 );
1550 return (rejected_result(&action.id, error), 0);
1551 }
1552
1553 {
1555 let policies = self.policies.read().await;
1556 let violations = policies.check(action, &self.state);
1557 if !violations.is_empty() {
1558 let error = violations
1559 .iter()
1560 .map(|v| format!("policy '{}': {}", v.policy_name, v.reason))
1561 .collect::<Vec<_>>()
1562 .join("; ");
1563 let mut log = self.log.lock().await;
1564 log.append(
1565 EventKind::PolicyViolation,
1566 Some(&action.id),
1567 Some(proposal_id),
1568 HashMap::new(),
1569 );
1570 return (rejected_result(&action.id, error), 0);
1571 }
1572 }
1573
1574 {
1576 let mut log = self.log.lock().await;
1577 log.append(
1578 EventKind::ActionValidated,
1579 Some(&action.id),
1580 Some(proposal_id),
1581 HashMap::new(),
1582 );
1583 }
1584
1585 let (result, retries) = self.execute_with_retry(action, proposal_id).await;
1587
1588 if action.idempotent && result.status == ActionStatus::Succeeded {
1590 let mut cache = self.idempotency_cache.lock().await;
1591 cache.insert(idempotency_key(action), result.clone());
1592 }
1593
1594 tracing::info!(
1595 status = ?result.status,
1596 duration_ms = result.duration_ms,
1597 "action completed"
1598 );
1599
1600 (result, retries)
1601 }
1602
1603 async fn execute_with_retry(&self, action: &Action, proposal_id: &str) -> (ActionResult, u32) {
1606 let max_attempts = if action.failure_behavior == FailureBehavior::Retry {
1607 action.max_retries + 1
1608 } else {
1609 1
1610 };
1611
1612 let mut last_error: Option<String> = None;
1613 let mut retries: u32 = 0;
1614
1615 for attempt in 0..max_attempts {
1616 if attempt > 0 {
1617 retries += 1;
1618 let delay = RETRY_BASE_DELAY_MS * RETRY_BACKOFF_FACTOR.pow(attempt as u32 - 1);
1619 tokio::time::sleep(Duration::from_millis(delay)).await;
1620 let mut log = self.log.lock().await;
1621 log.append(
1622 EventKind::ActionRetrying,
1623 Some(&action.id),
1624 Some(proposal_id),
1625 [("attempt".to_string(), Value::from(attempt + 1))].into(),
1626 );
1627 }
1628
1629 {
1630 let mut log = self.log.lock().await;
1631 log.append(
1632 EventKind::ActionExecuting,
1633 Some(&action.id),
1634 Some(proposal_id),
1635 HashMap::new(),
1636 );
1637 }
1638
1639 let start = std::time::Instant::now();
1640 let transitions_before = self.state.transition_count();
1641
1642 let exec_result = if let Some(timeout_ms) = action.timeout_ms {
1644 match timeout(Duration::from_millis(timeout_ms), self.dispatch(action)).await {
1645 Ok(r) => r,
1646 Err(_) => Err(format!("action timed out after {}ms", timeout_ms)),
1647 }
1648 } else {
1649 self.dispatch(action).await
1650 };
1651
1652 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
1653
1654 match exec_result {
1655 Ok(output) => {
1656 let mut state_changes: HashMap<String, Value> = HashMap::new();
1658 for (key, value) in &action.expected_effects {
1659 self.state.set(key, value.clone(), &action.id);
1660 state_changes.insert(key.clone(), value.clone());
1661 }
1662
1663 for t in self.state.transitions_since(transitions_before) {
1665 if !state_changes.contains_key(&t.key) {
1666 if let Some(v) = t.new_value {
1667 state_changes.insert(t.key.clone(), v);
1668 }
1669 }
1670 }
1671
1672 let mut log = self.log.lock().await;
1673 log.append(
1674 EventKind::ActionSucceeded,
1675 Some(&action.id),
1676 Some(proposal_id),
1677 [("duration_ms".to_string(), Value::from(duration_ms))].into(),
1678 );
1679
1680 if !state_changes.is_empty() {
1681 log.append(
1682 EventKind::StateChanged,
1683 Some(&action.id),
1684 Some(proposal_id),
1685 [(
1686 "changes".to_string(),
1687 serde_json::to_value(&state_changes).unwrap_or_default(),
1688 )]
1689 .into(),
1690 );
1691 }
1692
1693 return (
1694 ActionResult {
1695 action_id: action.id.clone(),
1696 status: ActionStatus::Succeeded,
1697 output: Some(output),
1698 error: None,
1699 state_changes,
1700 duration_ms: Some(duration_ms),
1701 timestamp: chrono::Utc::now(),
1702 },
1703 retries,
1704 );
1705 }
1706 Err(e) => {
1707 last_error = Some(e.clone());
1708 let mut log = self.log.lock().await;
1709 log.append(
1710 EventKind::ActionFailed,
1711 Some(&action.id),
1712 Some(proposal_id),
1713 [
1714 ("error".to_string(), Value::from(e.as_str())),
1715 ("attempt".to_string(), Value::from(attempt + 1)),
1716 ]
1717 .into(),
1718 );
1719 }
1720 }
1721 }
1722
1723 if action.failure_behavior == FailureBehavior::Skip {
1725 return (
1726 skipped_result(
1727 &action.id,
1728 last_error.as_deref().unwrap_or("all attempts exhausted"),
1729 ),
1730 retries,
1731 );
1732 }
1733
1734 (
1735 ActionResult {
1736 action_id: action.id.clone(),
1737 status: ActionStatus::Failed,
1738 output: None,
1739 error: last_error,
1740 state_changes: HashMap::new(),
1741 duration_ms: None,
1742 timestamp: chrono::Utc::now(),
1743 },
1744 retries,
1745 )
1746 }
1747
1748 async fn dispatch(&self, action: &Action) -> Result<Value, String> {
1750 match action.action_type {
1751 ActionType::ToolCall => {
1752 let tool_name = action.tool.as_deref().ok_or("tool_call has no tool")?;
1753 let params = Value::Object(
1754 action
1755 .parameters
1756 .iter()
1757 .map(|(k, v)| (k.clone(), v.clone()))
1758 .collect(),
1759 );
1760
1761 if let Some(cached) = self.result_cache.get(tool_name, ¶ms).await {
1763 return Ok(cached);
1764 }
1765
1766 self.rate_limiter.acquire(tool_name).await;
1768
1769 if matches!(
1771 tool_name,
1772 "infer" | "infer.grounded" | "embed" | "classify" | "transcribe" | "synthesize"
1773 ) {
1774 if let Some(ref engine) = self.inference_engine {
1775 let params = {
1778 let should_ground =
1779 tool_name == "infer.grounded" || tool_name == "infer";
1780 if should_ground {
1781 if let Some(ref memgine) = self.memgine {
1782 if let Some(prompt) =
1783 params.get("prompt").and_then(|v| v.as_str())
1784 {
1785 let ctx = {
1786 let mut m = memgine.lock().await;
1787 m.build_context(prompt)
1788 };
1789 if !ctx.is_empty() {
1790 let mut p = params.clone();
1791 if let Some(obj) = p.as_object_mut() {
1792 obj.insert("context".to_string(), Value::from(ctx));
1793 }
1794 p
1795 } else {
1796 params
1797 }
1798 } else {
1799 params
1800 }
1801 } else {
1802 params
1803 }
1804 } else {
1805 params
1806 }
1807 };
1808
1809 let effective_tool = if tool_name == "infer.grounded" {
1811 "infer"
1812 } else {
1813 tool_name
1814 };
1815 let result =
1816 car_inference::service::execute_tool(engine, effective_tool, ¶ms)
1817 .await
1818 .map_err(|e| e.to_string());
1819
1820 if let Ok(ref value) = result {
1821 self.result_cache
1822 .put(tool_name, ¶ms, value.clone())
1823 .await;
1824 }
1825
1826 return result;
1827 }
1828 }
1829
1830 if tool_name == "memory.consolidate" {
1832 if let Some(ref memgine) = self.memgine {
1833 let report = {
1834 let mut m = memgine.lock().await;
1835 m.consolidate().await
1836 };
1837 {
1839 let mut log = self.log.lock().await;
1840 log.append(
1841 EventKind::Consolidated,
1842 None,
1843 None,
1844 [
1845 (
1846 "expired_pruned".to_string(),
1847 Value::from(report.expired_pruned),
1848 ),
1849 (
1850 "superseded_gc".to_string(),
1851 Value::from(report.superseded_gc),
1852 ),
1853 (
1854 "stale_embeddings_removed".to_string(),
1855 Value::from(report.stale_embeddings_removed),
1856 ),
1857 (
1858 "nodes_embedded".to_string(),
1859 Value::from(report.nodes_embedded),
1860 ),
1861 (
1862 "domains_evolved".to_string(),
1863 Value::from(report.domains_evolved.clone()),
1864 ),
1865 ("total_nodes".to_string(), Value::from(report.total_nodes)),
1866 ("total_edges".to_string(), Value::from(report.total_edges)),
1867 ]
1868 .into(),
1869 );
1870 }
1871 return Ok(serde_json::to_value(&report).unwrap_or(Value::Null));
1872 } else {
1873 return Err(
1874 "memory.consolidate requires memgine (attach with with_learning)"
1875 .into(),
1876 );
1877 }
1878 }
1879
1880 let configured = {
1886 let guard = self.tool_executor.lock().await;
1887 guard.as_ref().cloned()
1888 };
1889
1890 if let Some(ref executor) = configured {
1891 let result = executor.execute(tool_name, ¶ms).await;
1892 let fall_through = matches!(&result, Err(e) if e.starts_with("unknown tool"));
1893 if !fall_through {
1894 if let Ok(ref value) = result {
1895 self.result_cache
1896 .put(tool_name, ¶ms, value.clone())
1897 .await;
1898 }
1899 return result;
1900 }
1901 }
1902
1903 if let Some(result) = crate::agent_basics::execute(tool_name, ¶ms).await {
1904 if let Ok(ref value) = result {
1905 self.result_cache
1906 .put(tool_name, ¶ms, value.clone())
1907 .await;
1908 }
1909 return result;
1910 }
1911
1912 Err(format!("no handler for tool '{}'", tool_name))
1913 }
1914 ActionType::StateWrite => {
1915 let key = action
1916 .parameters
1917 .get("key")
1918 .and_then(|v| v.as_str())
1919 .ok_or("state_write requires 'key' parameter")?;
1920 let value = action
1921 .parameters
1922 .get("value")
1923 .cloned()
1924 .unwrap_or(Value::Null);
1925 self.state.set(key, value, &action.id);
1926 Ok(Value::from(format!("written: {}", key)))
1927 }
1928 ActionType::StateRead => {
1929 let key = action
1930 .parameters
1931 .get("key")
1932 .and_then(|v| v.as_str())
1933 .ok_or("state_read requires 'key' parameter")?;
1934 Ok(self.state.get(key).unwrap_or(Value::Null))
1935 }
1936 ActionType::Assertion => {
1937 let key = action
1938 .parameters
1939 .get("key")
1940 .and_then(|v| v.as_str())
1941 .ok_or("assertion requires 'key' parameter")?;
1942 let expected = action
1943 .parameters
1944 .get("expected")
1945 .cloned()
1946 .unwrap_or(Value::Null);
1947 let actual = self.state.get(key).unwrap_or(Value::Null);
1948 if actual != expected {
1949 Err(format!(
1950 "assertion failed: state['{}'] = {:?}, expected {:?}",
1951 key, actual, expected
1952 ))
1953 } else {
1954 Ok(serde_json::json!({"asserted": key, "value": actual}))
1955 }
1956 }
1957 }
1958 }
1959
1960 pub async fn save_checkpoint(&self) -> Checkpoint {
1964 let state = self.state.snapshot();
1965 let tools: Vec<String> = self.tools.read().await.keys().cloned().collect();
1966 let log = self.log.lock().await;
1967 let events: Vec<Value> = log
1968 .events()
1969 .iter()
1970 .map(|e| serde_json::to_value(e).unwrap_or_default())
1971 .collect();
1972
1973 Checkpoint {
1974 checkpoint_id: Uuid::new_v4().to_string(),
1975 created_at: chrono::Utc::now(),
1976 state,
1977 events,
1978 tools,
1979 metadata: HashMap::new(),
1980 }
1981 }
1982
1983 pub async fn save_checkpoint_to_file(&self, path: &str) -> Result<(), String> {
1985 let checkpoint = self.save_checkpoint().await;
1986 let json = serde_json::to_string_pretty(&checkpoint)
1987 .map_err(|e| format!("serialize error: {}", e))?;
1988 tokio::fs::write(path, json)
1989 .await
1990 .map_err(|e| format!("write error: {}", e))?;
1991 Ok(())
1992 }
1993
1994 pub async fn load_checkpoint_from_file(&self, path: &str) -> Result<Checkpoint, String> {
1996 let json = tokio::fs::read_to_string(path)
1997 .await
1998 .map_err(|e| format!("read error: {}", e))?;
1999 let checkpoint: Checkpoint =
2000 serde_json::from_str(&json).map_err(|e| format!("deserialize error: {}", e))?;
2001 self.restore_checkpoint(&checkpoint).await;
2002 Ok(checkpoint)
2003 }
2004
2005 pub async fn restore_checkpoint(&self, checkpoint: &Checkpoint) {
2007 self.state.replace_all(checkpoint.state.clone());
2009 self.idempotency_cache.lock().await.clear();
2012 let mut tools = self.tools.write().await;
2014 tools.clear();
2015 for tool_name in &checkpoint.tools {
2016 let schema = ToolSchema {
2017 name: tool_name.clone(),
2018 description: String::new(),
2019 parameters: serde_json::Value::Object(Default::default()),
2020 returns: None,
2021 idempotent: false,
2022 cache_ttl_secs: None,
2023 rate_limit: None,
2024 };
2025 tools.insert(tool_name.clone(), schema);
2026 }
2027 }
2028
2029 pub async fn register_subprocess_tool(
2034 &self,
2035 name: &str,
2036 tool: crate::subprocess::SubprocessTool,
2037 ) {
2038 use crate::subprocess::SubprocessToolExecutor;
2039
2040 let schema = ToolSchema {
2041 name: name.to_string(),
2042 description: format!("Subprocess tool: {}", tool.command),
2043 parameters: serde_json::Value::Object(Default::default()),
2044 returns: None,
2045 idempotent: false,
2046 cache_ttl_secs: None,
2047 rate_limit: None,
2048 };
2049 self.register_tool_schema(schema).await;
2050
2051 let mut guard = self.tool_executor.lock().await;
2052 let mut executor = match guard.take() {
2053 Some(existing) => {
2054 let mut sub = SubprocessToolExecutor::new();
2055 sub = sub.with_fallback(existing);
2056 sub
2057 }
2058 None => SubprocessToolExecutor::new(),
2059 };
2060 executor.register(name, tool);
2061 *guard = Some(std::sync::Arc::new(executor));
2062 }
2063}
2064
2065impl Default for Runtime {
2066 fn default() -> Self {
2067 Self::new()
2068 }
2069}