1use crate::{
2 AdvanceRequest, AdvanceResult, AgentAction, AgentContext, AgentContextSnapshot,
3 AgentStateDelta, AgentTrigger, ApprovalDecision, ApprovalResolutionRecord, ContinueRequest,
4 DelegationRecord, DeliberationOutcome, DeliberationRecord, EngineOutcome, EnginePolicy,
5 ExecutionMetadata, ExecutionPlanRecord, KernelEvent, KernelEventRecord, LlmProvider,
6 MemoryError, MemoryStore, MemoryStoreExt, ModelDecisionRecord, OutcomeRecord,
7 PendingApprovalRecord, PlannedSkillCall, Planner, PolicyOverlay, PolicyOverlayPatch,
8 PolicyOverlayStatus, PolicyTuningAction, PolicyTuningRecord, ProcessRequest,
9 ProfilePatchRecord, ProviderContentPart, ProviderDecision, ProviderMessage, ProviderRequest,
10 ProviderRequestConfig, ProviderRole, ReflectionRecord, ResumeToken, RetryPolicy,
11 SelfImprovementMode, SessionRecord, SessionSnapshot, SkillBackendKind, SkillDefinition,
12 SkillFailure, SkillFailureKind, SkillInputValidationRecord, SkillInvocation, SkillManifest,
13 SkillStore, StopReason, StrategyPreferenceRecord, SummaryRecord, SuspendReason, ToolCallRecord,
14 ToolDependency, ToolExecutionGraph, ToolNode, ToolNodeCheckpointRecord, ToolNodeStatus,
15 ToolPerformanceRecord, ToolResultRecord, TriggerIntentRecord, TriggerRecord, WakeRequestRecord,
16};
17use async_trait::async_trait;
18use dashmap::DashMap;
19use metrics::{counter, gauge, histogram};
20use std::collections::{BTreeSet, HashMap};
21use std::sync::Arc;
22use std::time::{Instant, SystemTime};
23use thiserror::Error;
24use tokio::task::JoinSet;
25use tracing::{info, instrument, warn};
26use uuid::Uuid;
27
/// Errors that engine operations can return to their caller.
#[derive(Debug, Error)]
pub enum EngineError {
    /// A memory-store operation failed; converted automatically from [`MemoryError`]
    /// through the `#[from]` attribute.
    #[error("memory error: {0}")]
    Memory(#[from] MemoryError),
    /// A blob-related failure, carrying its description as text.
    #[error("blob error: {0}")]
    Blob(String),
    /// A provider-related failure, carrying its description as text.
    #[error("provider error: {0}")]
    Provider(String),
}
37
/// Coarse category of an engine error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EngineErrorKind {
    /// Storage-related failure.
    Storage,
    /// Blob-related failure.
    Blob,
    /// Join-related failure.
    // NOTE(review): no code visible in this part of the file produces this kind — confirm it is still used.
    Join,
    /// Provider-related failure.
    Provider,
}
45
/// How severe an engine error is considered to be.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EngineErrorSeverity {
    /// The error is classified as recoverable.
    Recoverable,
    /// The error is classified as fatal.
    Fatal,
}
51
52impl EngineError {
53 pub fn kind(&self) -> EngineErrorKind {
54 match self {
55 EngineError::Memory(_) => EngineErrorKind::Storage,
56 EngineError::Blob(_) => EngineErrorKind::Blob,
57 EngineError::Provider(_) => EngineErrorKind::Provider,
58 }
59 }
60
61 pub fn severity(&self) -> EngineErrorSeverity {
62 match self {
63 EngineError::Memory(_) => EngineErrorSeverity::Fatal,
64 EngineError::Blob(_) => EngineErrorSeverity::Recoverable,
65 EngineError::Provider(_) => EngineErrorSeverity::Recoverable,
66 }
67 }
68
69 pub fn is_recoverable(&self) -> bool {
70 self.severity() == EngineErrorSeverity::Recoverable
71 }
72}
73
/// Error returned by a skill backend when an invocation fails.
///
/// Its `Display` output is the `Debug` form of `kind`, a colon, and `message`.
#[derive(Debug, Error, Clone, PartialEq)]
#[error("{kind:?}: {message}")]
pub struct SkillExecutionError {
    /// Failure category.
    pub kind: SkillFailureKind,
    /// Description of the failure.
    pub message: String,
}
80
81impl SkillExecutionError {
82 pub fn new(kind: SkillFailureKind, message: impl Into<String>) -> Self {
83 Self {
84 kind,
85 message: message.into(),
86 }
87 }
88}
89
/// Asynchronous executor for a skill invocation.
///
/// Implementations must be `Send + Sync`; this file stores them behind `Arc<dyn SkillExecutor>`.
#[async_trait]
pub trait SkillExecutor: Send + Sync {
    /// Runs the given invocation, returning its JSON output or a [`SkillExecutionError`].
    async fn execute(
        &self,
        invocation: SkillInvocation,
    ) -> Result<serde_json::Value, SkillExecutionError>;

    /// Static label identifying this executor; it is copied into `SkillDefinition::executor_kind`.
    fn executor_kind(&self) -> &'static str;
}
99
/// Marker trait layered on top of [`SkillExecutor`]; it adds no methods of its own.
pub trait WasmSkillExecutor: SkillExecutor {}

// Blanket impl: every `SkillExecutor` (sized or not) is automatically a `WasmSkillExecutor`.
impl<T> WasmSkillExecutor for T where T: SkillExecutor + ?Sized {}
103
/// A skill provided as a `Send + Sync` trait object and registered through
/// `AgentEngine::register_native_skill`.
#[async_trait]
pub trait NativeSkill: Send + Sync {
    /// Runs the given invocation, returning its JSON output or a [`SkillExecutionError`].
    async fn execute(
        &self,
        invocation: SkillInvocation,
    ) -> Result<serde_json::Value, SkillExecutionError>;

    /// Whether this skill requires human approval. Defaults to `false`.
    fn requires_human_approval(&self) -> bool {
        false
    }

    /// Static label identifying this executor. Defaults to `"native"`.
    fn executor_kind(&self) -> &'static str {
        "native"
    }
}
119
/// The executor behind a registered skill.
#[derive(Clone)]
pub(crate) enum RegisteredSkillBackend {
    /// Skill registered through `register_wasm_skill`, backed by a [`SkillExecutor`].
    Wasm(Arc<dyn SkillExecutor>),
    /// Skill registered through `register_native_skill`, backed by a [`NativeSkill`].
    Native(Arc<dyn NativeSkill>),
}
125
126impl RegisteredSkillBackend {
127 fn kind(&self) -> SkillBackendKind {
128 match self {
129 RegisteredSkillBackend::Wasm(_) => SkillBackendKind::Wasm,
130 RegisteredSkillBackend::Native(_) => SkillBackendKind::Native,
131 }
132 }
133
134 fn executor_kind(&self) -> &'static str {
135 match self {
136 RegisteredSkillBackend::Wasm(executor) => executor.executor_kind(),
137 RegisteredSkillBackend::Native(executor) => executor.executor_kind(),
138 }
139 }
140
141 fn requires_human_approval(&self) -> bool {
142 match self {
143 RegisteredSkillBackend::Wasm(_) => false,
144 RegisteredSkillBackend::Native(executor) => executor.requires_human_approval(),
145 }
146 }
147}
148
/// A skill as stored in the engine's registry: its manifest plus the backend that runs it.
#[derive(Clone)]
pub(crate) struct RegisteredSkill {
    /// Manifest supplied at registration; its `name` is the registry key.
    pub(crate) manifest: SkillManifest,
    /// Executor that runs the skill.
    pub(crate) backend: RegisteredSkillBackend,
}
154
155impl RegisteredSkill {
156 fn definition(&self) -> SkillDefinition {
157 SkillDefinition {
158 manifest: self.manifest.clone(),
159 executor_kind: self.backend.executor_kind().to_string(),
160 }
161 }
162}
163
/// The agent engine. Every field is an `Arc` (or an optional `Arc`), so cloning the
/// engine shares the same underlying components.
#[derive(Clone)]
pub struct AgentEngine {
    /// Provider asked for the next action (`generate_action`).
    llm: Arc<dyn LlmProvider>,
    /// Store that session records are appended to and loaded from.
    memory: Arc<dyn MemoryStore>,
    /// Cache consulted for a session projection before falling back to `memory`.
    state_cache: Arc<dyn crate::StateProjectionCache>,
    /// Optional store used by `register_wasm_skill_persistent`.
    skill_store: Option<Arc<dyn SkillStore>>,
    /// Registered skills keyed by manifest name.
    skills: Arc<DashMap<String, RegisteredSkill>>,
    /// Optional planner consulted when a new trigger is processed.
    planner: Option<Arc<dyn Planner>>,
}
173
174impl AgentEngine {
175 pub fn new(llm: Arc<dyn LlmProvider>, memory: Arc<dyn MemoryStore>) -> Self {
176 Self {
177 llm,
178 memory,
179 state_cache: Arc::new(crate::InMemoryStateCache::new()),
180 skill_store: None,
181 skills: Arc::new(DashMap::new()),
182 planner: None,
183 }
184 }
185
186 pub fn state_cache(&self) -> Arc<dyn crate::StateProjectionCache> {
187 self.state_cache.clone()
188 }
189
190 pub fn with_state_cache(mut self, state_cache: Arc<dyn crate::StateProjectionCache>) -> Self {
191 self.state_cache = state_cache;
192 self
193 }
194
195 pub fn with_skill_store(mut self, skill_store: Arc<dyn SkillStore>) -> Self {
196 self.skill_store = Some(skill_store);
197 self
198 }
199
200 pub fn with_planner(mut self, planner: Arc<dyn Planner>) -> Self {
201 self.planner = Some(planner);
202 self
203 }
204
205 pub fn register_native_skill(&self, manifest: SkillManifest, skill: Arc<dyn NativeSkill>) {
206 self.skills.insert(
207 manifest.name.clone(),
208 RegisteredSkill {
209 manifest,
210 backend: RegisteredSkillBackend::Native(skill),
211 },
212 );
213 }
214
215 pub fn register_wasm_skill(&self, manifest: SkillManifest, executor: Arc<dyn SkillExecutor>) {
216 self.skills.insert(
217 manifest.name.clone(),
218 RegisteredSkill {
219 manifest,
220 backend: RegisteredSkillBackend::Wasm(executor),
221 },
222 );
223 }
224
225 pub async fn register_wasm_skill_persistent(
226 &self,
227 manifest: SkillManifest,
228 executor: Arc<dyn SkillExecutor>,
229 wasm_bytes: Vec<u8>,
230 ) -> Result<(), String> {
231 if let Some(store) = &self.skill_store {
232 store.store_skill(manifest.clone(), wasm_bytes).await?;
233 }
234 self.register_wasm_skill(manifest, executor);
235 Ok(())
236 }
237
238 pub async fn advance(&self, request: AdvanceRequest) -> Result<AdvanceResult, EngineError> {
239 match request {
240 AdvanceRequest::Trigger(request) => self.advance_trigger(request).await,
241 AdvanceRequest::Continue(request) => self.advance_continue(request).await,
242 }
243 }
244
245 pub async fn skill_definitions(&self) -> Vec<SkillDefinition> {
246 let mut definitions = self
247 .skills
248 .iter()
249 .map(|entry| entry.value().definition())
250 .collect::<Vec<_>>();
251 definitions.sort_by(|left, right| left.manifest.name.cmp(&right.manifest.name));
252 definitions
253 }
254
255 async fn advance_trigger(&self, request: ProcessRequest) -> Result<AdvanceResult, EngineError> {
256 let started_at = SystemTime::now();
257 let trigger_id = Uuid::new_v4().to_string();
258
259 if let Some(idempotency_key) = request.idempotency_key.as_deref()
260 && let Ok(Some(mut prior_outcome)) = self
261 .memory
262 .find_outcome_by_idempotency_key(&request.session_id, idempotency_key)
263 .await
264 {
265 prior_outcome.idempotent_replay = true;
266 counter!("rain_engine.idempotent_replay_total").increment(1);
267 return Ok(AdvanceResult {
268 outcome: Some(prior_outcome),
269 emitted_events: Vec::new(),
270 state_delta: AgentStateDelta::default(),
271 wake_request: None,
272 });
273 }
274
275 let trigger_record = TriggerRecord {
276 trigger_id: trigger_id.clone(),
277 session_id: request.session_id.clone(),
278 idempotency_key: request.idempotency_key.clone(),
279 recorded_at: started_at,
280 trigger: request.trigger.clone(),
281 intent: None,
282 };
283 if let Err(err) = self.memory.append_trigger(trigger_record.clone()).await {
284 return Ok(AdvanceResult {
285 outcome: Some(storage_failure_outcome(trigger_id, 0, err.message)),
286 emitted_events: Vec::new(),
287 state_delta: AgentStateDelta::default(),
288 wake_request: None,
289 });
290 }
291
292 let mut snapshot = match self.state_cache.get_projection(&request.session_id).await {
293 Ok(Some(mut cached)) => {
294 cached.records.push(SessionRecord::Trigger(trigger_record));
295 cached
296 }
297 _ => match self.memory.load_session(&request.session_id).await {
298 Ok(snapshot) => snapshot,
299 Err(err) => {
300 return Ok(AdvanceResult {
301 outcome: Some(storage_failure_outcome(trigger_id, 0, err.message)),
302 emitted_events: Vec::new(),
303 state_delta: AgentStateDelta::default(),
304 wake_request: None,
305 });
306 }
307 },
308 };
309 counter!("rain_engine.triggers_total").increment(1);
310
311 let _ = self
314 .memory
315 .append_trigger_intent(
316 &request.session_id,
317 TriggerIntentRecord {
318 trigger_id: trigger_id.clone(),
319 classified_at: SystemTime::now(),
320 intent: classify_trigger_intent(&request.trigger),
321 },
322 )
323 .await;
324 if let Ok(refreshed) = self.memory.load_session(&request.session_id).await {
325 snapshot = refreshed;
326 }
327
328 if let Some(planner) = &self.planner {
330 let output = planner
331 .plan(&snapshot.agent_state(), &request.trigger)
332 .await;
333 let mut changed = false;
334 if !output.events.is_empty() {
335 for event in output.events {
336 let _ = self
337 .memory
338 .append_kernel_event(
339 &request.session_id,
340 KernelEventRecord {
341 event_id: Uuid::new_v4().to_string(),
342 occurred_at: SystemTime::now(),
343 event,
344 },
345 )
346 .await;
347 }
348 changed = true;
349 }
350 if let Some(plan) = output.proposed_plan {
351 let _ = self
352 .memory
353 .append_execution_plan(
354 &request.session_id,
355 ExecutionPlanRecord {
356 plan_id: format!("plan-{}", Uuid::new_v4()),
357 created_at: SystemTime::now(),
358 objective: plan.objective,
359 steps: plan.steps,
360 current_step_index: 0,
361 completed_at: None,
362 },
363 )
364 .await;
365 changed = true;
366 }
367 if changed {
368 if let Ok(refreshed) = self.memory.load_session(&request.session_id).await {
370 snapshot = refreshed;
371 }
372 }
373
374 if snapshot.records.len() > 10
376 && !snapshot
377 .records
378 .iter()
379 .any(|r| matches!(r, SessionRecord::Summary(_)))
380 && let Ok(summary) = self
381 .summarize_history(&snapshot, &request.policy, &request.trigger)
382 .await
383 {
384 let _ = self
385 .memory
386 .append_summary(&request.session_id, summary)
387 .await;
388 if let Ok(refreshed) = self.memory.load_session(&request.session_id).await {
390 snapshot = refreshed;
391 }
392 }
393 }
394 let effective_policy = request
395 .policy
396 .clone()
397 .with_overlay(snapshot.active_policy_overlay());
398 let deadline = Instant::now() + effective_policy.max_execution_time();
399 let mut context = AgentContext {
400 session_id: request.session_id.clone(),
401 records: snapshot.records.clone(),
402 prior_tool_results: snapshot.tool_results(),
403 granted_scopes: request.granted_scopes.clone(),
404 metadata: ExecutionMetadata {
405 trigger_id: trigger_id.clone(),
406 idempotency_key: request.idempotency_key.clone(),
407 started_at,
408 deadline,
409 policy: effective_policy,
410 provider: request.provider.clone(),
411 cancellation: request.cancellation.clone(),
412 },
413 };
414
415 counter!("rain_engine.triggers_total").increment(1);
416 info!(session_id = %context.session_id, trigger_id = %trigger_id, "processing trigger");
417
418 let emitted_events =
419 derive_trigger_kernel_events(&context.metadata.trigger_id, &request.trigger);
420 self.persist_kernel_events(&mut context, &emitted_events)
421 .await?;
422
423 let mut steps_executed = snapshot.current_step_count();
424 let mut consecutive_tool_failure_steps = snapshot.current_consecutive_tool_failure_steps();
425
426 if let AgentTrigger::Approval {
427 resume_token,
428 decision,
429 metadata,
430 } = &request.trigger
431 {
432 let mut pending = None;
433 let mut already_resolved = false;
434 let mut effective_decision = decision.clone();
435
436 if let Some(p) = self
437 .memory
438 .find_pending_approval_by_resume_token(&context.session_id, resume_token.as_str())
439 .await?
440 {
441 pending = Some(p);
442 } else {
443 let resolution = context.records.iter().find_map(|r| match r {
444 SessionRecord::ApprovalResolution(res)
445 if res.resume_token.as_str() == resume_token.as_str() =>
446 {
447 Some(res.clone())
448 }
449 _ => None,
450 });
451 if let Some(res) = resolution {
452 let original_pending = context.records.iter().find_map(|r| match r {
453 SessionRecord::PendingApproval(p)
454 if p.resume_token.as_str() == resume_token.as_str() =>
455 {
456 Some(p.clone())
457 }
458 _ => None,
459 });
460 if let Some(p) = original_pending {
461 pending = Some(p);
462 already_resolved = true;
463 effective_decision = res.decision.clone();
464 }
465 }
466 }
467
468 let Some(pending) = pending else {
469 let outcome = self
470 .finish(
471 &mut context,
472 StopReason::PolicyAborted,
473 None,
474 Some("resume token not found".to_string()),
475 steps_executed,
476 None,
477 )
478 .await?;
479 return Ok(build_advance_result(outcome, emitted_events));
480 };
481
482 if !already_resolved {
483 self.memory
484 .append_approval_resolution(
485 &context.session_id,
486 ApprovalResolutionRecord {
487 resume_token: pending.resume_token.clone(),
488 resolved_at: SystemTime::now(),
489 decision: effective_decision.clone(),
490 metadata: metadata.clone(),
491 },
492 )
493 .await?;
494 }
495
496 let resumed = match effective_decision {
497 ApprovalDecision::Approved => match self
498 .execute_planned_calls(
499 &context,
500 pending.step,
501 pending.pending_calls.clone(),
502 true,
503 )
504 .await?
505 {
506 BatchExecution::Executed(batch) => batch,
507 BatchExecution::Suspended { .. } => {
508 let outcome = self
509 .finish(
510 &mut context,
511 StopReason::PolicyAborted,
512 None,
513 Some("approval resume unexpectedly suspended".to_string()),
514 pending.step,
515 None,
516 )
517 .await?;
518 return Ok(build_advance_result(outcome, emitted_events));
519 }
520 },
521 ApprovalDecision::Rejected => ExecutedBatch {
522 results: pending
523 .pending_calls
524 .into_iter()
525 .map(|call| ToolResultRecord {
526 call_id: call.call_id,
527 finished_at: SystemTime::now(),
528 skill_name: call.name,
529 output: Err(SkillFailure {
530 kind: SkillFailureKind::PermissionDenied,
531 message: "human approval rejected".to_string(),
532 }),
533 })
534 .collect(),
535 all_failed: true,
536 },
537 };
538
539 for result in resumed.results {
540 self.memory
541 .append_tool_result(&context.session_id, result.clone())
542 .await?;
543 context.prior_tool_results.push(result.clone());
544 context.records.push(SessionRecord::ToolResult(result));
545 }
546 steps_executed = pending.step + 1;
547 if resumed.all_failed {
548 consecutive_tool_failure_steps += 1;
549 }
550 } else if let AgentTrigger::DelegationResult {
551 correlation_id: _,
552 payload: _,
553 metadata: _,
554 } = &request.trigger
555 {
556 }
572
573 self.perform_single_step(
574 context,
575 request.trigger,
576 steps_executed,
577 consecutive_tool_failure_steps,
578 emitted_events,
579 )
580 .await
581 }
582
/// Continues the session's currently active trigger by one step.
///
/// Loads the session, rebuilds the execution context for the active trigger and, when
/// the snapshot has an active tool-execution graph, re-runs that graph's calls before
/// handing over to `perform_single_step`.
///
/// # Errors
/// A session-load failure is reported as a storage-failure outcome inside `Ok(..)`, and a
/// session without an active trigger yields a `Yielded` outcome. Errors propagated with
/// `?` surface as [`EngineError`].
async fn advance_continue(
    &self,
    request: ContinueRequest,
) -> Result<AdvanceResult, EngineError> {
    let snapshot = match self.memory.load_session(&request.session_id).await {
        Ok(snapshot) => snapshot,
        Err(err) => {
            // NOTE(review): the session id is passed where other call sites in this file
            // pass a trigger id — confirm this is intended.
            return Ok(AdvanceResult {
                outcome: Some(storage_failure_outcome(
                    request.session_id.clone(),
                    0,
                    err.message,
                )),
                emitted_events: Vec::new(),
                state_delta: AgentStateDelta::default(),
                wake_request: None,
            });
        }
    };

    // Nothing to continue: report a yielded outcome under a freshly generated trigger id.
    let Some(active_trigger) = snapshot.active_trigger() else {
        return Ok(AdvanceResult {
            outcome: Some(EngineOutcome {
                trigger_id: Uuid::new_v4().to_string(),
                stop_reason: StopReason::Yielded,
                response: None,
                detail: Some("no active trigger to continue".to_string()),
                steps_executed: 0,
                idempotent_replay: false,
                resume_token: None,
            }),
            emitted_events: Vec::new(),
            state_delta: AgentStateDelta::default(),
            wake_request: None,
        });
    };

    let trigger_id = active_trigger.trigger_id.clone();
    let started_at = SystemTime::now();
    let effective_policy = request
        .policy
        .clone()
        .with_overlay(snapshot.active_policy_overlay());
    // The deadline is measured from this call, not from the original trigger.
    let deadline = Instant::now() + effective_policy.max_execution_time();
    let mut context = AgentContext {
        session_id: request.session_id.clone(),
        records: snapshot.records.clone(),
        prior_tool_results: snapshot.tool_results(),
        granted_scopes: request.granted_scopes.clone(),
        metadata: ExecutionMetadata {
            trigger_id,
            idempotency_key: active_trigger.idempotency_key.clone(),
            started_at,
            deadline,
            policy: effective_policy,
            provider: request.provider.clone(),
            cancellation: request.cancellation.clone(),
        },
    };

    if let Some(graph) = snapshot.active_tool_execution_graph() {
        // Rebuild the planned calls from the graph's nodes.
        let calls = graph
            .nodes
            .iter()
            .map(|node| PlannedSkillCall {
                call_id: node.call_id.clone(),
                name: node.skill_name.clone(),
                args: node.args.clone(),
                priority: node.priority,
                depends_on: node
                    .dependencies
                    .iter()
                    .map(|dependency| dependency.call_id.clone())
                    .collect(),
                retry_policy: node.retry_policy.clone(),
                dry_run: node.dry_run,
            })
            .collect::<Vec<_>>();
        match self
            .execute_planned_calls(&context, graph.step, calls, true)
            .await?
        {
            BatchExecution::Executed(batch) => {
                // Persist each result and mirror it into the in-memory context.
                for result in batch.results {
                    self.memory
                        .append_tool_result(&context.session_id, result.clone())
                        .await?;
                    context.prior_tool_results.push(result.clone());
                    context.records.push(SessionRecord::ToolResult(result));
                }
                return self
                    .perform_single_step(
                        context,
                        active_trigger.trigger,
                        snapshot.current_step_count(),
                        snapshot.current_consecutive_tool_failure_steps(),
                        Vec::new(),
                    )
                    .await;
            }
            BatchExecution::Suspended { .. } => {
                let outcome = self
                    .finish(
                        &mut context,
                        StopReason::PolicyAborted,
                        None,
                        Some("checkpointed graph unexpectedly suspended".to_string()),
                        graph.step,
                        None,
                    )
                    .await?;
                return Ok(build_advance_result(outcome, Vec::new()));
            }
        }
    }

    // No active graph: take the next step directly.
    self.perform_single_step(
        context,
        active_trigger.trigger,
        snapshot.current_step_count(),
        snapshot.current_consecutive_tool_failure_steps(),
        Vec::new(),
    )
    .await
}
710
/// Performs one engine step for the given context.
///
/// If an active execution plan still has steps left, the next planned action is executed
/// directly. Otherwise the policy limits are checked, the provider is asked for a
/// decision (bounded by the policy's provider timeout), the decision is persisted, and
/// the chosen action is executed.
///
/// # Errors
/// Provider failures and timeouts end the trigger with `StopReason::ProviderFailure`
/// inside `Ok(..)`; a failure to append the model decision yields a storage-failure
/// outcome inside `Ok(..)`. Errors propagated with `?` surface as [`EngineError`].
#[instrument(
    skip(self, context, trigger, emitted_events),
    fields(
        session_id = %context.session_id,
        trigger_id = %context.metadata.trigger_id,
        step = steps_executed
    )
)]
async fn perform_single_step(
    &self,
    mut context: AgentContext,
    trigger: AgentTrigger,
    steps_executed: usize,
    consecutive_tool_failure_steps: usize,
    emitted_events: Vec<KernelEventRecord>,
) -> Result<AdvanceResult, EngineError> {
    // Plan-driven path: take the next step of the active plan and advance its index.
    // This returns before `policy_outcome` is consulted.
    if let Some(mut plan) = context.active_execution_plan()
        && plan.current_step_index < plan.steps.len()
    {
        let action = plan.steps[plan.current_step_index].clone();
        plan.current_step_index += 1;
        if plan.current_step_index >= plan.steps.len() {
            plan.completed_at = Some(SystemTime::now());
        }
        // Best effort: a failure to persist the plan's progress is ignored.
        let _ = self
            .memory
            .append_execution_plan(&context.session_id, plan)
            .await;

        return self
            .execute_action(
                context,
                action,
                steps_executed,
                consecutive_tool_failure_steps,
                emitted_events,
            )
            .await;
    }

    // Stop here if any policy limit has been hit.
    if let Some(outcome) = self
        .policy_outcome(&mut context, steps_executed, consecutive_tool_failure_steps)
        .await?
    {
        return Ok(build_advance_result(outcome, emitted_events));
    }

    // Only offer skills whose required scopes have all been granted.
    let mut available_skills = self
        .skills
        .iter()
        .filter(|skill| {
            skill
                .value()
                .manifest
                .required_scopes
                .iter()
                .all(|scope| context.granted_scopes.contains(scope))
        })
        .map(|skill| skill.value().definition())
        .collect::<Vec<_>>();

    // Skill name -> reason, taken from strategy-preference records that name a skill.
    let preferences: HashMap<_, _> = context
        .records
        .iter()
        .filter_map(|record| {
            if let SessionRecord::StrategyPreference(pref) = record
                && let Some(skill_name) = &pref.skill_name
            {
                return Some((skill_name.clone(), pref.reason.clone()));
            }
            None
        })
        .collect();

    // Append a warning to the description of every skill that has such a preference.
    for def in &mut available_skills {
        if let Some(reason) = preferences.get(&def.manifest.name) {
            def.manifest.description = format!(
                "{} [CRITICAL WARNING: Avoid using this tool if possible. Reason: {}]",
                def.manifest.description, reason
            );
        }
    }

    let plan = context.active_execution_plan();
    let provider_request = ProviderRequest {
        trigger: trigger.clone(),
        context: context.to_snapshot(steps_executed),
        available_skills,
        config: context.metadata.provider.clone(),
        policy: context.metadata.policy.clone(),
        contents: build_provider_contents(&trigger, &context.records, plan.as_ref()),
    };
    let provider_started = Instant::now();
    // Outer `Err` means the timeout elapsed; inner `Err` means the provider failed.
    let decision = match tokio::time::timeout(
        context.metadata.policy.provider_timeout(),
        self.llm.generate_action(provider_request),
    )
    .await
    {
        Ok(Ok(decision)) => decision,
        Ok(Err(err)) => {
            warn!(session_id = %context.session_id, "provider failed: {}", err.message);
            let outcome = self
                .finish(
                    &mut context,
                    StopReason::ProviderFailure,
                    None,
                    Some(format!("provider failure: {}", err.message)),
                    steps_executed,
                    None,
                )
                .await?;
            return Ok(build_advance_result(outcome, emitted_events));
        }
        Err(_) => {
            warn!(session_id = %context.session_id, "provider timed out");
            let outcome = self
                .finish(
                    &mut context,
                    StopReason::ProviderFailure,
                    None,
                    Some("provider timeout exceeded".to_string()),
                    steps_executed,
                    None,
                )
                .await?;
            return Ok(build_advance_result(outcome, emitted_events));
        }
    };
    // Latency is recorded only for successful provider calls.
    histogram!("rain_engine.provider_latency_seconds")
        .record(provider_started.elapsed().as_secs_f64());
    counter!("rain_engine.model_decisions_total", "action" => action_metric_label(&decision.action)).increment(1);

    self.persist_provider_metadata(&mut context, &decision)
        .await?;

    let decision_record = ModelDecisionRecord {
        step: steps_executed,
        decided_at: SystemTime::now(),
        action: decision.action.clone(),
    };
    if let Err(err) = self
        .memory
        .append_model_decision(&context.session_id, decision_record.clone())
        .await
    {
        return Ok(AdvanceResult {
            outcome: Some(storage_failure_outcome(
                context.metadata.trigger_id.clone(),
                steps_executed,
                err.message,
            )),
            emitted_events,
            state_delta: AgentStateDelta::default(),
            wake_request: None,
        });
    }
    context
        .records
        .push(SessionRecord::ModelDecision(decision_record));

    self.execute_action(
        context,
        decision.action,
        steps_executed,
        consecutive_tool_failure_steps,
        emitted_events,
    )
    .await
}
883
884 async fn execute_action(
885 &self,
886 mut context: AgentContext,
887 action: AgentAction,
888 steps_executed: usize,
889 _consecutive_tool_failure_steps: usize,
890 mut emitted_events: Vec<KernelEventRecord>,
891 ) -> Result<AdvanceResult, EngineError> {
892 match action {
893 AgentAction::Plan {
894 summary,
895 candidate_actions,
896 confidence,
897 } => {
898 let record = DeliberationRecord {
899 deliberation_id: Uuid::new_v4().to_string(),
900 trigger_id: context.metadata.trigger_id.clone(),
901 step: steps_executed,
902 created_at: SystemTime::now(),
903 summary,
904 candidate_actions,
905 confidence,
906 outcome: if confidence >= 0.7 {
907 DeliberationOutcome::ReadyToAct
908 } else {
909 DeliberationOutcome::NeedsRefinement
910 },
911 };
912 self.memory
913 .append_deliberation(&context.session_id, record.clone())
914 .await?;
915 context.records.push(SessionRecord::Deliberation(record));
916 Ok(AdvanceResult {
917 outcome: None,
918 emitted_events: emitted_events.clone(),
919 state_delta: derive_state_delta(&emitted_events),
920 wake_request: emitted_events.iter().find_map(extract_wake_request),
921 })
922 }
923 AgentAction::Respond { content } => {
924 let outcome = self
925 .finish(
926 &mut context,
927 StopReason::Responded,
928 Some(content),
929 None,
930 steps_executed + 1,
931 None,
932 )
933 .await?;
934 Ok(build_advance_result(outcome, emitted_events))
935 }
936 AgentAction::Yield { reason } => {
937 let outcome = self
938 .finish(
939 &mut context,
940 StopReason::Yielded,
941 None,
942 reason,
943 steps_executed + 1,
944 None,
945 )
946 .await?;
947 Ok(build_advance_result(outcome, emitted_events))
948 }
949 AgentAction::MemorySearch { query, limit } => {
950 let mut results = self
951 .memory
952 .list_records(crate::RecordPageQuery::new(context.session_id.clone()))
953 .await
954 .map(|p| p.records)
955 .unwrap_or_default();
956
957 let query_lower = query.to_lowercase();
958 results.retain(|r| {
959 format!("{:?}", r.record)
960 .to_lowercase()
961 .contains(&query_lower)
962 });
963 results.truncate(limit.max(1));
964
965 let observation = serde_json::to_value(results).unwrap_or_default();
966
967 let event = KernelEventRecord {
968 event_id: Uuid::new_v4().to_string(),
969 occurred_at: SystemTime::now(),
970 event: KernelEvent::MemorySearched { query, limit },
971 };
972 self.memory
973 .append_kernel_event(&context.session_id, event.clone())
974 .await?;
975 context
976 .records
977 .push(SessionRecord::KernelEvent(event.clone()));
978 emitted_events.push(event.clone());
979
980 let _observation_event = KernelEventRecord {
981 event_id: Uuid::new_v4().to_string(),
982 occurred_at: SystemTime::now(),
983 event: KernelEvent::ObservationAppended(crate::ObservationRecord {
984 observation_id: crate::ObservationId(Uuid::new_v4().to_string()),
985 recorded_at: SystemTime::now(),
986 source: "MemorySearch".to_string(),
987 content: observation,
988 attachment_ids: vec![],
989 related_resources: vec![],
990 }),
991 };
992 self.memory
993 .append_kernel_event(&context.session_id, _observation_event.clone())
994 .await?;
995 context
996 .records
997 .push(SessionRecord::KernelEvent(_observation_event.clone()));
998 emitted_events.push(_observation_event.clone());
999
1000 Ok(AdvanceResult {
1001 outcome: None,
1002 emitted_events: emitted_events.clone(),
1003 state_delta: derive_state_delta(&emitted_events),
1004 wake_request: emitted_events.iter().find_map(extract_wake_request),
1005 })
1006 }
1007 AgentAction::MemoryArchive { content } => {
1008 let event = KernelEventRecord {
1009 event_id: Uuid::new_v4().to_string(),
1010 occurred_at: SystemTime::now(),
1011 event: KernelEvent::MemoryArchived {
1012 content: content.clone(),
1013 },
1014 };
1015 self.memory
1016 .append_kernel_event(&context.session_id, event.clone())
1017 .await?;
1018 context
1019 .records
1020 .push(SessionRecord::KernelEvent(event.clone()));
1021 emitted_events.push(event.clone());
1022
1023 let _observation_event = KernelEventRecord {
1024 event_id: Uuid::new_v4().to_string(),
1025 occurred_at: SystemTime::now(),
1026 event: KernelEvent::ObservationAppended(crate::ObservationRecord {
1027 observation_id: crate::ObservationId(Uuid::new_v4().to_string()),
1028 recorded_at: SystemTime::now(),
1029 source: "MemoryArchive".to_string(),
1030 content: serde_json::json!({ "status": "archived successfully" }),
1031 attachment_ids: vec![],
1032 related_resources: vec![],
1033 }),
1034 };
1035 self.memory
1036 .append_kernel_event(&context.session_id, _observation_event.clone())
1037 .await?;
1038 context
1039 .records
1040 .push(SessionRecord::KernelEvent(_observation_event.clone()));
1041 emitted_events.push(_observation_event.clone());
1042
1043 Ok(AdvanceResult {
1044 outcome: None,
1045 emitted_events: emitted_events.clone(),
1046 state_delta: derive_state_delta(&emitted_events),
1047 wake_request: emitted_events.iter().find_map(extract_wake_request),
1048 })
1049 }
1050 AgentAction::Continue { .. } => Ok(AdvanceResult {
1051 outcome: None,
1052 emitted_events: emitted_events.clone(),
1053 state_delta: derive_state_delta(&emitted_events),
1054 wake_request: emitted_events.iter().find_map(extract_wake_request),
1055 }),
1056 AgentAction::Suspend {
1057 reason,
1058 pending_calls,
1059 resume_token,
1060 } => {
1061 let outcome = self
1062 .suspend(
1063 &mut context,
1064 steps_executed,
1065 reason,
1066 pending_calls,
1067 resume_token,
1068 )
1069 .await?;
1070 Ok(build_advance_result(outcome, emitted_events))
1071 }
1072 AgentAction::CallSkills(calls) => match self
1073 .execute_planned_calls(&context, steps_executed, calls, false)
1074 .await?
1075 {
1076 BatchExecution::Executed(ExecutedBatch {
1077 results,
1078 all_failed,
1079 }) => {
1080 for result in results {
1081 if let Err(err) = self
1082 .memory
1083 .append_tool_result(&context.session_id, result.clone())
1084 .await
1085 {
1086 return Ok(AdvanceResult {
1087 outcome: Some(storage_failure_outcome(
1088 context.metadata.trigger_id.clone(),
1089 steps_executed + 1,
1090 err.message,
1091 )),
1092 emitted_events,
1093 state_delta: AgentStateDelta::default(),
1094 wake_request: None,
1095 });
1096 }
1097 context.prior_tool_results.push(result.clone());
1098 context.records.push(SessionRecord::ToolResult(result));
1099 }
1100 let _ = all_failed;
1101 Ok(AdvanceResult {
1102 outcome: None,
1103 emitted_events: emitted_events.clone(),
1104 state_delta: derive_state_delta(&emitted_events),
1105 wake_request: emitted_events.iter().find_map(extract_wake_request),
1106 })
1107 }
1108 BatchExecution::Suspended {
1109 reason,
1110 pending_calls,
1111 resume_token,
1112 } => {
1113 let outcome = self
1114 .suspend(
1115 &mut context,
1116 steps_executed,
1117 reason,
1118 pending_calls,
1119 resume_token,
1120 )
1121 .await?;
1122 Ok(build_advance_result(outcome, emitted_events))
1123 }
1124 },
1125 AgentAction::Delegate {
1126 target,
1127 task,
1128 correlation_id,
1129 resume_token,
1130 } => {
1131 let record = DelegationRecord {
1132 correlation_id,
1133 created_at: SystemTime::now(),
1134 trigger_id: context.metadata.trigger_id.clone(),
1135 target,
1136 task,
1137 resume_token: resume_token.clone(),
1138 };
1139 self.memory
1140 .append_delegation(&context.session_id, record.clone())
1141 .await?;
1142 context
1143 .records
1144 .push(SessionRecord::Delegation(record.clone()));
1145 let event = KernelEventRecord {
1146 event_id: format!("delegation-{}", record.correlation_id.as_str()),
1147 occurred_at: record.created_at,
1148 event: KernelEvent::DelegationRequested(record),
1149 };
1150 self.memory
1151 .append_kernel_event(&context.session_id, event.clone())
1152 .await?;
1153 context
1154 .records
1155 .push(SessionRecord::KernelEvent(event.clone()));
1156 emitted_events.push(event);
1157 let outcome = self
1158 .finish(
1159 &mut context,
1160 StopReason::Delegated,
1161 None,
1162 Some("delegated to downstream worker".to_string()),
1163 steps_executed + 1,
1164 Some(resume_token),
1165 )
1166 .await?;
1167 Ok(build_advance_result(outcome, emitted_events))
1168 }
1169 }
1170 }
1171
1172 async fn policy_outcome(
1173 &self,
1174 context: &mut AgentContext,
1175 steps_executed: usize,
1176 consecutive_tool_failure_steps: usize,
1177 ) -> Result<Option<EngineOutcome>, EngineError> {
1178 if context.metadata.cancellation.is_cancelled() {
1179 return self
1180 .finish(
1181 context,
1182 StopReason::Cancelled,
1183 None,
1184 Some("execution cancelled".to_string()),
1185 steps_executed,
1186 None,
1187 )
1188 .await
1189 .map(Some);
1190 }
1191
1192 if Instant::now() >= context.metadata.deadline {
1193 return self
1194 .finish(
1195 context,
1196 StopReason::DeadlineExceeded,
1197 None,
1198 Some("engine execution deadline exceeded".to_string()),
1199 steps_executed,
1200 None,
1201 )
1202 .await
1203 .map(Some);
1204 }
1205
1206 if steps_executed >= context.metadata.policy.max_steps {
1207 return self
1208 .finish(
1209 context,
1210 StopReason::MaxStepsReached,
1211 None,
1212 Some("max steps reached".to_string()),
1213 steps_executed,
1214 None,
1215 )
1216 .await
1217 .map(Some);
1218 }
1219
1220 if consecutive_tool_failure_steps >= context.metadata.policy.max_consecutive_tool_failures {
1221 return self
1222 .finish(
1223 context,
1224 StopReason::PolicyAborted,
1225 None,
1226 Some("max consecutive tool failure steps reached".to_string()),
1227 steps_executed,
1228 None,
1229 )
1230 .await
1231 .map(Some);
1232 }
1233
1234 let cost_so_far = context
1235 .records
1236 .iter()
1237 .filter_map(|record| match record {
1238 SessionRecord::ProviderUsage(usage) => Some(usage.estimated_cost_usd),
1239 _ => None,
1240 })
1241 .sum::<f64>();
1242 if cost_so_far >= context.metadata.policy.max_cost_per_session {
1243 return self
1244 .finish(
1245 context,
1246 StopReason::PolicyAborted,
1247 None,
1248 Some("session cost limit reached".to_string()),
1249 steps_executed,
1250 None,
1251 )
1252 .await
1253 .map(Some);
1254 }
1255
1256 Ok(None)
1257 }
1258
1259 async fn persist_kernel_events(
1260 &self,
1261 context: &mut AgentContext,
1262 events: &[KernelEventRecord],
1263 ) -> Result<(), EngineError> {
1264 for event in events {
1265 self.memory
1266 .append_kernel_event(&context.session_id, event.clone())
1267 .await?;
1268 context
1269 .records
1270 .push(SessionRecord::KernelEvent(event.clone()));
1271 }
1272 Ok(())
1273 }
1274
1275 async fn persist_provider_metadata(
1276 &self,
1277 context: &mut AgentContext,
1278 decision: &ProviderDecision,
1279 ) -> Result<(), EngineError> {
1280 if let Some(usage) = &decision.usage {
1281 self.memory
1282 .append_provider_usage(&context.session_id, usage.clone())
1283 .await?;
1284 context
1285 .records
1286 .push(SessionRecord::ProviderUsage(usage.clone()));
1287 }
1288 if let Some(cache) = &decision.cache {
1289 self.memory
1290 .append_provider_cache(&context.session_id, cache.clone())
1291 .await?;
1292 context
1293 .records
1294 .push(SessionRecord::ProviderCache(cache.clone()));
1295 }
1296 Ok(())
1297 }
1298
1299 async fn suspend(
1300 &self,
1301 context: &mut AgentContext,
1302 step: usize,
1303 reason: SuspendReason,
1304 pending_calls: Vec<PlannedSkillCall>,
1305 resume_token: ResumeToken,
1306 ) -> Result<EngineOutcome, EngineError> {
1307 let pending = PendingApprovalRecord {
1308 resume_token: resume_token.clone(),
1309 created_at: SystemTime::now(),
1310 trigger_id: context.metadata.trigger_id.clone(),
1311 step,
1312 reason: reason.clone(),
1313 pending_calls,
1314 };
1315 self.memory
1316 .append_pending_approval(&context.session_id, pending.clone())
1317 .await?;
1318 context
1319 .records
1320 .push(SessionRecord::PendingApproval(pending.clone()));
1321 self.finish(
1322 context,
1323 StopReason::Suspended,
1324 None,
1325 Some(match reason {
1326 SuspendReason::HumanApprovalRequired { .. } => {
1327 "human approval required".to_string()
1328 }
1329 SuspendReason::ProviderRequested { message } => message,
1330 }),
1331 step,
1332 Some(resume_token),
1333 )
1334 .await
1335 }
1336
1337 #[instrument(
1338 skip(self, context, calls),
1339 fields(
1340 session_id = %context.session_id,
1341 trigger_id = %context.metadata.trigger_id,
1342 step,
1343 call_count = calls.len(),
1344 max_parallel = context.metadata.policy.max_parallel_skill_calls
1345 )
1346 )]
1347 async fn execute_planned_calls(
1348 &self,
1349 context: &AgentContext,
1350 step: usize,
1351 calls: Vec<PlannedSkillCall>,
1352 approval_override: bool,
1353 ) -> Result<BatchExecution, EngineError> {
1354 if !approval_override {
1355 let approval_calls = calls
1356 .iter()
1357 .filter_map(|call| {
1358 let skill = self.skills.get(&call.name)?;
1359 skill
1360 .backend
1361 .requires_human_approval()
1362 .then_some(call.name.clone())
1363 })
1364 .collect::<Vec<_>>();
1365 if !approval_calls.is_empty() {
1366 counter!("rain_engine.approval_suspensions_total").increment(1);
1367 return Ok(BatchExecution::Suspended {
1368 reason: SuspendReason::HumanApprovalRequired {
1369 skill_names: approval_calls,
1370 },
1371 pending_calls: calls,
1372 resume_token: ResumeToken(Uuid::new_v4().to_string()),
1373 });
1374 }
1375 }
1376
1377 let graph = existing_or_new_graph(context, step, &calls);
1378 if !context.records.iter().any(|record| {
1379 matches!(
1380 record,
1381 SessionRecord::ToolExecutionGraph(existing) if existing.graph_id == graph.graph_id
1382 )
1383 }) {
1384 self.memory
1385 .append_tool_execution_graph(&context.session_id, graph.clone())
1386 .await?;
1387 for node in &graph.nodes {
1388 self.append_tool_checkpoint(context, &graph, node, ToolNodeStatus::Queued, 0, None)
1389 .await?;
1390 }
1391 }
1392
1393 let mut status_by_call = latest_tool_statuses(context, &graph.graph_id);
1394 let mut attempts_by_call = started_attempt_counts(context, &graph.graph_id);
1395 let mut results_by_call = context
1396 .records
1397 .iter()
1398 .filter_map(|record| match record {
1399 SessionRecord::ToolResult(result) => Some((result.call_id.clone(), result.clone())),
1400 _ => None,
1401 })
1402 .collect::<HashMap<_, _>>();
1403 let mut new_results = Vec::<ToolResultRecord>::new();
1404 let max_parallel = context
1405 .metadata
1406 .policy
1407 .max_parallel_skill_calls
1408 .max(1)
1409 .min(context.metadata.policy.max_ready_tool_nodes.max(1));
1410 gauge!("rain_engine.registered_skills").set(self.skills.len() as f64);
1411
1412 loop {
1413 let skipped = self
1414 .skip_blocked_nodes(
1415 context,
1416 &graph,
1417 &mut status_by_call,
1418 &mut results_by_call,
1419 &mut new_results,
1420 step,
1421 )
1422 .await?;
1423 let ready = ready_nodes(&graph, &status_by_call)
1424 .into_iter()
1425 .take(context.metadata.policy.max_ready_tool_nodes.max(1))
1426 .collect::<Vec<_>>();
1427 if ready.is_empty() {
1428 if !skipped {
1429 break;
1430 }
1431 continue;
1432 }
1433
1434 let mut join_set = JoinSet::new();
1435 for node in ready.into_iter().take(max_parallel) {
1436 let prepared = self
1437 .prepare_node(context, &graph, &node, step, &mut attempts_by_call)
1438 .await?;
1439 match prepared {
1440 PreparedNode::Executable(prepared) => {
1441 join_set.spawn(run_prepared_call(*prepared));
1442 }
1443 PreparedNode::Immediate(result) => {
1444 let status = final_status_for_result(&result);
1445 status_by_call.insert(node.call_id.clone(), status);
1446 results_by_call.insert(result.call_id.clone(), result.clone());
1447 new_results.push(result);
1448 }
1449 }
1450 }
1451
1452 while let Some(joined) = join_set.join_next().await {
1453 let result = joined.map_err(|err| EngineError::Blob(err.to_string()))??;
1454 let Some(node) = graph
1455 .nodes
1456 .iter()
1457 .find(|node| node.call_id == result.call_id)
1458 else {
1459 continue;
1460 };
1461 let status = final_status_for_result(&result);
1462 self.append_tool_checkpoint(
1463 context,
1464 &graph,
1465 node,
1466 status.clone(),
1467 *attempts_by_call.get(&node.call_id).unwrap_or(&1),
1468 result_detail(&result),
1469 )
1470 .await?;
1471 status_by_call.insert(node.call_id.clone(), status);
1472 results_by_call.insert(result.call_id.clone(), result.clone());
1473 new_results.push(result);
1474 }
1475 }
1476
1477 let ordered = graph
1478 .nodes
1479 .iter()
1480 .filter_map(|node| {
1481 new_results
1482 .iter()
1483 .find(|result| result.call_id == node.call_id)
1484 .cloned()
1485 })
1486 .collect::<Vec<_>>();
1487 let any_success = ordered.iter().any(|result| result.output.is_ok());
1488 Ok(BatchExecution::Executed(ExecutedBatch {
1489 results: ordered,
1490 all_failed: !any_success,
1491 }))
1492 }
1493
1494 async fn append_tool_checkpoint(
1495 &self,
1496 context: &AgentContext,
1497 graph: &ToolExecutionGraph,
1498 node: &ToolNode,
1499 status: ToolNodeStatus,
1500 attempt: usize,
1501 detail: Option<String>,
1502 ) -> Result<(), EngineError> {
1503 let record = ToolNodeCheckpointRecord {
1504 checkpoint_id: Uuid::new_v4().to_string(),
1505 graph_id: graph.graph_id.clone(),
1506 call_id: node.call_id.clone(),
1507 skill_name: node.skill_name.clone(),
1508 step: graph.step,
1509 status,
1510 attempt,
1511 occurred_at: SystemTime::now(),
1512 detail,
1513 };
1514 self.memory
1515 .append_tool_node_checkpoint(&context.session_id, record)
1516 .await?;
1517 Ok(())
1518 }
1519
1520 async fn skip_blocked_nodes(
1521 &self,
1522 context: &AgentContext,
1523 graph: &ToolExecutionGraph,
1524 status_by_call: &mut HashMap<String, ToolNodeStatus>,
1525 results_by_call: &mut HashMap<String, ToolResultRecord>,
1526 new_results: &mut Vec<ToolResultRecord>,
1527 step: usize,
1528 ) -> Result<bool, EngineError> {
1529 let mut changed = false;
1530 for node in &graph.nodes {
1531 if is_terminal_status(status_by_call.get(&node.call_id)) {
1532 continue;
1533 }
1534 let blocked_by = node.dependencies.iter().find(|dependency| {
1535 matches!(
1536 status_by_call.get(&dependency.call_id),
1537 Some(ToolNodeStatus::Failed)
1538 | Some(ToolNodeStatus::Skipped)
1539 | Some(ToolNodeStatus::TimedOut)
1540 )
1541 });
1542 if let Some(blocked_by) = blocked_by {
1543 let message = format!("dependency `{}` did not succeed", blocked_by.call_id);
1544 let result = self.error_result(
1545 node.call_id.clone(),
1546 node.skill_name.clone(),
1547 SkillFailureKind::Internal,
1548 message.clone(),
1549 );
1550 self.append_tool_checkpoint(
1551 context,
1552 graph,
1553 node,
1554 ToolNodeStatus::Skipped,
1555 0,
1556 Some(message),
1557 )
1558 .await?;
1559 status_by_call.insert(node.call_id.clone(), ToolNodeStatus::Skipped);
1560 results_by_call.insert(node.call_id.clone(), result.clone());
1561 new_results.push(result);
1562 let _ = step;
1563 changed = true;
1564 }
1565 }
1566 Ok(changed)
1567 }
1568
1569 async fn prepare_node(
1570 &self,
1571 context: &AgentContext,
1572 graph: &ToolExecutionGraph,
1573 node: &ToolNode,
1574 step: usize,
1575 attempts_by_call: &mut HashMap<String, usize>,
1576 ) -> Result<PreparedNode, EngineError> {
1577 let Some(skill) = self
1578 .skills
1579 .get(&node.skill_name)
1580 .map(|entry| entry.value().clone())
1581 else {
1582 self.append_validation(
1583 context,
1584 graph,
1585 node,
1586 false,
1587 vec![format!("skill `{}` is not registered", node.skill_name)],
1588 )
1589 .await?;
1590 self.append_tool_checkpoint(
1591 context,
1592 graph,
1593 node,
1594 ToolNodeStatus::Failed,
1595 0,
1596 Some(format!("skill `{}` is not registered", node.skill_name)),
1597 )
1598 .await?;
1599 return Ok(PreparedNode::Immediate(self.error_result(
1600 node.call_id.clone(),
1601 node.skill_name.clone(),
1602 SkillFailureKind::Internal,
1603 format!("skill `{}` is not registered", node.skill_name),
1604 )));
1605 };
1606
1607 if context.metadata.policy.validate_tool_args {
1608 let errors = validate_against_schema(&node.args, &skill.manifest.input_schema);
1609 self.append_validation(context, graph, node, errors.is_empty(), errors.clone())
1610 .await?;
1611 if !errors.is_empty() {
1612 let message = errors.join("; ");
1613 self.append_tool_checkpoint(
1614 context,
1615 graph,
1616 node,
1617 ToolNodeStatus::Failed,
1618 0,
1619 Some(message.clone()),
1620 )
1621 .await?;
1622 return Ok(PreparedNode::Immediate(self.error_result(
1623 node.call_id.clone(),
1624 node.skill_name.clone(),
1625 SkillFailureKind::InvalidArguments,
1626 message,
1627 )));
1628 }
1629 }
1630 self.append_tool_checkpoint(context, graph, node, ToolNodeStatus::Validated, 0, None)
1631 .await?;
1632
1633 if !skill
1634 .manifest
1635 .required_scopes
1636 .iter()
1637 .all(|scope| context.granted_scopes.contains(scope))
1638 {
1639 counter!("rain_engine.permission_denials_total").increment(1);
1640 self.append_tool_checkpoint(
1641 context,
1642 graph,
1643 node,
1644 ToolNodeStatus::Failed,
1645 0,
1646 Some(format!(
1647 "missing required scopes for skill `{}`",
1648 node.skill_name
1649 )),
1650 )
1651 .await?;
1652 return Ok(PreparedNode::Immediate(self.error_result(
1653 node.call_id.clone(),
1654 node.skill_name.clone(),
1655 SkillFailureKind::PermissionDenied,
1656 format!("missing required scopes for skill `{}`", node.skill_name),
1657 )));
1658 }
1659
1660 if matches!(skill.backend, RegisteredSkillBackend::Native(_))
1661 && !context.metadata.policy.allow_native_skills
1662 {
1663 self.append_tool_checkpoint(
1664 context,
1665 graph,
1666 node,
1667 ToolNodeStatus::Failed,
1668 0,
1669 Some("native skills are disabled by policy".to_string()),
1670 )
1671 .await?;
1672 return Ok(PreparedNode::Immediate(self.error_result(
1673 node.call_id.clone(),
1674 node.skill_name.clone(),
1675 SkillFailureKind::PermissionDenied,
1676 "native skills are disabled by policy".to_string(),
1677 )));
1678 }
1679
1680 let mut manifest = skill.manifest.clone();
1681 manifest.resource_policy = manifest.effective_resource_policy(&context.metadata.policy);
1682 if node.dry_run
1683 && (!context.metadata.policy.enable_tool_dry_run
1684 || !manifest.resource_policy.dry_run_supported)
1685 {
1686 self.append_tool_checkpoint(
1687 context,
1688 graph,
1689 node,
1690 ToolNodeStatus::Failed,
1691 0,
1692 Some("dry-run execution is not enabled for this skill".to_string()),
1693 )
1694 .await?;
1695 return Ok(PreparedNode::Immediate(self.error_result(
1696 node.call_id.clone(),
1697 node.skill_name.clone(),
1698 SkillFailureKind::CapabilityDenied,
1699 "dry-run execution is not enabled for this skill".to_string(),
1700 )));
1701 }
1702
1703 if self.is_skill_circuit_broken(&node.skill_name, context) {
1704 counter!("rain_engine.circuit_breaker_trips_total", "skill" => node.skill_name.clone())
1705 .increment(1);
1706 self.append_tool_checkpoint(
1707 context,
1708 graph,
1709 node,
1710 ToolNodeStatus::Failed,
1711 0,
1712 Some(format!(
1713 "circuit breaker tripped for skill `{}`",
1714 node.skill_name
1715 )),
1716 )
1717 .await?;
1718 return Ok(PreparedNode::Immediate(self.error_result(
1719 node.call_id.clone(),
1720 node.skill_name.clone(),
1721 SkillFailureKind::CapabilityDenied,
1722 format!("circuit breaker tripped for skill `{}`", node.skill_name),
1723 )));
1724 }
1725
1726 let attempt = attempts_by_call.entry(node.call_id.clone()).or_insert(0);
1727 *attempt += 1;
1728 self.append_tool_checkpoint(
1729 context,
1730 graph,
1731 node,
1732 ToolNodeStatus::Started,
1733 *attempt,
1734 None,
1735 )
1736 .await?;
1737
1738 let call_record = ToolCallRecord {
1739 call_id: node.call_id.clone(),
1740 step,
1741 called_at: SystemTime::now(),
1742 skill_name: skill.manifest.name.clone(),
1743 args: node.args.clone(),
1744 backend_kind: skill.backend.kind(),
1745 };
1746 self.memory
1747 .append_tool_call(&context.session_id, call_record)
1748 .await?;
1749 counter!(
1750 "rain_engine.tool_calls_total",
1751 "skill" => skill.manifest.name.clone(),
1752 "backend" => format!("{:?}", skill.backend.kind())
1753 )
1754 .increment(1);
1755
1756 let mut retry_policy = node.retry_policy.policy.clone();
1757 retry_policy.max_attempts = retry_policy
1758 .max_attempts
1759 .min(manifest.resource_policy.retry_policy.max_attempts)
1760 .min(context.metadata.policy.max_tool_retries_per_step);
1761
1762 Ok(PreparedNode::Executable(Box::new(PreparedCall {
1763 call_id: node.call_id.clone(),
1764 name: node.skill_name.clone(),
1765 args: node.args.clone(),
1766 manifest,
1767 backend: skill.backend.clone(),
1768 context_snapshot: context.to_snapshot(step),
1769 dry_run: node.dry_run,
1770 retry_policy,
1771 })))
1772 }
1773
1774 async fn append_validation(
1775 &self,
1776 context: &AgentContext,
1777 graph: &ToolExecutionGraph,
1778 node: &ToolNode,
1779 valid: bool,
1780 errors: Vec<String>,
1781 ) -> Result<(), EngineError> {
1782 let record = SkillInputValidationRecord {
1783 validation_id: Uuid::new_v4().to_string(),
1784 graph_id: graph.graph_id.clone(),
1785 call_id: node.call_id.clone(),
1786 skill_name: node.skill_name.clone(),
1787 validated_at: SystemTime::now(),
1788 valid,
1789 errors,
1790 };
1791 self.memory
1792 .append_skill_input_validation(&context.session_id, record)
1793 .await?;
1794 Ok(())
1795 }
1796
1797 async fn finish(
1798 &self,
1799 context: &mut AgentContext,
1800 stop_reason: StopReason,
1801 response: Option<String>,
1802 detail: Option<String>,
1803 steps_executed: usize,
1804 resume_token: Option<ResumeToken>,
1805 ) -> Result<EngineOutcome, EngineError> {
1806 let outcome = OutcomeRecord {
1807 trigger_id: context.metadata.trigger_id.clone(),
1808 idempotency_key: context.metadata.idempotency_key.clone(),
1809 finished_at: SystemTime::now(),
1810 stop_reason: stop_reason.clone(),
1811 response: response.clone(),
1812 detail: detail.clone(),
1813 steps_executed,
1814 resume_token: resume_token.clone(),
1815 };
1816 if let Err(err) = self
1817 .memory
1818 .append_outcome(&context.session_id, outcome.clone())
1819 .await
1820 {
1821 warn!(session_id = %context.session_id, "failed to record outcome: {}", err.message);
1822 }
1823
1824 context
1825 .records
1826 .push(SessionRecord::Outcome(outcome.clone()));
1827
1828 let outcome_clone = outcome.clone();
1829 self.run_self_improvement(context, outcome_clone).await?;
1830
1831 let _ = self
1833 .state_cache
1834 .set_projection(
1835 &context.session_id,
1836 SessionSnapshot {
1837 session_id: context.session_id.clone(),
1838 records: context.records.clone(),
1839 last_sequence_no: None,
1840 latest_outcome: Some(outcome.clone()),
1841 },
1842 )
1843 .await;
1844
1845 Ok(EngineOutcome {
1846 trigger_id: outcome.trigger_id,
1847 stop_reason,
1848 response,
1849 detail,
1850 steps_executed,
1851 idempotent_replay: false,
1852 resume_token,
1853 })
1854 }
1855
1856 #[instrument(
1857 skip(self, context, outcome),
1858 fields(
1859 session_id = %context.session_id,
1860 trigger_id = %context.metadata.trigger_id,
1861 stop_reason = ?outcome.stop_reason
1862 )
1863 )]
1864 async fn run_self_improvement(
1865 &self,
1866 context: &mut AgentContext,
1867 outcome: OutcomeRecord,
1868 ) -> Result<(), EngineError> {
1869 let snapshot = context.to_snapshot(outcome.steps_executed);
1870 let session_id = context.session_id.clone();
1871 let policy = snapshot.policy.self_improvement.clone();
1872 if !policy.enabled {
1873 return Ok(());
1874 }
1875
1876 counter!("rain_engine.self_improvement_reflections_total").increment(1);
1877
1878 let observations = reflection_observations(&snapshot, &outcome);
1879 let reflection = ReflectionRecord {
1880 reflection_id: format!("reflection-{}", Uuid::new_v4()),
1881 created_at: SystemTime::now(),
1882 trigger_id: snapshot.trigger_id.clone(),
1883 summary: format!(
1884 "Observed {:?} after {} step(s); evaluating future policy and strategy.",
1885 outcome.stop_reason, outcome.steps_executed
1886 ),
1887 observations,
1888 confidence: 0.72,
1889 };
1890 self.memory
1891 .append_reflection(&session_id, reflection.clone())
1892 .await?;
1893 context.records.push(SessionRecord::Reflection(reflection));
1894
1895 for performance in summarize_tool_performance(&snapshot.history) {
1896 self.memory
1897 .append_tool_performance(&session_id, performance.clone())
1898 .await?;
1899 context
1900 .records
1901 .push(SessionRecord::ToolPerformance(performance.clone()));
1902
1903 if performance.calls > 0 {
1904 counter!(
1905 "rain_engine.tool_performance_summaries_total",
1906 "skill" => performance.skill_name.clone()
1907 )
1908 .increment(1);
1909 }
1910 if performance.failure_rate > 0.5 {
1911 let preference = StrategyPreferenceRecord {
1912 preference_id: format!("strategy-{}", Uuid::new_v4()),
1913 created_at: SystemTime::now(),
1914 skill_name: Some(performance.skill_name.clone()),
1915 preference: "avoid_when_alternatives_exist".to_string(),
1916 reason: format!(
1917 "{} failed in {:.0}% of recent calls",
1918 performance.skill_name,
1919 performance.failure_rate * 100.0
1920 ),
1921 confidence: 0.68,
1922 };
1923 self.memory
1924 .append_strategy_preference(&session_id, preference.clone())
1925 .await?;
1926 context
1927 .records
1928 .push(SessionRecord::StrategyPreference(preference));
1929 }
1930 }
1931
1932 if terminal_observation_count(&snapshot.history) < policy.min_observations_before_tuning {
1933 return Ok(());
1934 }
1935
1936 if let Some(rollback) = maybe_rollback_regression(&snapshot, &outcome) {
1937 self.memory
1938 .append_policy_tuning(&session_id, rollback.clone())
1939 .await?;
1940 context.records.push(SessionRecord::PolicyTuning(rollback));
1941 counter!("rain_engine.self_improvement_rollbacks_total").increment(1);
1942 return Ok(());
1943 }
1944
1945 let Some(tuning) = propose_policy_tuning(&snapshot, &outcome) else {
1946 return Ok(());
1947 };
1948 match tuning.action {
1949 PolicyTuningAction::Applied => {
1950 counter!("rain_engine.self_improvement_overlays_applied_total").increment(1)
1951 }
1952 PolicyTuningAction::RejectedUnsafe => {
1953 counter!("rain_engine.self_improvement_rejected_unsafe_total").increment(1)
1954 }
1955 PolicyTuningAction::Proposed | PolicyTuningAction::RolledBack => {}
1956 }
1957 self.memory
1958 .append_policy_tuning(&session_id, tuning.clone())
1959 .await?;
1960 context.records.push(SessionRecord::PolicyTuning(tuning));
1961
1962 let profile_patch = ProfilePatchRecord {
1963 patch_id: format!("profile-patch-{}", Uuid::new_v4()),
1964 created_at: SystemTime::now(),
1965 description: "No capability or scope expansion was applied automatically.".to_string(),
1966 patch: serde_json::json!({"guardrail": "privilege_expansion_requires_approval"}),
1967 requires_approval: false,
1968 applied: true,
1969 };
1970 self.memory
1971 .append_profile_patch(&session_id, profile_patch.clone())
1972 .await?;
1973 context
1974 .records
1975 .push(SessionRecord::ProfilePatch(profile_patch));
1976
1977 Ok(())
1978 }
1979
1980 fn is_skill_circuit_broken(&self, skill_name: &str, context: &AgentContext) -> bool {
1981 let performance = summarize_tool_performance(&context.records);
1982 if let Some(perf) = performance.into_iter().find(|p| p.skill_name == skill_name)
1983 && perf.calls >= 3
1984 {
1985 let threshold = self
1986 .skills
1987 .get(skill_name)
1988 .map(|s| s.value().definition().manifest.circuit_breaker_threshold)
1989 .unwrap_or(0.5);
1990 return perf.failure_rate >= threshold;
1991 }
1992 false
1993 }
1994
1995 fn error_result(
1996 &self,
1997 call_id: String,
1998 skill_name: String,
1999 kind: SkillFailureKind,
2000 message: String,
2001 ) -> ToolResultRecord {
2002 ToolResultRecord {
2003 call_id,
2004 finished_at: SystemTime::now(),
2005 skill_name,
2006 output: Err(SkillFailure { kind, message }),
2007 }
2008 }
2009
2010 async fn summarize_history(
2011 &self,
2012 snapshot: &SessionSnapshot,
2013 policy: &EnginePolicy,
2014 trigger: &AgentTrigger,
2015 ) -> Result<SummaryRecord, EngineError> {
2016 let history_text = snapshot
2017 .records
2018 .iter()
2019 .map(|r| format!("{:?}", r))
2020 .collect::<Vec<_>>()
2021 .join("\n");
2022 let prompt = format!(
2023 "Summarize the following conversation history concisely while preserving all key decisions, facts, and outcomes:\n\n{}",
2024 history_text
2025 );
2026
2027 let context = AgentContextSnapshot {
2028 session_id: snapshot.session_id.clone(),
2029 granted_scopes: Vec::new(),
2030 trigger_id: "internal".to_string(),
2031 idempotency_key: None,
2032 current_step: 0,
2033 max_steps: 0,
2034 history: snapshot.records.clone(),
2035 prior_tool_results: snapshot.tool_results(),
2036 session_cost_usd: 0.0,
2037 state: snapshot.agent_state(),
2038 policy: policy.clone(),
2039 active_execution_plan: snapshot.active_execution_plan(),
2040 };
2041
2042 let request = ProviderRequest {
2043 trigger: trigger.clone(),
2044 context,
2045 available_skills: self.skill_definitions().await,
2046 config: ProviderRequestConfig {
2047 model: None,
2048 temperature: Some(0.0),
2049 max_tokens: Some(500),
2050 },
2051 policy: policy.clone(),
2052 contents: vec![ProviderMessage {
2053 role: ProviderRole::User,
2054 parts: vec![ProviderContentPart::Text(prompt)],
2055 }],
2056 };
2057
2058 let decision = self
2059 .llm
2060 .generate_action(request)
2061 .await
2062 .map_err(|e| EngineError::Provider(e.to_string()))?;
2063
2064 if let AgentAction::Respond { content } = decision.action {
2065 Ok(SummaryRecord {
2066 summary_id: format!("summary-{}", Uuid::new_v4()),
2067 created_at: SystemTime::now(),
2068 content,
2069 original_sequence_range: (0, snapshot.records.len()),
2070 })
2071 } else {
2072 Err(EngineError::Provider(
2073 "Failed to generate summary".to_string(),
2074 ))
2075 }
2076 }
2077}
2078
2079fn classify_trigger_intent(trigger: &AgentTrigger) -> String {
2080 match trigger {
2081 AgentTrigger::ExternalEvent { source, .. } => format!("external_event:{source}"),
2082 AgentTrigger::ScheduledWake { .. } => "scheduled_wake".to_string(),
2083 AgentTrigger::HumanInput { content, .. } | AgentTrigger::Message { content, .. } => {
2084 let lowered = content.to_lowercase();
2085 if lowered.contains("approve") || lowered.contains("permission") {
2086 "approval_or_permission".to_string()
2087 } else if lowered.contains("fix")
2088 || lowered.contains("change")
2089 || lowered.contains("write")
2090 {
2091 "task_execution".to_string()
2092 } else if lowered.contains("what") || lowered.contains("why") || lowered.contains("how")
2093 {
2094 "question_answering".to_string()
2095 } else {
2096 "conversation".to_string()
2097 }
2098 }
2099 AgentTrigger::SystemObservation { source, .. } => format!("system_observation:{source}"),
2100 AgentTrigger::Webhook { source, .. } => format!("webhook:{source}"),
2101 AgentTrigger::RuleTrigger { rule_id, .. } => format!("rule:{rule_id}"),
2102 AgentTrigger::ProactiveHeartbeat { .. } => "heartbeat".to_string(),
2103 AgentTrigger::Approval { decision, .. } => format!("approval:{decision:?}"),
2104 AgentTrigger::DelegationResult { .. } => "delegation_result".to_string(),
2105 }
2106}
2107
2108fn action_metric_label(action: &AgentAction) -> &'static str {
2109 match action {
2110 AgentAction::Plan { .. } => "plan",
2111 AgentAction::Respond { .. } => "respond",
2112 AgentAction::CallSkills(_) => "call_skills",
2113 AgentAction::Continue { .. } => "continue",
2114 AgentAction::Yield { .. } => "yield",
2115 AgentAction::MemorySearch { .. } => "memory_search",
2116 AgentAction::MemoryArchive { .. } => "memory_archive",
2117 AgentAction::Suspend { .. } => "suspend",
2118 AgentAction::Delegate { .. } => "delegate",
2119 }
2120}
2121
2122fn reflection_observations(
2123 snapshot: &AgentContextSnapshot,
2124 outcome: &OutcomeRecord,
2125) -> Vec<String> {
2126 let tool_results = snapshot
2127 .history
2128 .iter()
2129 .filter(|record| matches!(record, SessionRecord::ToolResult(_)))
2130 .count();
2131 let failed_tools = snapshot
2132 .history
2133 .iter()
2134 .filter(|record| match record {
2135 SessionRecord::ToolResult(result) => result.output.is_err(),
2136 _ => false,
2137 })
2138 .count();
2139 let provider_cost = snapshot.session_cost_usd;
2140
2141 vec![
2142 format!("terminal_stop_reason={:?}", outcome.stop_reason),
2143 format!("steps_executed={}", outcome.steps_executed),
2144 format!("tool_results={tool_results}"),
2145 format!("failed_tool_results={failed_tools}"),
2146 format!("estimated_session_cost_usd={provider_cost:.6}"),
2147 ]
2148}
2149
2150fn summarize_tool_performance(records: &[SessionRecord]) -> Vec<ToolPerformanceRecord> {
2151 let calls = records
2152 .iter()
2153 .filter_map(|record| match record {
2154 SessionRecord::ToolCall(call) => Some((call.call_id.clone(), call)),
2155 _ => None,
2156 })
2157 .collect::<HashMap<_, _>>();
2158 let mut grouped = HashMap::<String, (String, usize, usize, usize)>::new();
2159
2160 for record in records {
2161 let SessionRecord::ToolResult(result) = record else {
2162 continue;
2163 };
2164 let backend = calls
2165 .get(&result.call_id)
2166 .map(|call| format!("{:?}", call.backend_kind))
2167 .unwrap_or_else(|| "unknown".to_string());
2168 let entry = grouped
2169 .entry(result.skill_name.clone())
2170 .or_insert((backend, 0, 0, 0));
2171 entry.1 += 1;
2172 if result.output.is_ok() {
2173 entry.2 += 1;
2174 } else {
2175 entry.3 += 1;
2176 }
2177 }
2178
2179 grouped
2180 .into_iter()
2181 .map(
2182 |(skill_name, (backend_kind, calls, successes, failures))| ToolPerformanceRecord {
2183 performance_id: format!("tool-performance-{}", Uuid::new_v4()),
2184 created_at: SystemTime::now(),
2185 skill_name,
2186 backend_kind,
2187 calls,
2188 successes,
2189 failures,
2190 failure_rate: if calls == 0 {
2191 0.0
2192 } else {
2193 failures as f64 / calls as f64
2194 },
2195 },
2196 )
2197 .collect()
2198}
2199
2200fn terminal_observation_count(records: &[SessionRecord]) -> usize {
2201 records
2202 .iter()
2203 .filter(|record| matches!(record, SessionRecord::Outcome(_)))
2204 .count()
2205}
2206
2207fn maybe_rollback_regression(
2208 snapshot: &AgentContextSnapshot,
2209 outcome: &OutcomeRecord,
2210) -> Option<PolicyTuningRecord> {
2211 if !snapshot.policy.self_improvement.rollback_on_regression {
2212 return None;
2213 }
2214 if !matches!(
2217 outcome.stop_reason,
2218 StopReason::ProviderFailure | StopReason::DeadlineExceeded | StopReason::PolicyAborted
2219 ) {
2220 return None;
2221 }
2222 let active = SessionSnapshot {
2223 session_id: snapshot.session_id.clone(),
2224 records: snapshot.history.clone(),
2225 last_sequence_no: None,
2226 latest_outcome: Some(outcome.clone()),
2227 }
2228 .active_policy_overlay()?;
2229
2230 let mut projected_policy = snapshot.policy.clone();
2231 projected_policy.self_improvement = snapshot.policy.self_improvement.clone();
2232 Some(PolicyTuningRecord {
2233 tuning_id: format!("tuning-{}", Uuid::new_v4()),
2234 created_at: SystemTime::now(),
2235 overlay: PolicyOverlay {
2236 status: PolicyOverlayStatus::RolledBack,
2237 reason: format!(
2238 "Regression detected after overlay {}; rolling back for future advances.",
2239 active.overlay_id
2240 ),
2241 ..active
2242 },
2243 action: PolicyTuningAction::RolledBack,
2244 prior_policy: snapshot.policy.clone(),
2245 projected_policy,
2246 })
2247}
2248
2249fn propose_policy_tuning(
2250 snapshot: &AgentContextSnapshot,
2251 outcome: &OutcomeRecord,
2252) -> Option<PolicyTuningRecord> {
2253 let improvement = &snapshot.policy.self_improvement;
2254 let mut patch = PolicyOverlayPatch::default();
2255 let mut reason = None::<String>;
2256 let delta = improvement.max_policy_delta_percent.clamp(1.0, 100.0);
2257
2258 match outcome.stop_reason {
2259 StopReason::ProviderFailure
2260 if outcome
2261 .detail
2262 .as_deref()
2263 .map(|detail| detail.to_ascii_lowercase().contains("timeout"))
2264 .unwrap_or(false) =>
2265 {
2266 patch.provider_timeout_ms = Some(increase_by_percent(
2267 snapshot.policy.provider_timeout_ms,
2268 delta,
2269 ));
2270 reason = Some(
2271 "Provider timed out; increasing provider timeout within guardrails.".to_string(),
2272 );
2273 }
2274 StopReason::MaxStepsReached => {
2275 let multiplier = terminal_observation_count(&snapshot.history).max(1) as f64;
2278 let aggressive_delta = (delta * multiplier).min(200.0); patch.max_steps = Some(increase_usize_by_percent(
2280 snapshot.policy.max_steps,
2281 aggressive_delta,
2282 ));
2283 reason = Some(
2284 "Session hit max steps; increasing future step budget within guardrails."
2285 .to_string(),
2286 );
2287 }
2288 StopReason::DeadlineExceeded => {
2289 patch.max_execution_time_ms = Some(increase_by_percent(
2290 snapshot.policy.max_execution_time_ms,
2291 delta,
2292 ));
2293 reason = Some("Execution deadline was reached; increasing future wall-clock budget within guardrails.".to_string());
2294 }
2295 StopReason::PolicyAborted
2296 if outcome
2297 .detail
2298 .as_deref()
2299 .map(|detail| detail.contains("cost"))
2300 .unwrap_or(false) =>
2301 {
2302 reason = Some(
2303 "Cost limit was reached; automatic cost-limit increases are blocked.".to_string(),
2304 );
2305 }
2306 _ => {}
2307 }
2308
2309 let reason = reason?;
2310 let mut overlay = PolicyOverlay {
2311 overlay_id: format!("overlay-{}", Uuid::new_v4()),
2312 created_at: SystemTime::now(),
2313 status: match improvement.mode {
2314 SelfImprovementMode::Advisory => PolicyOverlayStatus::Proposed,
2315 SelfImprovementMode::AutoWithGuardrails => PolicyOverlayStatus::Applied,
2316 SelfImprovementMode::Shadow => PolicyOverlayStatus::Proposed, },
2318 reason,
2319 evidence_window_records: snapshot.history.len(),
2320 patch,
2321 confidence: 0.74,
2322 rollback_condition:
2323 "Rollback if the next terminal outcome regresses to a policy/provider failure."
2324 .to_string(),
2325 };
2326
2327 let action = if outcome
2328 .detail
2329 .as_deref()
2330 .map(|detail| detail.contains("cost"))
2331 .unwrap_or(false)
2332 {
2333 overlay.status = PolicyOverlayStatus::Rejected;
2334 PolicyTuningAction::RejectedUnsafe
2335 } else {
2336 match improvement.mode {
2337 SelfImprovementMode::Advisory => PolicyTuningAction::Proposed,
2338 SelfImprovementMode::Shadow => {
2339 overlay.status = PolicyOverlayStatus::Proposed; PolicyTuningAction::Proposed
2341 }
2342 SelfImprovementMode::AutoWithGuardrails => PolicyTuningAction::Applied,
2343 }
2344 };
2345
2346 let mut projected_policy = snapshot.policy.clone();
2347 overlay.apply_to(&mut projected_policy);
2348
2349 Some(PolicyTuningRecord {
2350 tuning_id: format!("tuning-{}", Uuid::new_v4()),
2351 created_at: SystemTime::now(),
2352 overlay,
2353 action,
2354 prior_policy: snapshot.policy.clone(),
2355 projected_policy,
2356 })
2357}
2358
/// Returns `value` (treated as at least 1) increased by `percent` percent, rounding the
/// increment up to the next whole unit.
///
/// The increment is computed on its own and then added to the integer base. This avoids the
/// rounding error of `value * (1.0 + percent / 100.0)`, which turned 100 + 10% into 111.
///
/// The result is never smaller than the base: a negative or NaN `percent` adds nothing, and an
/// oversized increment saturates at `u64::MAX` instead of wrapping.
fn increase_by_percent(value: u64, percent: f64) -> u64 {
    let base = value.max(1);
    let increment = ((base as f64) * percent / 100.0).ceil();
    // Float-to-integer `as` casts saturate and map NaN to 0, so the cast is total.
    base.saturating_add(increment as u64)
}
2362
/// Returns `value` (treated as at least 1) increased by `percent` percent, rounding the
/// increment up to the next whole unit.
///
/// The increment is computed on its own and then added to the integer base. This avoids the
/// rounding error of `value * (1.0 + percent / 100.0)`, which turned 10 + 10% into 12.
///
/// The result is never smaller than the base: a negative or NaN `percent` adds nothing, and an
/// oversized increment saturates at `usize::MAX` instead of wrapping.
fn increase_usize_by_percent(value: usize, percent: f64) -> usize {
    let base = value.max(1);
    let increment = ((base as f64) * percent / 100.0).ceil();
    // Float-to-integer `as` casts saturate and map NaN to 0, so the cast is total.
    base.saturating_add(increment as usize)
}
2366
2367fn existing_or_new_graph(
2368 context: &AgentContext,
2369 step: usize,
2370 calls: &[PlannedSkillCall],
2371) -> ToolExecutionGraph {
2372 let call_ids = calls
2373 .iter()
2374 .map(|call| call.call_id.as_str())
2375 .collect::<BTreeSet<_>>();
2376 if let Some(graph) = context
2377 .records
2378 .iter()
2379 .rev()
2380 .find_map(|record| match record {
2381 SessionRecord::ToolExecutionGraph(graph)
2382 if graph.step == step
2383 && graph
2384 .nodes
2385 .iter()
2386 .map(|node| node.call_id.as_str())
2387 .collect::<BTreeSet<_>>()
2388 == call_ids =>
2389 {
2390 Some(graph.clone())
2391 }
2392 _ => None,
2393 })
2394 {
2395 return graph;
2396 }
2397
2398 ToolExecutionGraph {
2399 graph_id: format!("{}:{step}", context.metadata.trigger_id),
2400 trigger_id: context.metadata.trigger_id.clone(),
2401 step,
2402 created_at: SystemTime::now(),
2403 nodes: calls
2404 .iter()
2405 .enumerate()
2406 .map(|(provider_order, call)| ToolNode {
2407 call_id: call.call_id.clone(),
2408 skill_name: call.name.clone(),
2409 args: call.args.clone(),
2410 priority: call.priority,
2411 dependencies: call
2412 .depends_on
2413 .iter()
2414 .map(|call_id| ToolDependency {
2415 call_id: call_id.clone(),
2416 })
2417 .collect(),
2418 retry_policy: call.retry_policy.clone(),
2419 dry_run: call.dry_run,
2420 provider_order,
2421 })
2422 .collect(),
2423 }
2424}
2425
2426fn latest_tool_statuses(context: &AgentContext, graph_id: &str) -> HashMap<String, ToolNodeStatus> {
2427 let mut statuses = HashMap::new();
2428 for record in &context.records {
2429 if let SessionRecord::ToolNodeCheckpoint(checkpoint) = record
2430 && checkpoint.graph_id == graph_id
2431 {
2432 statuses.insert(checkpoint.call_id.clone(), checkpoint.status.clone());
2433 }
2434 }
2435 statuses
2436}
2437
2438fn started_attempt_counts(context: &AgentContext, graph_id: &str) -> HashMap<String, usize> {
2439 let mut attempts = HashMap::<String, usize>::new();
2440 for record in &context.records {
2441 if let SessionRecord::ToolNodeCheckpoint(checkpoint) = record
2442 && checkpoint.graph_id == graph_id
2443 && checkpoint.status == ToolNodeStatus::Started
2444 {
2445 let current = attempts.entry(checkpoint.call_id.clone()).or_default();
2446 *current = (*current).max(checkpoint.attempt);
2447 }
2448 }
2449 attempts
2450}
2451
2452fn ready_nodes(
2453 graph: &ToolExecutionGraph,
2454 status_by_call: &HashMap<String, ToolNodeStatus>,
2455) -> Vec<ToolNode> {
2456 let mut nodes = graph
2457 .nodes
2458 .iter()
2459 .filter(|node| !is_terminal_status(status_by_call.get(&node.call_id)))
2460 .filter(|node| {
2461 node.dependencies.iter().all(|dependency| {
2462 matches!(
2463 status_by_call.get(&dependency.call_id),
2464 Some(ToolNodeStatus::Succeeded)
2465 )
2466 })
2467 })
2468 .cloned()
2469 .collect::<Vec<_>>();
2470 nodes.sort_by(|left, right| {
2471 right
2472 .priority
2473 .cmp(&left.priority)
2474 .then(left.provider_order.cmp(&right.provider_order))
2475 .then(left.call_id.cmp(&right.call_id))
2476 });
2477 nodes
2478}
2479
2480fn is_terminal_status(status: Option<&ToolNodeStatus>) -> bool {
2481 matches!(
2482 status,
2483 Some(
2484 ToolNodeStatus::Succeeded
2485 | ToolNodeStatus::Failed
2486 | ToolNodeStatus::Skipped
2487 | ToolNodeStatus::TimedOut
2488 )
2489 )
2490}
2491
2492fn final_status_for_result(result: &ToolResultRecord) -> ToolNodeStatus {
2493 match &result.output {
2494 Ok(_) => ToolNodeStatus::Succeeded,
2495 Err(error) if error.kind == SkillFailureKind::Timeout => ToolNodeStatus::TimedOut,
2496 Err(_) => ToolNodeStatus::Failed,
2497 }
2498}
2499
2500fn result_detail(result: &ToolResultRecord) -> Option<String> {
2501 match &result.output {
2502 Ok(_) => None,
2503 Err(error) => Some(error.message.clone()),
2504 }
2505}
2506
2507fn validate_against_schema(value: &serde_json::Value, schema: &serde_json::Value) -> Vec<String> {
2508 let schema_type = schema.get("type").and_then(serde_json::Value::as_str);
2509 let mut errors = Vec::new();
2510 if let Some(schema_type) = schema_type
2511 && !json_type_matches(value, schema_type)
2512 {
2513 errors.push(format!("expected root type `{schema_type}`"));
2514 return errors;
2515 }
2516
2517 if schema_type == Some("object") {
2518 let Some(object) = value.as_object() else {
2519 return vec!["expected root object".to_string()];
2520 };
2521 if let Some(required) = schema.get("required").and_then(serde_json::Value::as_array) {
2522 for required_key in required.iter().filter_map(serde_json::Value::as_str) {
2523 if !object.contains_key(required_key) {
2524 errors.push(format!("missing required property `{required_key}`"));
2525 }
2526 }
2527 }
2528 if let Some(properties) = schema
2529 .get("properties")
2530 .and_then(serde_json::Value::as_object)
2531 {
2532 for (key, property_schema) in properties {
2533 let Some(property_value) = object.get(key) else {
2534 continue;
2535 };
2536 if let Some(property_type) = property_schema
2537 .get("type")
2538 .and_then(serde_json::Value::as_str)
2539 && !json_type_matches(property_value, property_type)
2540 {
2541 errors.push(format!("property `{key}` expected type `{property_type}`"));
2542 }
2543 }
2544 }
2545 }
2546 errors
2547}
2548
2549fn json_type_matches(value: &serde_json::Value, expected: &str) -> bool {
2550 match expected {
2551 "object" => value.is_object(),
2552 "array" => value.is_array(),
2553 "string" => value.is_string(),
2554 "number" => value.is_number(),
2555 "integer" => value.as_i64().is_some() || value.as_u64().is_some(),
2556 "boolean" => value.is_boolean(),
2557 "null" => value.is_null(),
2558 _ => true,
2559 }
2560}
2561
/// Owned inputs that `run_prepared_call` needs to execute one skill call.
struct PreparedCall {
    /// Identifier of the call; copied into every invocation and into the result record.
    call_id: String,
    /// Skill name; becomes `skill_name` on the result record.
    name: String,
    /// JSON arguments passed to the skill on every attempt.
    args: serde_json::Value,
    /// Manifest handed to the backend with each invocation.
    manifest: SkillManifest,
    /// Backend (WASM or native) that executes the call.
    backend: RegisteredSkillBackend,
    /// Context snapshot passed to the skill as its invocation context.
    context_snapshot: crate::AgentContextSnapshot,
    /// Forwarded to the invocation's `dry_run` flag.
    dry_run: bool,
    /// Attempt limit and backoff settings used by the retry loop.
    retry_policy: RetryPolicy,
}
2572
/// A graph node after preparation: either a call that still has to run, or a result that is
/// already available.
enum PreparedNode {
    /// A call ready to be executed; boxed so the enum itself stays small.
    Executable(Box<PreparedCall>),
    /// A result available without executing anything.
    /// NOTE(review): presumably produced when preparation itself fails (e.g. validation or
    /// permission checks) — confirm against the code that builds this variant.
    Immediate(ToolResultRecord),
}
2577
/// Results of a batch of tool calls that was executed.
#[derive(Debug)]
struct ExecutedBatch {
    /// One record per tool call in the batch.
    results: Vec<ToolResultRecord>,
    /// NOTE(review): presumably true when every entry in `results` is a failure — confirm at
    /// the construction site.
    all_failed: bool,
}
2583
/// Outcome of trying to run a batch of tool calls: it either ran, or it was suspended.
#[derive(Debug)]
enum BatchExecution {
    /// The batch ran; the payload holds its results.
    Executed(ExecutedBatch),
    /// The batch was suspended instead of running.
    Suspended {
        /// Why execution was suspended.
        reason: SuspendReason,
        /// Calls that are still pending.
        /// NOTE(review): presumably the calls to run on resume — confirm with the resume path.
        pending_calls: Vec<PlannedSkillCall>,
        /// Token issued for the suspension.
        /// NOTE(review): presumably presented later to resume this batch — confirm.
        resume_token: ResumeToken,
    },
}
2593
2594async fn run_prepared_call(prepared: PreparedCall) -> Result<ToolResultRecord, EngineError> {
2595 let started = Instant::now();
2596 let mut output = Err(SkillExecutionError::new(
2597 SkillFailureKind::Internal,
2598 "tool was not attempted",
2599 ));
2600 let mut current_interval_ms = prepared.retry_policy.initial_interval_ms;
2601 for attempt in 0..prepared.retry_policy.max_attempts {
2602 let invocation = SkillInvocation {
2603 call_id: prepared.call_id.clone(),
2604 manifest: prepared.manifest.clone(),
2605 args: prepared.args.clone(),
2606 context: prepared.context_snapshot.clone(),
2607 dry_run: prepared.dry_run,
2608 };
2609 output = match &prepared.backend {
2610 RegisteredSkillBackend::Wasm(executor) => executor.execute(invocation).await,
2611 RegisteredSkillBackend::Native(executor) => executor.execute(invocation).await,
2612 };
2613 if output.is_ok()
2614 || !is_retryable_skill_error(&output)
2615 || attempt + 1 >= prepared.retry_policy.max_attempts
2616 {
2617 break;
2618 }
2619 tokio::time::sleep(std::time::Duration::from_millis(current_interval_ms)).await;
2620 current_interval_ms =
2621 ((current_interval_ms as f64) * prepared.retry_policy.backoff_multiplier) as u64;
2622 current_interval_ms = current_interval_ms.min(prepared.retry_policy.max_interval_ms);
2623 }
2624 histogram!("rain_engine.tool_latency_seconds").record(started.elapsed().as_secs_f64());
2625
2626 let output = match output {
2627 Ok(value) => Ok(value),
2628 Err(err) => {
2629 match err.kind {
2630 SkillFailureKind::PermissionDenied | SkillFailureKind::CapabilityDenied => {
2631 counter!("rain_engine.permission_denials_total").increment(1);
2632 }
2633 SkillFailureKind::Trap | SkillFailureKind::MemoryLimitExceeded => {
2634 counter!("rain_engine.wasm_traps_total").increment(1);
2635 }
2636 SkillFailureKind::Timeout => {
2637 counter!("rain_engine.tool_timeouts_total").increment(1);
2638 }
2639 SkillFailureKind::InvalidArguments
2640 | SkillFailureKind::InvalidResponse
2641 | SkillFailureKind::Internal => {}
2642 }
2643 Err(SkillFailure {
2644 kind: err.kind,
2645 message: err.message,
2646 })
2647 }
2648 };
2649
2650 Ok(ToolResultRecord {
2651 call_id: prepared.call_id,
2652 finished_at: SystemTime::now(),
2653 skill_name: prepared.name,
2654 output,
2655 })
2656}
2657
2658fn is_retryable_skill_error(output: &Result<serde_json::Value, SkillExecutionError>) -> bool {
2659 matches!(
2660 output,
2661 Err(SkillExecutionError {
2662 kind: SkillFailureKind::Timeout | SkillFailureKind::Internal | SkillFailureKind::Trap,
2663 ..
2664 })
2665 )
2666}
2667
2668fn build_advance_result(
2669 outcome: EngineOutcome,
2670 emitted_events: Vec<KernelEventRecord>,
2671) -> AdvanceResult {
2672 let wake_request = emitted_events.iter().find_map(extract_wake_request);
2673 let state_delta = derive_state_delta(&emitted_events);
2674 AdvanceResult {
2675 outcome: Some(outcome),
2676 emitted_events,
2677 state_delta,
2678 wake_request,
2679 }
2680}
2681
2682fn extract_wake_request(event: &KernelEventRecord) -> Option<WakeRequestRecord> {
2683 match &event.event {
2684 KernelEvent::WakeRequested(wake) | KernelEvent::WakeScheduled(wake) => Some(wake.clone()),
2685 _ => None,
2686 }
2687}
2688
2689fn derive_state_delta(events: &[KernelEventRecord]) -> AgentStateDelta {
2690 let mut delta = AgentStateDelta::default();
2691 for event in events {
2692 match &event.event {
2693 KernelEvent::GoalCreated(goal) => delta.created_goal_ids.push(goal.goal_id.clone()),
2694 KernelEvent::TaskPlanned(task) => delta.updated_task_ids.push(task.task_id.clone()),
2695 KernelEvent::TaskClaimed { task_id, .. } => {
2696 delta.updated_task_ids.push(task_id.clone())
2697 }
2698 KernelEvent::TaskBlocked { task_id, .. }
2699 | KernelEvent::TaskCompleted { task_id, .. }
2700 | KernelEvent::TaskFailed { task_id, .. }
2701 | KernelEvent::TaskAbandoned { task_id, .. } => {
2702 delta.updated_task_ids.push(task_id.clone())
2703 }
2704 KernelEvent::HumanInputRequested { task_id, .. } => {
2705 if let Some(task_id) = task_id {
2706 delta.updated_task_ids.push(task_id.clone());
2707 }
2708 }
2709 KernelEvent::ObservationAppended(observation) => delta
2710 .observation_ids
2711 .push(observation.observation_id.clone()),
2712 KernelEvent::ArtifactProduced(artifact) => {
2713 delta.artifact_ids.push(artifact.artifact_id.clone())
2714 }
2715 KernelEvent::DelegationRequested(record) => delta
2716 .delegation_correlation_ids
2717 .push(record.correlation_id.clone()),
2718 KernelEvent::DelegationResolved { correlation_id, .. } => delta
2719 .delegation_correlation_ids
2720 .push(correlation_id.clone()),
2721 KernelEvent::WakeRequested(_)
2722 | KernelEvent::WakeScheduled(_)
2723 | KernelEvent::WakeCompleted { .. }
2724 | KernelEvent::ResourceRegistered(_)
2725 | KernelEvent::RelationshipObserved(_)
2726 | KernelEvent::MemorySearched { .. }
2727 | KernelEvent::MemoryArchived { .. } => {}
2728 }
2729 }
2730 delta
2731}
2732
2733fn derive_trigger_kernel_events(
2734 trigger_id: &str,
2735 trigger: &AgentTrigger,
2736) -> Vec<KernelEventRecord> {
2737 let mut events = Vec::new();
2738 let observed_at = SystemTime::now();
2739 let mut push_observation = |source: String,
2740 content: serde_json::Value,
2741 attachments: Vec<String>| {
2742 events.push(KernelEventRecord {
2743 event_id: format!("observation-{trigger_id}-{}", events.len()),
2744 occurred_at: observed_at,
2745 event: KernelEvent::ObservationAppended(crate::ObservationRecord {
2746 observation_id: crate::ObservationId(format!("{trigger_id}-obs-{}", events.len())),
2747 recorded_at: observed_at,
2748 source,
2749 content,
2750 attachment_ids: attachments,
2751 related_resources: Vec::new(),
2752 }),
2753 });
2754 };
2755
2756 match trigger {
2757 AgentTrigger::ExternalEvent {
2758 source,
2759 payload,
2760 attachments,
2761 } => push_observation(
2762 format!("external:{source}"),
2763 payload.clone(),
2764 attachments
2765 .iter()
2766 .map(|attachment| attachment.attachment_id.clone())
2767 .collect(),
2768 ),
2769 AgentTrigger::HumanInput {
2770 actor_id,
2771 content,
2772 attachments,
2773 } => push_observation(
2774 format!("human:{actor_id}"),
2775 serde_json::json!({ "content": content }),
2776 attachments
2777 .iter()
2778 .map(|attachment| attachment.attachment_id.clone())
2779 .collect(),
2780 ),
2781 AgentTrigger::SystemObservation {
2782 source,
2783 observation,
2784 attachments,
2785 } => push_observation(
2786 format!("system:{source}"),
2787 observation.clone(),
2788 attachments
2789 .iter()
2790 .map(|attachment| attachment.attachment_id.clone())
2791 .collect(),
2792 ),
2793 AgentTrigger::Webhook {
2794 source,
2795 payload,
2796 attachments,
2797 } => push_observation(
2798 format!("webhook:{source}"),
2799 payload.clone(),
2800 attachments
2801 .iter()
2802 .map(|attachment| attachment.attachment_id.clone())
2803 .collect(),
2804 ),
2805 AgentTrigger::RuleTrigger {
2806 rule_id,
2807 context,
2808 attachments,
2809 } => push_observation(
2810 format!("rule:{rule_id}"),
2811 context.clone(),
2812 attachments
2813 .iter()
2814 .map(|attachment| attachment.attachment_id.clone())
2815 .collect(),
2816 ),
2817 AgentTrigger::ProactiveHeartbeat { timestamp, .. } => push_observation(
2818 "heartbeat".to_string(),
2819 serde_json::json!({ "timestamp": timestamp }),
2820 Vec::new(),
2821 ),
2822 AgentTrigger::ScheduledWake {
2823 wake_id, reason, ..
2824 } => events.push(KernelEventRecord {
2825 event_id: format!("wake-completed-{trigger_id}"),
2826 occurred_at: observed_at,
2827 event: KernelEvent::WakeCompleted {
2828 wake_id: wake_id.clone(),
2829 reason: reason.clone(),
2830 completed_at: observed_at,
2831 },
2832 }),
2833 AgentTrigger::Message {
2834 user_id,
2835 content,
2836 attachments,
2837 } => push_observation(
2838 format!("message:{user_id}"),
2839 serde_json::json!({ "content": content }),
2840 attachments
2841 .iter()
2842 .map(|attachment| attachment.attachment_id.clone())
2843 .collect(),
2844 ),
2845 AgentTrigger::DelegationResult {
2846 correlation_id,
2847 payload,
2848 metadata,
2849 } => events.push(KernelEventRecord {
2850 event_id: format!("delegation-resolved-{trigger_id}"),
2851 occurred_at: observed_at,
2852 event: KernelEvent::DelegationResolved {
2853 correlation_id: correlation_id.clone(),
2854 resolved_at: observed_at,
2855 payload: payload.clone(),
2856 metadata: metadata.clone(),
2857 },
2858 }),
2859 AgentTrigger::Approval { .. } => {}
2860 }
2861
2862 events
2863}
2864
2865fn build_provider_contents(
2866 trigger: &AgentTrigger,
2867 history: &[SessionRecord],
2868 active_plan: Option<&ExecutionPlanRecord>,
2869) -> Vec<ProviderMessage> {
2870 let mut messages = Vec::new();
2871 let mut intent_by_trigger = HashMap::<String, String>::new();
2872 for record in history {
2873 match record {
2874 SessionRecord::Trigger(trigger) => {
2875 if let Some(intent) = &trigger.intent {
2876 intent_by_trigger.insert(trigger.trigger_id.clone(), intent.clone());
2877 }
2878 }
2879 SessionRecord::TriggerIntent(intent) => {
2880 intent_by_trigger.insert(intent.trigger_id.clone(), intent.intent.clone());
2881 }
2882 _ => {}
2883 }
2884 }
2885
2886 let max_recent_items = 32;
2889 let (truncated_history, truncated) = if history.len() > max_recent_items {
2890 (&history[history.len() - max_recent_items..], true)
2891 } else {
2892 (history, false)
2893 };
2894
2895 if truncated {
2896 messages.push(ProviderMessage {
2897 role: ProviderRole::System,
2898 parts: vec![ProviderContentPart::Text(
2899 "Notice: Older session history has been paged out to Archival Memory to save space. \
2900 Use the `MemorySearch` action to retrieve specific past interactions if you need them.".to_string(),
2901 )],
2902 });
2903 }
2904
2905 for record in truncated_history {
2907 match record {
2908 SessionRecord::Trigger(t) => {
2909 let mut parts = build_trigger_parts(&t.trigger);
2910 if let Some(intent) = t
2911 .intent
2912 .as_ref()
2913 .or_else(|| intent_by_trigger.get(&t.trigger_id))
2914 {
2915 parts.push(ProviderContentPart::Text(format!(
2916 "Classified intent: {intent}"
2917 )));
2918 }
2919 messages.push(ProviderMessage {
2920 role: ProviderRole::User,
2921 parts,
2922 });
2923 }
2924 SessionRecord::Summary(s) => {
2925 messages.push(ProviderMessage {
2926 role: ProviderRole::Assistant,
2927 parts: vec![ProviderContentPart::Text(format!(
2928 "Summary of prior history: {}",
2929 s.content
2930 ))],
2931 });
2932 }
2933 SessionRecord::ModelDecision(d) => match &d.action {
2934 AgentAction::Respond { content } => {
2935 messages.push(ProviderMessage {
2936 role: ProviderRole::Assistant,
2937 parts: vec![ProviderContentPart::Text(content.clone())],
2938 });
2939 }
2940 AgentAction::CallSkills(calls) => {
2941 messages.push(ProviderMessage {
2943 role: ProviderRole::Assistant,
2944 parts: vec![ProviderContentPart::Json(
2945 serde_json::to_value(calls).unwrap_or_default(),
2946 )],
2947 });
2948 }
2949 _ => {}
2950 },
2951 SessionRecord::ToolResult(r) => {
2952 messages.push(ProviderMessage {
2953 role: ProviderRole::Tool,
2954 parts: vec![ProviderContentPart::ToolResult(r.clone())],
2955 });
2956 }
2957 _ => {}
2958 }
2959 }
2960
2961 messages.push(ProviderMessage {
2963 role: ProviderRole::User,
2964 parts: build_trigger_parts(trigger),
2965 });
2966
2967 if let Some(plan) = active_plan {
2969 messages.push(ProviderMessage {
2970 role: ProviderRole::System,
2971 parts: vec![ProviderContentPart::Text(format!(
2972 "You are currently executing an active, multi-step plan.\n\nOBJECTIVE: {}\n\nYou are on step {} of {}.\n\nPlease stay focused on the objective and execute the necessary actions for this specific step.",
2973 plan.objective,
2974 plan.current_step_index + 1,
2975 plan.steps.len()
2976 ))],
2977 });
2978 }
2979
2980 messages
2981}
2982
2983fn build_trigger_parts(trigger: &AgentTrigger) -> Vec<ProviderContentPart> {
2984 let mut parts = Vec::new();
2985 match trigger {
2986 AgentTrigger::ExternalEvent {
2987 source,
2988 payload,
2989 attachments,
2990 } => {
2991 parts.push(ProviderContentPart::Text(format!(
2992 "external event source: {source}"
2993 )));
2994 parts.push(ProviderContentPart::Json(payload.clone()));
2995 parts.extend(
2996 attachments
2997 .iter()
2998 .cloned()
2999 .map(ProviderContentPart::Attachment),
3000 );
3001 }
3002 AgentTrigger::ScheduledWake {
3003 wake_id,
3004 due_at,
3005 reason,
3006 } => {
3007 parts.push(ProviderContentPart::Text(format!(
3008 "scheduled wake {} due at {:?}: {reason}",
3009 wake_id.0, due_at
3010 )));
3011 }
3012 AgentTrigger::HumanInput {
3013 actor_id,
3014 content,
3015 attachments,
3016 } => {
3017 parts.push(ProviderContentPart::Text(format!(
3018 "human actor: {actor_id}"
3019 )));
3020 parts.push(ProviderContentPart::Text(content.clone()));
3021 parts.extend(
3022 attachments
3023 .iter()
3024 .cloned()
3025 .map(ProviderContentPart::Attachment),
3026 );
3027 }
3028 AgentTrigger::SystemObservation {
3029 source,
3030 observation,
3031 attachments,
3032 } => {
3033 parts.push(ProviderContentPart::Text(format!(
3034 "system observation source: {source}"
3035 )));
3036 parts.push(ProviderContentPart::Json(observation.clone()));
3037 parts.extend(
3038 attachments
3039 .iter()
3040 .cloned()
3041 .map(ProviderContentPart::Attachment),
3042 );
3043 }
3044 AgentTrigger::Webhook {
3045 source,
3046 payload,
3047 attachments,
3048 } => {
3049 parts.push(ProviderContentPart::Text(format!(
3050 "webhook source: {source}"
3051 )));
3052 parts.push(ProviderContentPart::Json(payload.clone()));
3053 parts.extend(
3054 attachments
3055 .iter()
3056 .cloned()
3057 .map(ProviderContentPart::Attachment),
3058 );
3059 }
3060 AgentTrigger::RuleTrigger {
3061 rule_id,
3062 context,
3063 attachments,
3064 } => {
3065 parts.push(ProviderContentPart::Text(format!(
3066 "rule trigger: {rule_id}"
3067 )));
3068 parts.push(ProviderContentPart::Json(context.clone()));
3069 parts.extend(
3070 attachments
3071 .iter()
3072 .cloned()
3073 .map(ProviderContentPart::Attachment),
3074 );
3075 }
3076 AgentTrigger::ProactiveHeartbeat {
3077 timestamp,
3078 attachments,
3079 } => {
3080 parts.push(ProviderContentPart::Text(format!(
3081 "heartbeat timestamp: {timestamp}"
3082 )));
3083 parts.extend(
3084 attachments
3085 .iter()
3086 .cloned()
3087 .map(ProviderContentPart::Attachment),
3088 );
3089 }
3090 AgentTrigger::Message {
3091 user_id,
3092 content,
3093 attachments,
3094 } => {
3095 parts.push(ProviderContentPart::Text(format!("user_id: {user_id}")));
3096 parts.push(ProviderContentPart::Text(content.clone()));
3097 parts.extend(
3098 attachments
3099 .iter()
3100 .cloned()
3101 .map(ProviderContentPart::Attachment),
3102 );
3103 }
3104 AgentTrigger::Approval {
3105 resume_token,
3106 decision,
3107 metadata,
3108 } => {
3109 parts.push(ProviderContentPart::Text(format!(
3110 "approval for resume token {}: {:?}",
3111 resume_token.as_str(),
3112 decision
3113 )));
3114 parts.push(ProviderContentPart::Json(metadata.clone()));
3115 }
3116 AgentTrigger::DelegationResult {
3117 correlation_id,
3118 payload,
3119 metadata,
3120 } => {
3121 parts.push(ProviderContentPart::Text(format!(
3122 "delegation result for correlation {}",
3123 correlation_id.as_str()
3124 )));
3125 parts.push(ProviderContentPart::Json(payload.clone()));
3126 parts.push(ProviderContentPart::Json(metadata.clone()));
3127 }
3128 }
3129 parts
3130}
3131
3132fn storage_failure_outcome(
3133 trigger_id: String,
3134 steps_executed: usize,
3135 detail: String,
3136) -> EngineOutcome {
3137 counter!("rain_engine.storage_failures_total").increment(1);
3138 EngineOutcome {
3139 trigger_id,
3140 stop_reason: StopReason::StorageFailure,
3141 response: None,
3142 detail: Some(detail),
3143 steps_executed,
3144 idempotent_replay: false,
3145 resume_token: None,
3146 }
3147}
3148
3149#[cfg(test)]
3150mod tests {
3151 use super::*;
3152 use crate::{
3153 AttachmentRef, EnginePolicy, InMemoryMemoryStore, MockLlmProvider, ProviderCacheRecord,
3154 ProviderDecision, ProviderError, ProviderErrorKind, ProviderUsageRecord, RecordPageQuery,
3155 ResourcePolicy, SessionListQuery, SessionSnapshot, SkillCapability, StopReason, TaskId,
3156 TaskRecord, TaskStatus, WakeId, WakeRequestRecord,
3157 };
3158 use serde_json::json;
3159 use std::sync::Mutex;
3160
    /// Test double for a skill executor whose behaviour is supplied as a closure.
    #[derive(Clone)]
    struct StubSkillExecutor {
        /// Value reported by `executor_kind`.
        name: &'static str,
        /// Called with each invocation; its return value is the execution result.
        responder: Arc<
            dyn Fn(SkillInvocation) -> Result<serde_json::Value, SkillExecutionError> + Send + Sync,
        >,
    }
3168
    #[async_trait]
    impl SkillExecutor for StubSkillExecutor {
        /// Delegates straight to the stub's `responder` closure.
        async fn execute(
            &self,
            invocation: SkillInvocation,
        ) -> Result<serde_json::Value, SkillExecutionError> {
            (self.responder)(invocation)
        }

        /// Returns the `name` the stub was built with.
        fn executor_kind(&self) -> &'static str {
            self.name
        }
    }
3182
    /// Test double for a native skill with a configurable approval requirement.
    #[derive(Clone)]
    struct StubNativeSkill {
        /// Value reported by `requires_human_approval`.
        requires_approval: bool,
        /// Called with each invocation; its return value is the execution result.
        responder: Arc<
            dyn Fn(SkillInvocation) -> Result<serde_json::Value, SkillExecutionError> + Send + Sync,
        >,
    }
3190
    #[async_trait]
    impl NativeSkill for StubNativeSkill {
        /// Delegates straight to the stub's `responder` closure.
        async fn execute(
            &self,
            invocation: SkillInvocation,
        ) -> Result<serde_json::Value, SkillExecutionError> {
            (self.responder)(invocation)
        }

        /// Returns the `requires_approval` flag the stub was built with.
        fn requires_human_approval(&self) -> bool {
            self.requires_approval
        }
    }
3204
3205 fn manifest(name: &str, scopes: &[&str]) -> SkillManifest {
3206 SkillManifest {
3207 name: name.to_string(),
3208 description: format!("{name} description"),
3209 input_schema: json!({"type": "object"}),
3210 required_scopes: scopes.iter().map(|scope| scope.to_string()).collect(),
3211 capability_grants: vec![SkillCapability::StructuredLog],
3212 resource_policy: ResourcePolicy::default_for_tools(),
3213 approval_required: false,
3214 circuit_breaker_threshold: 0.5,
3215 }
3216 }
3217
3218 fn planned(call_id: &str, name: &str, args: serde_json::Value) -> PlannedSkillCall {
3219 PlannedSkillCall {
3220 call_id: call_id.to_string(),
3221 name: name.to_string(),
3222 args,
3223 priority: 0,
3224 depends_on: Vec::new(),
3225 retry_policy: Default::default(),
3226 dry_run: false,
3227 }
3228 }
3229
3230 fn message_trigger(content: &str) -> AgentTrigger {
3231 AgentTrigger::Message {
3232 user_id: "u1".to_string(),
3233 content: content.to_string(),
3234 attachments: Vec::new(),
3235 }
3236 }
3237
3238 async fn session(store: &Arc<InMemoryMemoryStore>, session_id: &str) -> SessionSnapshot {
3239 store
3240 .load_session(session_id)
3241 .await
3242 .expect("session snapshot")
3243 }
3244
3245 async fn run_until_terminal(
3246 engine: &AgentEngine,
3247 request: ProcessRequest,
3248 ) -> Result<EngineOutcome, EngineError> {
3249 let mut next = AdvanceRequest::Trigger(request.clone());
3250 loop {
3251 let result = engine.advance(next).await?;
3252 if let Some(outcome) = result.outcome {
3253 return Ok(outcome);
3254 }
3255 next = AdvanceRequest::Continue(ContinueRequest {
3256 session_id: request.session_id.clone(),
3257 granted_scopes: request.granted_scopes.clone(),
3258 policy: request.policy.clone(),
3259 provider: request.provider.clone(),
3260 cancellation: request.cancellation.clone(),
3261 });
3262 }
3263 }
3264
    /// A webhook trigger carrying an inline attachment runs to a `Responded` outcome, and the
    /// trigger is the first record stored for the session.
    #[tokio::test]
    async fn webhook_trigger_with_attachment_responds() {
        let store = Arc::new(InMemoryMemoryStore::new());
        // The scripted provider answers with a single response action.
        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
            content: "done".to_string(),
        }]));
        let engine = AgentEngine::new(llm, store.clone());

        let outcome = run_until_terminal(
            &engine,
            ProcessRequest::new(
                "session-1",
                AgentTrigger::Webhook {
                    source: "github".to_string(),
                    payload: json!({"issue": 42}),
                    attachments: vec![AttachmentRef::inline(
                        "att-1",
                        "image/png",
                        Some("schema.png".to_string()),
                        vec![1, 2, 3],
                    )],
                },
            ),
        )
        .await
        .expect("outcome");

        assert_eq!(outcome.stop_reason, StopReason::Responded);
        // The trigger must be persisted before anything else in the session.
        let snapshot = session(&store, "session-1").await;
        assert!(matches!(
            snapshot.records.first(),
            Some(SessionRecord::Trigger(_))
        ));
    }
3299
    /// Each `advance` call performs one progression step: the first runs the tool call without
    /// reaching an outcome, and the continuation produces the final response.
    #[tokio::test]
    async fn advance_executes_one_progression_step() {
        let store = Arc::new(InMemoryMemoryStore::new());
        // Script: call the `echo` skill first, then respond.
        let llm = Arc::new(MockLlmProvider::scripted(vec![
            AgentAction::CallSkills(vec![PlannedSkillCall {
                call_id: "call-1".to_string(),
                name: "echo".to_string(),
                args: json!({"value": 1}),
                priority: 0,
                depends_on: Vec::new(),
                retry_policy: Default::default(),
                dry_run: false,
            }]),
            AgentAction::Respond {
                content: "done".to_string(),
            },
        ]));
        let engine = AgentEngine::new(llm, store.clone());
        engine.register_wasm_skill(
            manifest("echo", &["tool:run"]),
            Arc::new(StubSkillExecutor {
                name: "stub",
                responder: Arc::new(|invocation| Ok(json!({"echo": invocation.args}))),
            }),
        );
        // The request grants the scope the `echo` manifest requires.
        let request =
            ProcessRequest::new("step-session", message_trigger("run")).with_scope("tool:run");

        let first = engine
            .advance(AdvanceRequest::Trigger(request.clone()))
            .await
            .expect("first advance");
        // Not terminal yet, and exactly one kernel event was emitted.
        assert!(first.outcome.is_none());
        assert_eq!(first.emitted_events.len(), 1);

        let second = engine
            .advance(AdvanceRequest::Continue(ContinueRequest {
                session_id: request.session_id.clone(),
                granted_scopes: request.granted_scopes.clone(),
                policy: request.policy.clone(),
                provider: request.provider.clone(),
                cancellation: request.cancellation.clone(),
            }))
            .await
            .expect("second advance");
        assert_eq!(
            second.outcome.expect("terminal").stop_reason,
            StopReason::Responded
        );
        // The single tool call left exactly one tool result in the session.
        assert_eq!(
            store
                .load_session("step-session")
                .await
                .unwrap()
                .tool_results()
                .len(),
            1
        );
    }
3359
    /// Replaying stored kernel events projects task state: a task that was planned and then
    /// completed shows up as `Done`.
    #[tokio::test]
    async fn replay_projects_task_transitions() {
        let store = Arc::new(InMemoryMemoryStore::new());
        let now = SystemTime::now();
        let task_id = TaskId("task-1".to_string());
        // First event: the task is planned in the `Ready` state.
        store
            .append_kernel_event(
                "projection-session",
                KernelEventRecord {
                    event_id: "task-planned".to_string(),
                    occurred_at: now,
                    event: KernelEvent::TaskPlanned(TaskRecord {
                        task_id: task_id.clone(),
                        goal_id: None,
                        parent_task_id: None,
                        created_at: now,
                        title: "triage".to_string(),
                        detail: None,
                        status: TaskStatus::Ready,
                        assignee: None,
                        blocked_by: Vec::new(),
                    }),
                },
            )
            .await
            .expect("planned");
        // Second event: the same task completes.
        store
            .append_kernel_event(
                "projection-session",
                KernelEventRecord {
                    event_id: "task-done".to_string(),
                    occurred_at: now,
                    event: KernelEvent::TaskCompleted {
                        task_id: task_id.clone(),
                        completed_at: now,
                        artifact_ids: Vec::new(),
                    },
                },
            )
            .await
            .expect("completed");

        let state = store
            .load_session("projection-session")
            .await
            .expect("snapshot")
            .agent_state();
        assert_eq!(state.tasks[0].status, TaskStatus::Done);
    }
3409
    /// Processing a `ScheduledWake` trigger clears the session's pending wake and records a
    /// `WakeCompleted` event for the same wake id.
    #[tokio::test]
    async fn scheduled_wake_trigger_completes_pending_wake() {
        let store = Arc::new(InMemoryMemoryStore::new());
        let now = SystemTime::now();
        let wake_id = WakeId("wake-1".to_string());
        // Seed the session with a scheduled wake.
        store
            .append_kernel_event(
                "wake-session",
                KernelEventRecord {
                    event_id: "wake-scheduled".to_string(),
                    occurred_at: now,
                    event: KernelEvent::WakeScheduled(WakeRequestRecord {
                        wake_id: wake_id.clone(),
                        requested_at: now,
                        due_at: now,
                        reason: "check later".to_string(),
                        task_id: None,
                    }),
                },
            )
            .await
            .expect("scheduled");
        // Precondition: the projected state reports the wake as pending.
        assert!(
            store
                .load_session("wake-session")
                .await
                .expect("snapshot")
                .agent_state()
                .pending_wake
                .is_some()
        );

        // The scripted provider yields as soon as the wake trigger is processed.
        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Yield {
            reason: Some("wake handled".to_string()),
        }]));
        let engine = AgentEngine::new(llm, store.clone());
        let outcome = run_until_terminal(
            &engine,
            ProcessRequest::new(
                "wake-session",
                AgentTrigger::ScheduledWake {
                    wake_id: wake_id.clone(),
                    due_at: now,
                    reason: "check later".to_string(),
                },
            ),
        )
        .await
        .expect("outcome");
        assert_eq!(outcome.stop_reason, StopReason::Yielded);
        // The wake is no longer pending, and its completion was persisted.
        let snapshot = store.load_session("wake-session").await.expect("snapshot");
        assert!(snapshot.agent_state().pending_wake.is_none());
        assert!(snapshot.records.iter().any(|record| matches!(
            record,
            SessionRecord::KernelEvent(KernelEventRecord {
                event: KernelEvent::WakeCompleted { wake_id: completed, .. },
                ..
            }) if completed == &wake_id
        )));
    }
3470
3471 #[tokio::test]
3472 async fn duplicate_idempotency_key_reuses_prior_outcome() {
3473 let store = Arc::new(InMemoryMemoryStore::new());
3474 let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3475 content: "first".to_string(),
3476 }]));
3477 let engine = AgentEngine::new(llm.clone(), store.clone());
3478 let request = ProcessRequest::new(
3479 "idempotent-session",
3480 AgentTrigger::Webhook {
3481 source: "github".to_string(),
3482 payload: json!({"action": "sync"}),
3483 attachments: Vec::new(),
3484 },
3485 )
3486 .with_idempotency_key("abc");
3487 let first = run_until_terminal(&engine, request.clone())
3488 .await
3489 .expect("first");
3490 let second = run_until_terminal(&engine, request).await.expect("second");
3491 assert_eq!(first.response, second.response);
3492 assert!(second.idempotent_replay);
3493 assert_eq!(llm.observed_inputs().len(), 1);
3494 }
3495
3496 #[tokio::test]
3497 async fn parallel_tool_calls_execute_and_aggregate() {
3498 let store = Arc::new(InMemoryMemoryStore::new());
3499 let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3500 if input.context.prior_tool_results.is_empty() {
3501 Ok(ProviderDecision {
3502 action: AgentAction::CallSkills(vec![
3503 PlannedSkillCall {
3504 call_id: "call-1".to_string(),
3505 name: "first".to_string(),
3506 args: json!({"value": 1}),
3507 priority: 0,
3508 depends_on: Vec::new(),
3509 retry_policy: Default::default(),
3510 dry_run: false,
3511 },
3512 PlannedSkillCall {
3513 call_id: "call-2".to_string(),
3514 name: "second".to_string(),
3515 args: json!({"value": 2}),
3516 priority: 0,
3517 depends_on: Vec::new(),
3518 retry_policy: Default::default(),
3519 dry_run: false,
3520 },
3521 ]),
3522 usage: None,
3523 cache: None,
3524 })
3525 } else {
3526 Ok(ProviderDecision {
3527 action: AgentAction::Respond {
3528 content: "complete".to_string(),
3529 },
3530 usage: None,
3531 cache: None,
3532 })
3533 }
3534 }));
3535 let engine = AgentEngine::new(llm, store.clone());
3536 let order = Arc::new(Mutex::new(Vec::<String>::new()));
3537
3538 for skill_name in ["first", "second"] {
3539 let local = order.clone();
3540 engine.register_wasm_skill(
3541 manifest(skill_name, &["tool:run"]),
3542 Arc::new(StubSkillExecutor {
3543 name: "stub",
3544 responder: Arc::new(move |invocation| {
3545 local
3546 .lock()
3547 .expect("order lock")
3548 .push(invocation.call_id.clone());
3549 Ok(json!({"echo": invocation.args}))
3550 }),
3551 }),
3552 );
3553 }
3554
3555 let outcome = run_until_terminal(
3556 &engine,
3557 ProcessRequest::new("session-2", message_trigger("run")).with_scope("tool:run"),
3558 )
3559 .await
3560 .expect("outcome");
3561
3562 assert_eq!(outcome.stop_reason, StopReason::Responded);
3563 let snapshot = session(&store, "session-2").await;
3564 let tool_results = snapshot.tool_results();
3565 assert_eq!(tool_results.len(), 2);
3566 assert_eq!(order.lock().expect("order lock").len(), 2);
3567 }
3568
3569 #[tokio::test]
3570 async fn provider_metadata_records_are_persisted() {
3571 let store = Arc::new(InMemoryMemoryStore::new());
3572 let llm = Arc::new(MockLlmProvider::dynamic(|_| {
3573 Ok(ProviderDecision {
3574 action: AgentAction::Yield {
3575 reason: Some("done".to_string()),
3576 },
3577 usage: Some(ProviderUsageRecord {
3578 provider_name: "gemini".to_string(),
3579 recorded_at: SystemTime::now(),
3580 input_tokens: 100,
3581 output_tokens: 20,
3582 estimated_cost_usd: 0.25,
3583 cached_content_id: Some("cache-1".to_string()),
3584 }),
3585 cache: Some(ProviderCacheRecord {
3586 provider_name: "gemini".to_string(),
3587 cached_content_id: "cache-1".to_string(),
3588 token_count: 45_000,
3589 cached_at: SystemTime::now(),
3590 }),
3591 })
3592 }));
3593 let engine = AgentEngine::new(llm, store.clone());
3594
3595 let outcome = run_until_terminal(
3596 &engine,
3597 ProcessRequest::new("session-usage", message_trigger("hi")),
3598 )
3599 .await
3600 .expect("outcome");
3601 assert_eq!(outcome.stop_reason, StopReason::Yielded);
3602
3603 let snapshot = session(&store, "session-usage").await;
3604 assert!(
3605 snapshot
3606 .records
3607 .iter()
3608 .any(|record| matches!(record, SessionRecord::ProviderUsage(_)))
3609 );
3610 assert!(
3611 snapshot
3612 .records
3613 .iter()
3614 .any(|record| matches!(record, SessionRecord::ProviderCache(_)))
3615 );
3616 }
3617
3618 #[tokio::test]
3619 async fn approval_trigger_resumes_native_skill() {
3620 let store = Arc::new(InMemoryMemoryStore::new());
3621 let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3622 if input
3623 .context
3624 .history
3625 .iter()
3626 .any(|record| matches!(record, SessionRecord::ToolResult(_)))
3627 {
3628 Ok(ProviderDecision {
3629 action: AgentAction::Respond {
3630 content: "approved-run".to_string(),
3631 },
3632 usage: None,
3633 cache: None,
3634 })
3635 } else {
3636 Ok(ProviderDecision {
3637 action: AgentAction::CallSkills(vec![PlannedSkillCall {
3638 call_id: "native-1".to_string(),
3639 name: "db_fix".to_string(),
3640 args: json!({"apply": true}),
3641 priority: 0,
3642 depends_on: Vec::new(),
3643 retry_policy: Default::default(),
3644 dry_run: false,
3645 }]),
3646 usage: None,
3647 cache: None,
3648 })
3649 }
3650 }));
3651 let engine = AgentEngine::new(llm, store.clone());
3652 engine.register_native_skill(
3653 SkillManifest {
3654 approval_required: true,
3655 ..manifest("db_fix", &["db:write"])
3656 },
3657 Arc::new(StubNativeSkill {
3658 requires_approval: true,
3659 responder: Arc::new(|_| Ok(json!({"status": "fixed"}))),
3660 }),
3661 );
3662
3663 let suspended = run_until_terminal(
3664 &engine,
3665 ProcessRequest::new("approval-session", message_trigger("fix")).with_scope("db:write"),
3666 )
3667 .await
3668 .expect("suspended");
3669 assert_eq!(suspended.stop_reason, StopReason::Suspended);
3670 let token = suspended.resume_token.expect("resume token");
3671
3672 let resumed = run_until_terminal(
3673 &engine,
3674 ProcessRequest::new(
3675 "approval-session",
3676 AgentTrigger::Approval {
3677 resume_token: token,
3678 decision: ApprovalDecision::Approved,
3679 metadata: json!({"approved_by": "human"}),
3680 },
3681 ),
3682 )
3683 .await
3684 .expect("resumed");
3685 assert_eq!(resumed.stop_reason, StopReason::Responded);
3686 assert_eq!(resumed.response.as_deref(), Some("approved-run"));
3687 }
3688
3689 #[tokio::test]
3690 async fn provider_failure_stops_with_explicit_reason() {
3691 let store = Arc::new(InMemoryMemoryStore::new());
3692 let llm = Arc::new(MockLlmProvider::dynamic(|_| {
3693 Err(ProviderError::new(
3694 ProviderErrorKind::Transport,
3695 "upstream unavailable",
3696 true,
3697 ))
3698 }));
3699 let engine = AgentEngine::new(llm, store);
3700
3701 let outcome = run_until_terminal(
3702 &engine,
3703 ProcessRequest::new("provider-failure", message_trigger("hello")),
3704 )
3705 .await
3706 .expect("outcome");
3707
3708 assert_eq!(outcome.stop_reason, StopReason::ProviderFailure);
3709 }
3710
3711 #[tokio::test]
3712 async fn self_improvement_applies_bounded_max_step_overlay() {
3713 let store = Arc::new(InMemoryMemoryStore::new());
3714 let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3715 content: "unused".to_string(),
3716 }]));
3717 let engine = AgentEngine::new(llm, store.clone());
3718 let mut policy = EnginePolicy {
3719 max_steps: 0,
3720 ..EnginePolicy::default()
3721 };
3722 policy.self_improvement.min_observations_before_tuning = 1;
3723 policy.self_improvement.max_policy_delta_percent = 50.0;
3724
3725 let outcome = run_until_terminal(
3726 &engine,
3727 ProcessRequest::new("learning-max-steps", message_trigger("continue"))
3728 .with_policy(policy),
3729 )
3730 .await
3731 .expect("outcome");
3732
3733 assert_eq!(outcome.stop_reason, StopReason::MaxStepsReached);
3734 let snapshot = session(&store, "learning-max-steps").await;
3735 let overlay = snapshot.active_policy_overlay().expect("active overlay");
3736 assert_eq!(overlay.patch.max_steps, Some(2));
3737 assert!(
3738 snapshot
3739 .records
3740 .iter()
3741 .any(|record| matches!(record, SessionRecord::Reflection(_)))
3742 );
3743 assert!(
3744 snapshot
3745 .records
3746 .iter()
3747 .any(|record| matches!(record, SessionRecord::PolicyTuning(_)))
3748 );
3749 }
3750
3751 #[tokio::test]
3752 async fn self_improvement_rolls_back_overlay_after_regression() {
3753 let store = Arc::new(InMemoryMemoryStore::new());
3754 let llm = Arc::new(MockLlmProvider::dynamic(|_| {
3755 Err(ProviderError::new(
3756 ProviderErrorKind::Transport,
3757 "upstream unavailable",
3758 true,
3759 ))
3760 }));
3761 let engine = AgentEngine::new(llm, store.clone());
3762 let mut policy = EnginePolicy {
3763 max_steps: 0,
3764 ..EnginePolicy::default()
3765 };
3766 policy.self_improvement.min_observations_before_tuning = 1;
3767
3768 let first = run_until_terminal(
3769 &engine,
3770 ProcessRequest::new("learning-rollback", message_trigger("first"))
3771 .with_policy(policy.clone()),
3772 )
3773 .await
3774 .expect("first");
3775 assert_eq!(first.stop_reason, StopReason::MaxStepsReached);
3776 assert!(
3777 session(&store, "learning-rollback")
3778 .await
3779 .active_policy_overlay()
3780 .is_some()
3781 );
3782
3783 let second = run_until_terminal(
3784 &engine,
3785 ProcessRequest::new("learning-rollback", message_trigger("second")).with_policy(policy),
3786 )
3787 .await
3788 .expect("second");
3789 assert_eq!(second.stop_reason, StopReason::ProviderFailure);
3790
3791 let snapshot = session(&store, "learning-rollback").await;
3792 assert!(snapshot.active_policy_overlay().is_none());
3793 assert!(snapshot.records.iter().any(|record| matches!(
3794 record,
3795 SessionRecord::PolicyTuning(tuning)
3796 if tuning.action == PolicyTuningAction::RolledBack
3797 )));
3798 }
3799
3800 #[tokio::test]
3801 async fn self_improvement_records_tool_performance_and_strategy_preferences() {
3802 let store = Arc::new(InMemoryMemoryStore::new());
3803 let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3804 if input.context.prior_tool_results.is_empty() {
3805 Ok(ProviderDecision {
3806 action: AgentAction::CallSkills(vec![PlannedSkillCall {
3807 call_id: "unstable-call".to_string(),
3808 name: "unstable".to_string(),
3809 args: json!({}),
3810 priority: 0,
3811 depends_on: Vec::new(),
3812 retry_policy: Default::default(),
3813 dry_run: false,
3814 }]),
3815 usage: None,
3816 cache: None,
3817 })
3818 } else {
3819 Ok(ProviderDecision {
3820 action: AgentAction::Respond {
3821 content: "finished".to_string(),
3822 },
3823 usage: None,
3824 cache: None,
3825 })
3826 }
3827 }));
3828 let engine = AgentEngine::new(llm, store.clone());
3829 engine.register_wasm_skill(
3830 manifest("unstable", &["tool:run"]),
3831 Arc::new(StubSkillExecutor {
3832 name: "unstable",
3833 responder: Arc::new(|_| {
3834 Err(SkillExecutionError::new(
3835 SkillFailureKind::Internal,
3836 "simulated failure",
3837 ))
3838 }),
3839 }),
3840 );
3841
3842 let mut policy = EnginePolicy::default();
3843 policy.self_improvement.min_observations_before_tuning = 1;
3844
3845 let outcome = run_until_terminal(
3846 &engine,
3847 ProcessRequest::new("learning-tools", message_trigger("run"))
3848 .with_scope("tool:run")
3849 .with_policy(policy),
3850 )
3851 .await
3852 .expect("outcome");
3853 assert_eq!(outcome.stop_reason, StopReason::Responded);
3854
3855 let snapshot = session(&store, "learning-tools").await;
3856 assert!(snapshot.records.iter().any(|record| matches!(
3857 record,
3858 SessionRecord::ToolPerformance(performance)
3859 if performance.skill_name == "unstable" && performance.failures >= 1
3860 )));
3861 assert!(snapshot.records.iter().any(|record| matches!(
3862 record,
3863 SessionRecord::StrategyPreference(preference)
3864 if preference.skill_name.as_deref() == Some("unstable")
3865 )));
3866 }
3867
3868 #[tokio::test]
3869 async fn list_sessions_and_record_pages_work() {
3870 let store = Arc::new(InMemoryMemoryStore::new());
3871 let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3872 content: "ok".to_string(),
3873 }]));
3874 let engine = AgentEngine::new(llm, store.clone());
3875 for session_id in ["a", "b"] {
3876 run_until_terminal(
3877 &engine,
3878 ProcessRequest::new(session_id, message_trigger("x")),
3879 )
3880 .await
3881 .expect("outcome");
3882 }
3883
3884 let sessions = store
3885 .list_sessions(SessionListQuery::default())
3886 .await
3887 .expect("sessions");
3888 assert_eq!(sessions.len(), 2);
3889 let page = store
3890 .list_records(RecordPageQuery::new("a"))
3891 .await
3892 .expect("page");
3893 assert!(!page.records.is_empty());
3894 }
3895
3896 #[tokio::test]
3897 async fn invalid_tool_arguments_are_persisted_without_executor_call() {
3898 let store = Arc::new(InMemoryMemoryStore::new());
3899 let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3900 if input.context.prior_tool_results.is_empty() {
3901 Ok(ProviderDecision {
3902 action: AgentAction::CallSkills(vec![planned(
3903 "bad-args",
3904 "typed_tool",
3905 json!({"value": 1}),
3906 )]),
3907 usage: None,
3908 cache: None,
3909 })
3910 } else {
3911 Ok(ProviderDecision {
3912 action: AgentAction::Respond {
3913 content: "validated".to_string(),
3914 },
3915 usage: None,
3916 cache: None,
3917 })
3918 }
3919 }));
3920 let engine = AgentEngine::new(llm, store.clone());
3921 let calls = Arc::new(Mutex::new(0usize));
3922 let calls_for_executor = calls.clone();
3923 engine.register_wasm_skill(
3924 SkillManifest {
3925 input_schema: json!({
3926 "type": "object",
3927 "required": ["value"],
3928 "properties": {"value": {"type": "string"}}
3929 }),
3930 ..manifest("typed_tool", &["tool:run"])
3931 },
3932 Arc::new(StubSkillExecutor {
3933 name: "typed",
3934 responder: Arc::new(move |_| {
3935 *calls_for_executor.lock().expect("lock") += 1;
3936 Ok(json!({}))
3937 }),
3938 }),
3939 );
3940
3941 let outcome = run_until_terminal(
3942 &engine,
3943 ProcessRequest::new("schema-session", message_trigger("run")).with_scope("tool:run"),
3944 )
3945 .await
3946 .expect("outcome");
3947
3948 assert_eq!(outcome.stop_reason, StopReason::Responded);
3949 assert_eq!(*calls.lock().expect("lock"), 0);
3950 let snapshot = session(&store, "schema-session").await;
3951 assert!(snapshot.records.iter().any(|record| matches!(
3952 record,
3953 SessionRecord::SkillInputValidation(validation)
3954 if !validation.valid && validation.call_id == "bad-args"
3955 )));
3956 assert!(snapshot.records.iter().any(|record| matches!(
3957 record,
3958 SessionRecord::ToolResult(result)
3959 if result.call_id == "bad-args"
3960 && matches!(
3961 result.output,
3962 Err(SkillFailure {
3963 kind: SkillFailureKind::InvalidArguments,
3964 ..
3965 })
3966 )
3967 )));
3968 }
3969
3970 #[tokio::test]
3971 async fn checkpointed_graph_resume_executes_only_unfinished_nodes() {
3972 let store = Arc::new(InMemoryMemoryStore::new());
3973 let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3974 content: "done".to_string(),
3975 }]));
3976 let engine = AgentEngine::new(llm, store.clone());
3977 let c1_calls = Arc::new(Mutex::new(0usize));
3978 let c2_calls = Arc::new(Mutex::new(0usize));
3979 let c1_counter = c1_calls.clone();
3980 let c2_counter = c2_calls.clone();
3981 engine.register_wasm_skill(
3982 manifest("first", &["tool:run"]),
3983 Arc::new(StubSkillExecutor {
3984 name: "first",
3985 responder: Arc::new(move |_| {
3986 *c1_counter.lock().expect("lock") += 1;
3987 Ok(json!({"first": true}))
3988 }),
3989 }),
3990 );
3991 engine.register_wasm_skill(
3992 manifest("second", &["tool:run"]),
3993 Arc::new(StubSkillExecutor {
3994 name: "second",
3995 responder: Arc::new(move |_| {
3996 *c2_counter.lock().expect("lock") += 1;
3997 Ok(json!({"second": true}))
3998 }),
3999 }),
4000 );
4001
4002 let trigger_id = "trigger-checkpoint".to_string();
4003 let session_id = "checkpoint-session";
4004 let trigger = TriggerRecord {
4005 trigger_id: trigger_id.clone(),
4006 session_id: session_id.to_string(),
4007 idempotency_key: None,
4008 recorded_at: SystemTime::now(),
4009 trigger: message_trigger("resume"),
4010 intent: None,
4011 };
4012 store.append_trigger(trigger).await.expect("trigger");
4013 let calls = vec![
4014 planned("call-1", "first", json!({})),
4015 planned("call-2", "second", json!({})),
4016 ];
4017 store
4018 .append_model_decision(
4019 session_id,
4020 ModelDecisionRecord {
4021 step: 0,
4022 decided_at: SystemTime::now(),
4023 action: AgentAction::CallSkills(calls.clone()),
4024 },
4025 )
4026 .await
4027 .expect("decision");
4028 let graph = existing_or_new_graph(
4029 &AgentContext {
4030 session_id: session_id.to_string(),
4031 records: Vec::new(),
4032 prior_tool_results: Vec::new(),
4033 granted_scopes: ["tool:run".to_string()].into_iter().collect(),
4034 metadata: ExecutionMetadata {
4035 trigger_id: trigger_id.clone(),
4036 idempotency_key: None,
4037 started_at: SystemTime::now(),
4038 deadline: Instant::now() + EnginePolicy::default().max_execution_time(),
4039 policy: EnginePolicy::default(),
4040 provider: Default::default(),
4041 cancellation: Default::default(),
4042 },
4043 },
4044 0,
4045 &calls,
4046 );
4047 store
4048 .append_tool_execution_graph(session_id, graph.clone())
4049 .await
4050 .expect("graph");
4051 let first_node = graph.nodes.first().expect("first node");
4052 store
4053 .append_tool_node_checkpoint(
4054 session_id,
4055 ToolNodeCheckpointRecord {
4056 checkpoint_id: "cp-start".to_string(),
4057 graph_id: graph.graph_id.clone(),
4058 call_id: first_node.call_id.clone(),
4059 skill_name: first_node.skill_name.clone(),
4060 step: graph.step,
4061 status: ToolNodeStatus::Started,
4062 attempt: 1,
4063 occurred_at: SystemTime::now(),
4064 detail: None,
4065 },
4066 )
4067 .await
4068 .expect("started");
4069 store
4070 .append_tool_node_checkpoint(
4071 session_id,
4072 ToolNodeCheckpointRecord {
4073 checkpoint_id: "cp-ok".to_string(),
4074 graph_id: graph.graph_id.clone(),
4075 call_id: first_node.call_id.clone(),
4076 skill_name: first_node.skill_name.clone(),
4077 step: graph.step,
4078 status: ToolNodeStatus::Succeeded,
4079 attempt: 1,
4080 occurred_at: SystemTime::now(),
4081 detail: None,
4082 },
4083 )
4084 .await
4085 .expect("succeeded");
4086 store
4087 .append_tool_result(
4088 session_id,
4089 ToolResultRecord {
4090 call_id: "call-1".to_string(),
4091 finished_at: SystemTime::now(),
4092 skill_name: "first".to_string(),
4093 output: Ok(json!({"already": "done"})),
4094 },
4095 )
4096 .await
4097 .expect("result");
4098
4099 let result = engine
4100 .advance(AdvanceRequest::Continue(
4101 ContinueRequest::new(session_id).with_scope("tool:run"),
4102 ))
4103 .await
4104 .expect("advance");
4105
4106 assert!(result.outcome.is_some());
4109 assert_eq!(result.outcome.unwrap().stop_reason, StopReason::Responded);
4110 assert_eq!(*c1_calls.lock().expect("lock"), 0);
4111 assert_eq!(*c2_calls.lock().expect("lock"), 1);
4112 let snapshot = session(&store, session_id).await;
4113 assert!(snapshot.records.iter().any(|record| matches!(
4114 record,
4115 SessionRecord::ToolResult(result) if result.call_id == "call-2"
4116 )));
4117 }
4118
4119 #[tokio::test]
4120 async fn priority_ordering_runs_high_priority_first_when_serialized() {
4121 let store = Arc::new(InMemoryMemoryStore::new());
4122 let llm = Arc::new(MockLlmProvider::dynamic(|input| {
4123 if input.context.prior_tool_results.is_empty() {
4124 let mut low = planned("low", "low", json!({}));
4125 low.priority = 0;
4126 let mut high = planned("high", "high", json!({}));
4127 high.priority = 10;
4128 Ok(ProviderDecision {
4129 action: AgentAction::CallSkills(vec![low, high]),
4130 usage: None,
4131 cache: None,
4132 })
4133 } else {
4134 Ok(ProviderDecision {
4135 action: AgentAction::Respond {
4136 content: "ordered".to_string(),
4137 },
4138 usage: None,
4139 cache: None,
4140 })
4141 }
4142 }));
4143 let engine = AgentEngine::new(llm, store.clone());
4144 let order = Arc::new(Mutex::new(Vec::<String>::new()));
4145 for skill_name in ["low", "high"] {
4146 let order = order.clone();
4147 engine.register_wasm_skill(
4148 manifest(skill_name, &["tool:run"]),
4149 Arc::new(StubSkillExecutor {
4150 name: "ordered",
4151 responder: Arc::new(move |invocation| {
4152 order.lock().expect("lock").push(invocation.call_id);
4153 Ok(json!({}))
4154 }),
4155 }),
4156 );
4157 }
4158
4159 let mut policy = EnginePolicy {
4160 max_parallel_skill_calls: 1,
4161 ..EnginePolicy::default()
4162 };
4163 policy.self_improvement.enabled = false;
4164 run_until_terminal(
4165 &engine,
4166 ProcessRequest::new("priority-session", message_trigger("run"))
4167 .with_scope("tool:run")
4168 .with_policy(policy),
4169 )
4170 .await
4171 .expect("outcome");
4172
4173 assert_eq!(&*order.lock().expect("lock"), &["high", "low"]);
4174 }
4175
4176 #[tokio::test]
4177 async fn plan_action_persists_deliberation_record() {
4178 let store = Arc::new(InMemoryMemoryStore::new());
4179 let llm = Arc::new(MockLlmProvider::scripted(vec![
4180 AgentAction::Plan {
4181 summary: "inspect then act".to_string(),
4182 candidate_actions: vec!["search memory".to_string(), "call tool".to_string()],
4183 confidence: 0.82,
4184 },
4185 AgentAction::Respond {
4186 content: "planned".to_string(),
4187 },
4188 ]));
4189 let engine = AgentEngine::new(llm, store.clone());
4190
4191 let outcome = run_until_terminal(
4192 &engine,
4193 ProcessRequest::new("plan-session", message_trigger("think")),
4194 )
4195 .await
4196 .expect("outcome");
4197
4198 assert_eq!(outcome.stop_reason, StopReason::Responded);
4199 let snapshot = session(&store, "plan-session").await;
4200 assert!(snapshot.records.iter().any(|record| matches!(
4201 record,
4202 SessionRecord::Deliberation(deliberation)
4203 if deliberation.summary == "inspect then act"
4204 && deliberation.outcome == DeliberationOutcome::ReadyToAct
4205 )));
4206 }
4207}