Skip to main content

rain_engine_core/
engine.rs

1use crate::{
2    AdvanceRequest, AdvanceResult, AgentAction, AgentContext, AgentContextSnapshot,
3    AgentStateDelta, AgentTrigger, ApprovalDecision, ApprovalResolutionRecord, ContinueRequest,
4    DelegationRecord, DeliberationOutcome, DeliberationRecord, EngineOutcome, EnginePolicy,
5    ExecutionMetadata, ExecutionPlanRecord, KernelEvent, KernelEventRecord, LlmProvider,
6    MemoryError, MemoryStore, MemoryStoreExt, ModelDecisionRecord, OutcomeRecord,
7    PendingApprovalRecord, PlannedSkillCall, Planner, PolicyOverlay, PolicyOverlayPatch,
8    PolicyOverlayStatus, PolicyTuningAction, PolicyTuningRecord, ProcessRequest,
9    ProfilePatchRecord, ProviderContentPart, ProviderDecision, ProviderMessage, ProviderRequest,
10    ProviderRequestConfig, ProviderRole, ReflectionRecord, ResumeToken, RetryPolicy,
11    SelfImprovementMode, SessionRecord, SessionSnapshot, SkillBackendKind, SkillDefinition,
12    SkillFailure, SkillFailureKind, SkillInputValidationRecord, SkillInvocation, SkillManifest,
13    SkillStore, StopReason, StrategyPreferenceRecord, SummaryRecord, SuspendReason, ToolCallRecord,
14    ToolDependency, ToolExecutionGraph, ToolNode, ToolNodeCheckpointRecord, ToolNodeStatus,
15    ToolPerformanceRecord, ToolResultRecord, TriggerIntentRecord, TriggerRecord, WakeRequestRecord,
16};
17use async_trait::async_trait;
18use dashmap::DashMap;
19use metrics::{counter, gauge, histogram};
20use std::collections::{BTreeSet, HashMap};
21use std::sync::Arc;
22use std::time::{Instant, SystemTime};
23use thiserror::Error;
24use tokio::task::JoinSet;
25use tracing::{info, instrument, warn};
26use uuid::Uuid;
27
28#[derive(Debug, Error)]
29pub enum EngineError {
30    #[error("memory error: {0}")]
31    Memory(#[from] MemoryError),
32    #[error("blob error: {0}")]
33    Blob(String),
34    #[error("provider error: {0}")]
35    Provider(String),
36}
37
/// Coarse category of an engine error, independent of its payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EngineErrorKind {
    /// Failure in the backing memory store.
    Storage,
    /// Failure while handling blob data.
    Blob,
    /// Join-related failure (not produced by `EngineError::kind` in this part of the file).
    Join,
    /// Failure reported by, or attributed to, the provider.
    Provider,
}
45
/// Whether an engine error can be recovered from or must be treated as fatal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EngineErrorSeverity {
    /// The error is classified as recoverable.
    Recoverable,
    /// The error is classified as fatal.
    Fatal,
}
51
52impl EngineError {
53    pub fn kind(&self) -> EngineErrorKind {
54        match self {
55            EngineError::Memory(_) => EngineErrorKind::Storage,
56            EngineError::Blob(_) => EngineErrorKind::Blob,
57            EngineError::Provider(_) => EngineErrorKind::Provider,
58        }
59    }
60
61    pub fn severity(&self) -> EngineErrorSeverity {
62        match self {
63            EngineError::Memory(_) => EngineErrorSeverity::Fatal,
64            EngineError::Blob(_) => EngineErrorSeverity::Recoverable,
65            EngineError::Provider(_) => EngineErrorSeverity::Recoverable,
66        }
67    }
68
69    pub fn is_recoverable(&self) -> bool {
70        self.severity() == EngineErrorSeverity::Recoverable
71    }
72}
73
74#[derive(Debug, Error, Clone, PartialEq)]
75#[error("{kind:?}: {message}")]
76pub struct SkillExecutionError {
77    pub kind: SkillFailureKind,
78    pub message: String,
79}
80
81impl SkillExecutionError {
82    pub fn new(kind: SkillFailureKind, message: impl Into<String>) -> Self {
83        Self {
84            kind,
85            message: message.into(),
86        }
87    }
88}
89
90#[async_trait]
91pub trait SkillExecutor: Send + Sync {
92    async fn execute(
93        &self,
94        invocation: SkillInvocation,
95    ) -> Result<serde_json::Value, SkillExecutionError>;
96
97    fn executor_kind(&self) -> &'static str;
98}
99
100pub trait WasmSkillExecutor: SkillExecutor {}
101
102impl<T> WasmSkillExecutor for T where T: SkillExecutor + ?Sized {}
103
104#[async_trait]
105pub trait NativeSkill: Send + Sync {
106    async fn execute(
107        &self,
108        invocation: SkillInvocation,
109    ) -> Result<serde_json::Value, SkillExecutionError>;
110
111    fn requires_human_approval(&self) -> bool {
112        false
113    }
114
115    fn executor_kind(&self) -> &'static str {
116        "native"
117    }
118}
119
120#[derive(Clone)]
121pub(crate) enum RegisteredSkillBackend {
122    Wasm(Arc<dyn SkillExecutor>),
123    Native(Arc<dyn NativeSkill>),
124}
125
126impl RegisteredSkillBackend {
127    fn kind(&self) -> SkillBackendKind {
128        match self {
129            RegisteredSkillBackend::Wasm(_) => SkillBackendKind::Wasm,
130            RegisteredSkillBackend::Native(_) => SkillBackendKind::Native,
131        }
132    }
133
134    fn executor_kind(&self) -> &'static str {
135        match self {
136            RegisteredSkillBackend::Wasm(executor) => executor.executor_kind(),
137            RegisteredSkillBackend::Native(executor) => executor.executor_kind(),
138        }
139    }
140
141    fn requires_human_approval(&self) -> bool {
142        match self {
143            RegisteredSkillBackend::Wasm(_) => false,
144            RegisteredSkillBackend::Native(executor) => executor.requires_human_approval(),
145        }
146    }
147}
148
149#[derive(Clone)]
150pub(crate) struct RegisteredSkill {
151    pub(crate) manifest: SkillManifest,
152    pub(crate) backend: RegisteredSkillBackend,
153}
154
155impl RegisteredSkill {
156    fn definition(&self) -> SkillDefinition {
157        SkillDefinition {
158            manifest: self.manifest.clone(),
159            executor_kind: self.backend.executor_kind().to_string(),
160        }
161    }
162}
163
164#[derive(Clone)]
165pub struct AgentEngine {
166    llm: Arc<dyn LlmProvider>,
167    memory: Arc<dyn MemoryStore>,
168    state_cache: Arc<dyn crate::StateProjectionCache>,
169    skill_store: Option<Arc<dyn SkillStore>>,
170    skills: Arc<DashMap<String, RegisteredSkill>>,
171    planner: Option<Arc<dyn Planner>>,
172}
173
174impl AgentEngine {
175    pub fn new(llm: Arc<dyn LlmProvider>, memory: Arc<dyn MemoryStore>) -> Self {
176        Self {
177            llm,
178            memory,
179            state_cache: Arc::new(crate::InMemoryStateCache::new()),
180            skill_store: None,
181            skills: Arc::new(DashMap::new()),
182            planner: None,
183        }
184    }
185
186    pub fn state_cache(&self) -> Arc<dyn crate::StateProjectionCache> {
187        self.state_cache.clone()
188    }
189
190    pub fn with_state_cache(mut self, state_cache: Arc<dyn crate::StateProjectionCache>) -> Self {
191        self.state_cache = state_cache;
192        self
193    }
194
195    pub fn with_skill_store(mut self, skill_store: Arc<dyn SkillStore>) -> Self {
196        self.skill_store = Some(skill_store);
197        self
198    }
199
200    pub fn with_planner(mut self, planner: Arc<dyn Planner>) -> Self {
201        self.planner = Some(planner);
202        self
203    }
204
205    pub fn register_native_skill(&self, manifest: SkillManifest, skill: Arc<dyn NativeSkill>) {
206        self.skills.insert(
207            manifest.name.clone(),
208            RegisteredSkill {
209                manifest,
210                backend: RegisteredSkillBackend::Native(skill),
211            },
212        );
213    }
214
215    pub fn register_wasm_skill(&self, manifest: SkillManifest, executor: Arc<dyn SkillExecutor>) {
216        self.skills.insert(
217            manifest.name.clone(),
218            RegisteredSkill {
219                manifest,
220                backend: RegisteredSkillBackend::Wasm(executor),
221            },
222        );
223    }
224
225    pub async fn register_wasm_skill_persistent(
226        &self,
227        manifest: SkillManifest,
228        executor: Arc<dyn SkillExecutor>,
229        wasm_bytes: Vec<u8>,
230    ) -> Result<(), String> {
231        if let Some(store) = &self.skill_store {
232            store.store_skill(manifest.clone(), wasm_bytes).await?;
233        }
234        self.register_wasm_skill(manifest, executor);
235        Ok(())
236    }
237
238    pub async fn advance(&self, request: AdvanceRequest) -> Result<AdvanceResult, EngineError> {
239        match request {
240            AdvanceRequest::Trigger(request) => self.advance_trigger(request).await,
241            AdvanceRequest::Continue(request) => self.advance_continue(request).await,
242        }
243    }
244
245    pub async fn skill_definitions(&self) -> Vec<SkillDefinition> {
246        let mut definitions = self
247            .skills
248            .iter()
249            .map(|entry| entry.value().definition())
250            .collect::<Vec<_>>();
251        definitions.sort_by(|left, right| left.manifest.name.cmp(&right.manifest.name));
252        definitions
253    }
254
255    async fn advance_trigger(&self, request: ProcessRequest) -> Result<AdvanceResult, EngineError> {
256        let started_at = SystemTime::now();
257        let trigger_id = Uuid::new_v4().to_string();
258
259        if let Some(idempotency_key) = request.idempotency_key.as_deref()
260            && let Ok(Some(mut prior_outcome)) = self
261                .memory
262                .find_outcome_by_idempotency_key(&request.session_id, idempotency_key)
263                .await
264        {
265            prior_outcome.idempotent_replay = true;
266            counter!("rain_engine.idempotent_replay_total").increment(1);
267            return Ok(AdvanceResult {
268                outcome: Some(prior_outcome),
269                emitted_events: Vec::new(),
270                state_delta: AgentStateDelta::default(),
271                wake_request: None,
272            });
273        }
274
275        let trigger_record = TriggerRecord {
276            trigger_id: trigger_id.clone(),
277            session_id: request.session_id.clone(),
278            idempotency_key: request.idempotency_key.clone(),
279            recorded_at: started_at,
280            trigger: request.trigger.clone(),
281            intent: None,
282        };
283        if let Err(err) = self.memory.append_trigger(trigger_record.clone()).await {
284            return Ok(AdvanceResult {
285                outcome: Some(storage_failure_outcome(trigger_id, 0, err.message)),
286                emitted_events: Vec::new(),
287                state_delta: AgentStateDelta::default(),
288                wake_request: None,
289            });
290        }
291
292        let mut snapshot = match self.state_cache.get_projection(&request.session_id).await {
293            Ok(Some(mut cached)) => {
294                cached.records.push(SessionRecord::Trigger(trigger_record));
295                cached
296            }
297            _ => match self.memory.load_session(&request.session_id).await {
298                Ok(snapshot) => snapshot,
299                Err(err) => {
300                    return Ok(AdvanceResult {
301                        outcome: Some(storage_failure_outcome(trigger_id, 0, err.message)),
302                        emitted_events: Vec::new(),
303                        state_delta: AgentStateDelta::default(),
304                        wake_request: None,
305                    });
306                }
307            },
308        };
309        counter!("rain_engine.triggers_total").increment(1);
310
311        // Intent is classified deterministically so one advance step still asks
312        // the provider at most once.
313        let _ = self
314            .memory
315            .append_trigger_intent(
316                &request.session_id,
317                TriggerIntentRecord {
318                    trigger_id: trigger_id.clone(),
319                    classified_at: SystemTime::now(),
320                    intent: classify_trigger_intent(&request.trigger),
321                },
322            )
323            .await;
324        if let Ok(refreshed) = self.memory.load_session(&request.session_id).await {
325            snapshot = refreshed;
326        }
327
328        // Invoke planner if available to handle goal decomposition or task adjustment
329        if let Some(planner) = &self.planner {
330            let output = planner
331                .plan(&snapshot.agent_state(), &request.trigger)
332                .await;
333            let mut changed = false;
334            if !output.events.is_empty() {
335                for event in output.events {
336                    let _ = self
337                        .memory
338                        .append_kernel_event(
339                            &request.session_id,
340                            KernelEventRecord {
341                                event_id: Uuid::new_v4().to_string(),
342                                occurred_at: SystemTime::now(),
343                                event,
344                            },
345                        )
346                        .await;
347                }
348                changed = true;
349            }
350            if let Some(plan) = output.proposed_plan {
351                let _ = self
352                    .memory
353                    .append_execution_plan(
354                        &request.session_id,
355                        ExecutionPlanRecord {
356                            plan_id: format!("plan-{}", Uuid::new_v4()),
357                            created_at: SystemTime::now(),
358                            objective: plan.objective,
359                            steps: plan.steps,
360                            current_step_index: 0,
361                            completed_at: None,
362                        },
363                    )
364                    .await;
365                changed = true;
366            }
367            if changed {
368                // Refresh snapshot if planning occurred
369                if let Ok(refreshed) = self.memory.load_session(&request.session_id).await {
370                    snapshot = refreshed;
371                }
372            }
373
374            // Trigger history summarization if history is getting long
375            if snapshot.records.len() > 10
376                && !snapshot
377                    .records
378                    .iter()
379                    .any(|r| matches!(r, SessionRecord::Summary(_)))
380                && let Ok(summary) = self
381                    .summarize_history(&snapshot, &request.policy, &request.trigger)
382                    .await
383            {
384                let _ = self
385                    .memory
386                    .append_summary(&request.session_id, summary)
387                    .await;
388                // Refresh snapshot again to include summary
389                if let Ok(refreshed) = self.memory.load_session(&request.session_id).await {
390                    snapshot = refreshed;
391                }
392            }
393        }
394        let effective_policy = request
395            .policy
396            .clone()
397            .with_overlay(snapshot.active_policy_overlay());
398        let deadline = Instant::now() + effective_policy.max_execution_time();
399        let mut context = AgentContext {
400            session_id: request.session_id.clone(),
401            records: snapshot.records.clone(),
402            prior_tool_results: snapshot.tool_results(),
403            granted_scopes: request.granted_scopes.clone(),
404            metadata: ExecutionMetadata {
405                trigger_id: trigger_id.clone(),
406                idempotency_key: request.idempotency_key.clone(),
407                started_at,
408                deadline,
409                policy: effective_policy,
410                provider: request.provider.clone(),
411                cancellation: request.cancellation.clone(),
412            },
413        };
414
415        counter!("rain_engine.triggers_total").increment(1);
416        info!(session_id = %context.session_id, trigger_id = %trigger_id, "processing trigger");
417
418        let emitted_events =
419            derive_trigger_kernel_events(&context.metadata.trigger_id, &request.trigger);
420        self.persist_kernel_events(&mut context, &emitted_events)
421            .await?;
422
423        let mut steps_executed = snapshot.current_step_count();
424        let mut consecutive_tool_failure_steps = snapshot.current_consecutive_tool_failure_steps();
425
426        if let AgentTrigger::Approval {
427            resume_token,
428            decision,
429            metadata,
430        } = &request.trigger
431        {
432            let mut pending = None;
433            let mut already_resolved = false;
434            let mut effective_decision = decision.clone();
435
436            if let Some(p) = self
437                .memory
438                .find_pending_approval_by_resume_token(&context.session_id, resume_token.as_str())
439                .await?
440            {
441                pending = Some(p);
442            } else {
443                let resolution = context.records.iter().find_map(|r| match r {
444                    SessionRecord::ApprovalResolution(res)
445                        if res.resume_token.as_str() == resume_token.as_str() =>
446                    {
447                        Some(res.clone())
448                    }
449                    _ => None,
450                });
451                if let Some(res) = resolution {
452                    let original_pending = context.records.iter().find_map(|r| match r {
453                        SessionRecord::PendingApproval(p)
454                            if p.resume_token.as_str() == resume_token.as_str() =>
455                        {
456                            Some(p.clone())
457                        }
458                        _ => None,
459                    });
460                    if let Some(p) = original_pending {
461                        pending = Some(p);
462                        already_resolved = true;
463                        effective_decision = res.decision.clone();
464                    }
465                }
466            }
467
468            let Some(pending) = pending else {
469                let outcome = self
470                    .finish(
471                        &mut context,
472                        StopReason::PolicyAborted,
473                        None,
474                        Some("resume token not found".to_string()),
475                        steps_executed,
476                        None,
477                    )
478                    .await?;
479                return Ok(build_advance_result(outcome, emitted_events));
480            };
481
482            if !already_resolved {
483                self.memory
484                    .append_approval_resolution(
485                        &context.session_id,
486                        ApprovalResolutionRecord {
487                            resume_token: pending.resume_token.clone(),
488                            resolved_at: SystemTime::now(),
489                            decision: effective_decision.clone(),
490                            metadata: metadata.clone(),
491                        },
492                    )
493                    .await?;
494            }
495
496            let resumed = match effective_decision {
497                ApprovalDecision::Approved => match self
498                    .execute_planned_calls(
499                        &context,
500                        pending.step,
501                        pending.pending_calls.clone(),
502                        true,
503                    )
504                    .await?
505                {
506                    BatchExecution::Executed(batch) => batch,
507                    BatchExecution::Suspended { .. } => {
508                        let outcome = self
509                            .finish(
510                                &mut context,
511                                StopReason::PolicyAborted,
512                                None,
513                                Some("approval resume unexpectedly suspended".to_string()),
514                                pending.step,
515                                None,
516                            )
517                            .await?;
518                        return Ok(build_advance_result(outcome, emitted_events));
519                    }
520                },
521                ApprovalDecision::Rejected => ExecutedBatch {
522                    results: pending
523                        .pending_calls
524                        .into_iter()
525                        .map(|call| ToolResultRecord {
526                            call_id: call.call_id,
527                            finished_at: SystemTime::now(),
528                            skill_name: call.name,
529                            output: Err(SkillFailure {
530                                kind: SkillFailureKind::PermissionDenied,
531                                message: "human approval rejected".to_string(),
532                            }),
533                        })
534                        .collect(),
535                    all_failed: true,
536                },
537            };
538
539            for result in resumed.results {
540                self.memory
541                    .append_tool_result(&context.session_id, result.clone())
542                    .await?;
543                context.prior_tool_results.push(result.clone());
544                context.records.push(SessionRecord::ToolResult(result));
545            }
546            steps_executed = pending.step + 1;
547            if resumed.all_failed {
548                consecutive_tool_failure_steps += 1;
549            }
550        } else if let AgentTrigger::DelegationResult {
551            correlation_id: _,
552            payload: _,
553            metadata: _,
554        } = &request.trigger
555        {
556            // The DelegationResult event was already appended to history via `derive_trigger_kernel_events`
557            // We just need to resume execution.
558            // Actually wait, `derive_trigger_kernel_events` returns the event, and `self.persist_kernel_events` appends it.
559            // So we don't need to do anything special here other than let the normal policy loop continue,
560            // since the LLM will see the `KernelEvent::DelegationResolved` or `AgentTrigger::DelegationResult` in its context
561            // and know the sub-task is done.
562
563            // Let's just make sure it counts as a step or something if necessary.
564            // Actually, we can just proceed to `self.perform_single_step`
565
566            // Wait, we should probably append a fake tool result or something, or the LLM can just read the Trigger.
567            // `build_provider_contents` already includes the Trigger in the context.
568
569            // So nothing extra is strictly needed here for resumption, it will fall through to `perform_single_step`
570            // which will call the LLM to get the next action.
571        }
572
573        self.perform_single_step(
574            context,
575            request.trigger,
576            steps_executed,
577            consecutive_tool_failure_steps,
578            emitted_events,
579        )
580        .await
581    }
582
583    async fn advance_continue(
584        &self,
585        request: ContinueRequest,
586    ) -> Result<AdvanceResult, EngineError> {
587        let snapshot = match self.memory.load_session(&request.session_id).await {
588            Ok(snapshot) => snapshot,
589            Err(err) => {
590                return Ok(AdvanceResult {
591                    outcome: Some(storage_failure_outcome(
592                        request.session_id.clone(),
593                        0,
594                        err.message,
595                    )),
596                    emitted_events: Vec::new(),
597                    state_delta: AgentStateDelta::default(),
598                    wake_request: None,
599                });
600            }
601        };
602
603        let Some(active_trigger) = snapshot.active_trigger() else {
604            return Ok(AdvanceResult {
605                outcome: Some(EngineOutcome {
606                    trigger_id: Uuid::new_v4().to_string(),
607                    stop_reason: StopReason::Yielded,
608                    response: None,
609                    detail: Some("no active trigger to continue".to_string()),
610                    steps_executed: 0,
611                    idempotent_replay: false,
612                    resume_token: None,
613                }),
614                emitted_events: Vec::new(),
615                state_delta: AgentStateDelta::default(),
616                wake_request: None,
617            });
618        };
619
620        let trigger_id = active_trigger.trigger_id.clone();
621        let started_at = SystemTime::now();
622        let effective_policy = request
623            .policy
624            .clone()
625            .with_overlay(snapshot.active_policy_overlay());
626        let deadline = Instant::now() + effective_policy.max_execution_time();
627        let mut context = AgentContext {
628            session_id: request.session_id.clone(),
629            records: snapshot.records.clone(),
630            prior_tool_results: snapshot.tool_results(),
631            granted_scopes: request.granted_scopes.clone(),
632            metadata: ExecutionMetadata {
633                trigger_id,
634                idempotency_key: active_trigger.idempotency_key.clone(),
635                started_at,
636                deadline,
637                policy: effective_policy,
638                provider: request.provider.clone(),
639                cancellation: request.cancellation.clone(),
640            },
641        };
642
643        if let Some(graph) = snapshot.active_tool_execution_graph() {
644            let calls = graph
645                .nodes
646                .iter()
647                .map(|node| PlannedSkillCall {
648                    call_id: node.call_id.clone(),
649                    name: node.skill_name.clone(),
650                    args: node.args.clone(),
651                    priority: node.priority,
652                    depends_on: node
653                        .dependencies
654                        .iter()
655                        .map(|dependency| dependency.call_id.clone())
656                        .collect(),
657                    retry_policy: node.retry_policy.clone(),
658                    dry_run: node.dry_run,
659                })
660                .collect::<Vec<_>>();
661            match self
662                .execute_planned_calls(&context, graph.step, calls, true)
663                .await?
664            {
665                BatchExecution::Executed(batch) => {
666                    for result in batch.results {
667                        self.memory
668                            .append_tool_result(&context.session_id, result.clone())
669                            .await?;
670                        context.prior_tool_results.push(result.clone());
671                        context.records.push(SessionRecord::ToolResult(result));
672                    }
673                    // Instead of yielding to the runtime, we must perform the next step directly
674                    // to ensure the LLM sees the ToolResultRecord that we just executed!
675                    return self
676                        .perform_single_step(
677                            context,
678                            active_trigger.trigger,
679                            snapshot.current_step_count(),
680                            snapshot.current_consecutive_tool_failure_steps(),
681                            Vec::new(),
682                        )
683                        .await;
684                }
685                BatchExecution::Suspended { .. } => {
686                    let outcome = self
687                        .finish(
688                            &mut context,
689                            StopReason::PolicyAborted,
690                            None,
691                            Some("checkpointed graph unexpectedly suspended".to_string()),
692                            graph.step,
693                            None,
694                        )
695                        .await?;
696                    return Ok(build_advance_result(outcome, Vec::new()));
697                }
698            }
699        }
700
701        self.perform_single_step(
702            context,
703            active_trigger.trigger,
704            snapshot.current_step_count(),
705            snapshot.current_consecutive_tool_failure_steps(),
706            Vec::new(),
707        )
708        .await
709    }
710
711    #[instrument(
712        skip(self, context, trigger, emitted_events),
713        fields(
714            session_id = %context.session_id,
715            trigger_id = %context.metadata.trigger_id,
716            step = steps_executed
717        )
718    )]
719    async fn perform_single_step(
720        &self,
721        mut context: AgentContext,
722        trigger: AgentTrigger,
723        steps_executed: usize,
724        consecutive_tool_failure_steps: usize,
725        emitted_events: Vec<KernelEventRecord>,
726    ) -> Result<AdvanceResult, EngineError> {
727        if let Some(mut plan) = context.active_execution_plan()
728            && plan.current_step_index < plan.steps.len()
729        {
730            let action = plan.steps[plan.current_step_index].clone();
731            plan.current_step_index += 1;
732            if plan.current_step_index >= plan.steps.len() {
733                plan.completed_at = Some(SystemTime::now());
734            }
735            let _ = self
736                .memory
737                .append_execution_plan(&context.session_id, plan)
738                .await;
739
740            return self
741                .execute_action(
742                    context,
743                    action,
744                    steps_executed,
745                    consecutive_tool_failure_steps,
746                    emitted_events,
747                )
748                .await;
749        }
750
751        if let Some(outcome) = self
752            .policy_outcome(&mut context, steps_executed, consecutive_tool_failure_steps)
753            .await?
754        {
755            return Ok(build_advance_result(outcome, emitted_events));
756        }
757
758        let mut available_skills = self
759            .skills
760            .iter()
761            .filter(|skill| {
762                skill
763                    .value()
764                    .manifest
765                    .required_scopes
766                    .iter()
767                    .all(|scope| context.granted_scopes.contains(scope))
768            })
769            .map(|skill| skill.value().definition())
770            .collect::<Vec<_>>();
771
772        // Enforce Strategy Preferences: If a tool has a high failure rate, append a warning
773        // to its description so the LLM avoids using it.
774        let preferences: HashMap<_, _> = context
775            .records
776            .iter()
777            .filter_map(|record| {
778                if let SessionRecord::StrategyPreference(pref) = record
779                    && let Some(skill_name) = &pref.skill_name
780                {
781                    return Some((skill_name.clone(), pref.reason.clone()));
782                }
783                None
784            })
785            .collect();
786
787        for def in &mut available_skills {
788            if let Some(reason) = preferences.get(&def.manifest.name) {
789                def.manifest.description = format!(
790                    "{} [CRITICAL WARNING: Avoid using this tool if possible. Reason: {}]",
791                    def.manifest.description, reason
792                );
793            }
794        }
795
796        let plan = context.active_execution_plan();
797        let provider_request = ProviderRequest {
798            trigger: trigger.clone(),
799            context: context.to_snapshot(steps_executed),
800            available_skills,
801            config: context.metadata.provider.clone(),
802            policy: context.metadata.policy.clone(),
803            contents: build_provider_contents(&trigger, &context.records, plan.as_ref()),
804        };
805        let provider_started = Instant::now();
806        let decision = match tokio::time::timeout(
807            context.metadata.policy.provider_timeout(),
808            self.llm.generate_action(provider_request),
809        )
810        .await
811        {
812            Ok(Ok(decision)) => decision,
813            Ok(Err(err)) => {
814                warn!(session_id = %context.session_id, "provider failed: {}", err.message);
815                let outcome = self
816                    .finish(
817                        &mut context,
818                        StopReason::ProviderFailure,
819                        None,
820                        Some(format!("provider failure: {}", err.message)),
821                        steps_executed,
822                        None,
823                    )
824                    .await?;
825                return Ok(build_advance_result(outcome, emitted_events));
826            }
827            Err(_) => {
828                warn!(session_id = %context.session_id, "provider timed out");
829                let outcome = self
830                    .finish(
831                        &mut context,
832                        StopReason::ProviderFailure,
833                        None,
834                        Some("provider timeout exceeded".to_string()),
835                        steps_executed,
836                        None,
837                    )
838                    .await?;
839                return Ok(build_advance_result(outcome, emitted_events));
840            }
841        };
842        histogram!("rain_engine.provider_latency_seconds")
843            .record(provider_started.elapsed().as_secs_f64());
844        counter!("rain_engine.model_decisions_total", "action" => action_metric_label(&decision.action)).increment(1);
845
846        self.persist_provider_metadata(&mut context, &decision)
847            .await?;
848
849        let decision_record = ModelDecisionRecord {
850            step: steps_executed,
851            decided_at: SystemTime::now(),
852            action: decision.action.clone(),
853        };
854        if let Err(err) = self
855            .memory
856            .append_model_decision(&context.session_id, decision_record.clone())
857            .await
858        {
859            return Ok(AdvanceResult {
860                outcome: Some(storage_failure_outcome(
861                    context.metadata.trigger_id.clone(),
862                    steps_executed,
863                    err.message,
864                )),
865                emitted_events,
866                state_delta: AgentStateDelta::default(),
867                wake_request: None,
868            });
869        }
870        context
871            .records
872            .push(SessionRecord::ModelDecision(decision_record));
873
874        self.execute_action(
875            context,
876            decision.action,
877            steps_executed,
878            consecutive_tool_failure_steps,
879            emitted_events,
880        )
881        .await
882    }
883
884    async fn execute_action(
885        &self,
886        mut context: AgentContext,
887        action: AgentAction,
888        steps_executed: usize,
889        _consecutive_tool_failure_steps: usize,
890        mut emitted_events: Vec<KernelEventRecord>,
891    ) -> Result<AdvanceResult, EngineError> {
892        match action {
893            AgentAction::Plan {
894                summary,
895                candidate_actions,
896                confidence,
897            } => {
898                let record = DeliberationRecord {
899                    deliberation_id: Uuid::new_v4().to_string(),
900                    trigger_id: context.metadata.trigger_id.clone(),
901                    step: steps_executed,
902                    created_at: SystemTime::now(),
903                    summary,
904                    candidate_actions,
905                    confidence,
906                    outcome: if confidence >= 0.7 {
907                        DeliberationOutcome::ReadyToAct
908                    } else {
909                        DeliberationOutcome::NeedsRefinement
910                    },
911                };
912                self.memory
913                    .append_deliberation(&context.session_id, record.clone())
914                    .await?;
915                context.records.push(SessionRecord::Deliberation(record));
916                Ok(AdvanceResult {
917                    outcome: None,
918                    emitted_events: emitted_events.clone(),
919                    state_delta: derive_state_delta(&emitted_events),
920                    wake_request: emitted_events.iter().find_map(extract_wake_request),
921                })
922            }
923            AgentAction::Respond { content } => {
924                let outcome = self
925                    .finish(
926                        &mut context,
927                        StopReason::Responded,
928                        Some(content),
929                        None,
930                        steps_executed + 1,
931                        None,
932                    )
933                    .await?;
934                Ok(build_advance_result(outcome, emitted_events))
935            }
936            AgentAction::Yield { reason } => {
937                let outcome = self
938                    .finish(
939                        &mut context,
940                        StopReason::Yielded,
941                        None,
942                        reason,
943                        steps_executed + 1,
944                        None,
945                    )
946                    .await?;
947                Ok(build_advance_result(outcome, emitted_events))
948            }
949            AgentAction::MemorySearch { query, limit } => {
950                let mut results = self
951                    .memory
952                    .list_records(crate::RecordPageQuery::new(context.session_id.clone()))
953                    .await
954                    .map(|p| p.records)
955                    .unwrap_or_default();
956
957                let query_lower = query.to_lowercase();
958                results.retain(|r| {
959                    format!("{:?}", r.record)
960                        .to_lowercase()
961                        .contains(&query_lower)
962                });
963                results.truncate(limit.max(1));
964
965                let observation = serde_json::to_value(results).unwrap_or_default();
966
967                let event = KernelEventRecord {
968                    event_id: Uuid::new_v4().to_string(),
969                    occurred_at: SystemTime::now(),
970                    event: KernelEvent::MemorySearched { query, limit },
971                };
972                self.memory
973                    .append_kernel_event(&context.session_id, event.clone())
974                    .await?;
975                context
976                    .records
977                    .push(SessionRecord::KernelEvent(event.clone()));
978                emitted_events.push(event.clone());
979
980                let _observation_event = KernelEventRecord {
981                    event_id: Uuid::new_v4().to_string(),
982                    occurred_at: SystemTime::now(),
983                    event: KernelEvent::ObservationAppended(crate::ObservationRecord {
984                        observation_id: crate::ObservationId(Uuid::new_v4().to_string()),
985                        recorded_at: SystemTime::now(),
986                        source: "MemorySearch".to_string(),
987                        content: observation,
988                        attachment_ids: vec![],
989                        related_resources: vec![],
990                    }),
991                };
992                self.memory
993                    .append_kernel_event(&context.session_id, _observation_event.clone())
994                    .await?;
995                context
996                    .records
997                    .push(SessionRecord::KernelEvent(_observation_event.clone()));
998                emitted_events.push(_observation_event.clone());
999
1000                Ok(AdvanceResult {
1001                    outcome: None,
1002                    emitted_events: emitted_events.clone(),
1003                    state_delta: derive_state_delta(&emitted_events),
1004                    wake_request: emitted_events.iter().find_map(extract_wake_request),
1005                })
1006            }
1007            AgentAction::MemoryArchive { content } => {
1008                let event = KernelEventRecord {
1009                    event_id: Uuid::new_v4().to_string(),
1010                    occurred_at: SystemTime::now(),
1011                    event: KernelEvent::MemoryArchived {
1012                        content: content.clone(),
1013                    },
1014                };
1015                self.memory
1016                    .append_kernel_event(&context.session_id, event.clone())
1017                    .await?;
1018                context
1019                    .records
1020                    .push(SessionRecord::KernelEvent(event.clone()));
1021                emitted_events.push(event.clone());
1022
1023                let _observation_event = KernelEventRecord {
1024                    event_id: Uuid::new_v4().to_string(),
1025                    occurred_at: SystemTime::now(),
1026                    event: KernelEvent::ObservationAppended(crate::ObservationRecord {
1027                        observation_id: crate::ObservationId(Uuid::new_v4().to_string()),
1028                        recorded_at: SystemTime::now(),
1029                        source: "MemoryArchive".to_string(),
1030                        content: serde_json::json!({ "status": "archived successfully" }),
1031                        attachment_ids: vec![],
1032                        related_resources: vec![],
1033                    }),
1034                };
1035                self.memory
1036                    .append_kernel_event(&context.session_id, _observation_event.clone())
1037                    .await?;
1038                context
1039                    .records
1040                    .push(SessionRecord::KernelEvent(_observation_event.clone()));
1041                emitted_events.push(_observation_event.clone());
1042
1043                Ok(AdvanceResult {
1044                    outcome: None,
1045                    emitted_events: emitted_events.clone(),
1046                    state_delta: derive_state_delta(&emitted_events),
1047                    wake_request: emitted_events.iter().find_map(extract_wake_request),
1048                })
1049            }
1050            AgentAction::Continue { .. } => Ok(AdvanceResult {
1051                outcome: None,
1052                emitted_events: emitted_events.clone(),
1053                state_delta: derive_state_delta(&emitted_events),
1054                wake_request: emitted_events.iter().find_map(extract_wake_request),
1055            }),
1056            AgentAction::Suspend {
1057                reason,
1058                pending_calls,
1059                resume_token,
1060            } => {
1061                let outcome = self
1062                    .suspend(
1063                        &mut context,
1064                        steps_executed,
1065                        reason,
1066                        pending_calls,
1067                        resume_token,
1068                    )
1069                    .await?;
1070                Ok(build_advance_result(outcome, emitted_events))
1071            }
1072            AgentAction::CallSkills(calls) => match self
1073                .execute_planned_calls(&context, steps_executed, calls, false)
1074                .await?
1075            {
1076                BatchExecution::Executed(ExecutedBatch {
1077                    results,
1078                    all_failed,
1079                }) => {
1080                    for result in results {
1081                        if let Err(err) = self
1082                            .memory
1083                            .append_tool_result(&context.session_id, result.clone())
1084                            .await
1085                        {
1086                            return Ok(AdvanceResult {
1087                                outcome: Some(storage_failure_outcome(
1088                                    context.metadata.trigger_id.clone(),
1089                                    steps_executed + 1,
1090                                    err.message,
1091                                )),
1092                                emitted_events,
1093                                state_delta: AgentStateDelta::default(),
1094                                wake_request: None,
1095                            });
1096                        }
1097                        context.prior_tool_results.push(result.clone());
1098                        context.records.push(SessionRecord::ToolResult(result));
1099                    }
1100                    let _ = all_failed;
1101                    Ok(AdvanceResult {
1102                        outcome: None,
1103                        emitted_events: emitted_events.clone(),
1104                        state_delta: derive_state_delta(&emitted_events),
1105                        wake_request: emitted_events.iter().find_map(extract_wake_request),
1106                    })
1107                }
1108                BatchExecution::Suspended {
1109                    reason,
1110                    pending_calls,
1111                    resume_token,
1112                } => {
1113                    let outcome = self
1114                        .suspend(
1115                            &mut context,
1116                            steps_executed,
1117                            reason,
1118                            pending_calls,
1119                            resume_token,
1120                        )
1121                        .await?;
1122                    Ok(build_advance_result(outcome, emitted_events))
1123                }
1124            },
1125            AgentAction::Delegate {
1126                target,
1127                task,
1128                correlation_id,
1129                resume_token,
1130            } => {
1131                let record = DelegationRecord {
1132                    correlation_id,
1133                    created_at: SystemTime::now(),
1134                    trigger_id: context.metadata.trigger_id.clone(),
1135                    target,
1136                    task,
1137                    resume_token: resume_token.clone(),
1138                };
1139                self.memory
1140                    .append_delegation(&context.session_id, record.clone())
1141                    .await?;
1142                context
1143                    .records
1144                    .push(SessionRecord::Delegation(record.clone()));
1145                let event = KernelEventRecord {
1146                    event_id: format!("delegation-{}", record.correlation_id.as_str()),
1147                    occurred_at: record.created_at,
1148                    event: KernelEvent::DelegationRequested(record),
1149                };
1150                self.memory
1151                    .append_kernel_event(&context.session_id, event.clone())
1152                    .await?;
1153                context
1154                    .records
1155                    .push(SessionRecord::KernelEvent(event.clone()));
1156                emitted_events.push(event);
1157                let outcome = self
1158                    .finish(
1159                        &mut context,
1160                        StopReason::Delegated,
1161                        None,
1162                        Some("delegated to downstream worker".to_string()),
1163                        steps_executed + 1,
1164                        Some(resume_token),
1165                    )
1166                    .await?;
1167                Ok(build_advance_result(outcome, emitted_events))
1168            }
1169        }
1170    }
1171
1172    async fn policy_outcome(
1173        &self,
1174        context: &mut AgentContext,
1175        steps_executed: usize,
1176        consecutive_tool_failure_steps: usize,
1177    ) -> Result<Option<EngineOutcome>, EngineError> {
1178        if context.metadata.cancellation.is_cancelled() {
1179            return self
1180                .finish(
1181                    context,
1182                    StopReason::Cancelled,
1183                    None,
1184                    Some("execution cancelled".to_string()),
1185                    steps_executed,
1186                    None,
1187                )
1188                .await
1189                .map(Some);
1190        }
1191
1192        if Instant::now() >= context.metadata.deadline {
1193            return self
1194                .finish(
1195                    context,
1196                    StopReason::DeadlineExceeded,
1197                    None,
1198                    Some("engine execution deadline exceeded".to_string()),
1199                    steps_executed,
1200                    None,
1201                )
1202                .await
1203                .map(Some);
1204        }
1205
1206        if steps_executed >= context.metadata.policy.max_steps {
1207            return self
1208                .finish(
1209                    context,
1210                    StopReason::MaxStepsReached,
1211                    None,
1212                    Some("max steps reached".to_string()),
1213                    steps_executed,
1214                    None,
1215                )
1216                .await
1217                .map(Some);
1218        }
1219
1220        if consecutive_tool_failure_steps >= context.metadata.policy.max_consecutive_tool_failures {
1221            return self
1222                .finish(
1223                    context,
1224                    StopReason::PolicyAborted,
1225                    None,
1226                    Some("max consecutive tool failure steps reached".to_string()),
1227                    steps_executed,
1228                    None,
1229                )
1230                .await
1231                .map(Some);
1232        }
1233
1234        let cost_so_far = context
1235            .records
1236            .iter()
1237            .filter_map(|record| match record {
1238                SessionRecord::ProviderUsage(usage) => Some(usage.estimated_cost_usd),
1239                _ => None,
1240            })
1241            .sum::<f64>();
1242        if cost_so_far >= context.metadata.policy.max_cost_per_session {
1243            return self
1244                .finish(
1245                    context,
1246                    StopReason::PolicyAborted,
1247                    None,
1248                    Some("session cost limit reached".to_string()),
1249                    steps_executed,
1250                    None,
1251                )
1252                .await
1253                .map(Some);
1254        }
1255
1256        Ok(None)
1257    }
1258
1259    async fn persist_kernel_events(
1260        &self,
1261        context: &mut AgentContext,
1262        events: &[KernelEventRecord],
1263    ) -> Result<(), EngineError> {
1264        for event in events {
1265            self.memory
1266                .append_kernel_event(&context.session_id, event.clone())
1267                .await?;
1268            context
1269                .records
1270                .push(SessionRecord::KernelEvent(event.clone()));
1271        }
1272        Ok(())
1273    }
1274
1275    async fn persist_provider_metadata(
1276        &self,
1277        context: &mut AgentContext,
1278        decision: &ProviderDecision,
1279    ) -> Result<(), EngineError> {
1280        if let Some(usage) = &decision.usage {
1281            self.memory
1282                .append_provider_usage(&context.session_id, usage.clone())
1283                .await?;
1284            context
1285                .records
1286                .push(SessionRecord::ProviderUsage(usage.clone()));
1287        }
1288        if let Some(cache) = &decision.cache {
1289            self.memory
1290                .append_provider_cache(&context.session_id, cache.clone())
1291                .await?;
1292            context
1293                .records
1294                .push(SessionRecord::ProviderCache(cache.clone()));
1295        }
1296        Ok(())
1297    }
1298
1299    async fn suspend(
1300        &self,
1301        context: &mut AgentContext,
1302        step: usize,
1303        reason: SuspendReason,
1304        pending_calls: Vec<PlannedSkillCall>,
1305        resume_token: ResumeToken,
1306    ) -> Result<EngineOutcome, EngineError> {
1307        let pending = PendingApprovalRecord {
1308            resume_token: resume_token.clone(),
1309            created_at: SystemTime::now(),
1310            trigger_id: context.metadata.trigger_id.clone(),
1311            step,
1312            reason: reason.clone(),
1313            pending_calls,
1314        };
1315        self.memory
1316            .append_pending_approval(&context.session_id, pending.clone())
1317            .await?;
1318        context
1319            .records
1320            .push(SessionRecord::PendingApproval(pending.clone()));
1321        self.finish(
1322            context,
1323            StopReason::Suspended,
1324            None,
1325            Some(match reason {
1326                SuspendReason::HumanApprovalRequired { .. } => {
1327                    "human approval required".to_string()
1328                }
1329                SuspendReason::ProviderRequested { message } => message,
1330            }),
1331            step,
1332            Some(resume_token),
1333        )
1334        .await
1335    }
1336
1337    #[instrument(
1338        skip(self, context, calls),
1339        fields(
1340            session_id = %context.session_id,
1341            trigger_id = %context.metadata.trigger_id,
1342            step,
1343            call_count = calls.len(),
1344            max_parallel = context.metadata.policy.max_parallel_skill_calls
1345        )
1346    )]
1347    async fn execute_planned_calls(
1348        &self,
1349        context: &AgentContext,
1350        step: usize,
1351        calls: Vec<PlannedSkillCall>,
1352        approval_override: bool,
1353    ) -> Result<BatchExecution, EngineError> {
1354        if !approval_override {
1355            let approval_calls = calls
1356                .iter()
1357                .filter_map(|call| {
1358                    let skill = self.skills.get(&call.name)?;
1359                    skill
1360                        .backend
1361                        .requires_human_approval()
1362                        .then_some(call.name.clone())
1363                })
1364                .collect::<Vec<_>>();
1365            if !approval_calls.is_empty() {
1366                counter!("rain_engine.approval_suspensions_total").increment(1);
1367                return Ok(BatchExecution::Suspended {
1368                    reason: SuspendReason::HumanApprovalRequired {
1369                        skill_names: approval_calls,
1370                    },
1371                    pending_calls: calls,
1372                    resume_token: ResumeToken(Uuid::new_v4().to_string()),
1373                });
1374            }
1375        }
1376
1377        let graph = existing_or_new_graph(context, step, &calls);
1378        if !context.records.iter().any(|record| {
1379            matches!(
1380                record,
1381                SessionRecord::ToolExecutionGraph(existing) if existing.graph_id == graph.graph_id
1382            )
1383        }) {
1384            self.memory
1385                .append_tool_execution_graph(&context.session_id, graph.clone())
1386                .await?;
1387            for node in &graph.nodes {
1388                self.append_tool_checkpoint(context, &graph, node, ToolNodeStatus::Queued, 0, None)
1389                    .await?;
1390            }
1391        }
1392
1393        let mut status_by_call = latest_tool_statuses(context, &graph.graph_id);
1394        let mut attempts_by_call = started_attempt_counts(context, &graph.graph_id);
1395        let mut results_by_call = context
1396            .records
1397            .iter()
1398            .filter_map(|record| match record {
1399                SessionRecord::ToolResult(result) => Some((result.call_id.clone(), result.clone())),
1400                _ => None,
1401            })
1402            .collect::<HashMap<_, _>>();
1403        let mut new_results = Vec::<ToolResultRecord>::new();
1404        let max_parallel = context
1405            .metadata
1406            .policy
1407            .max_parallel_skill_calls
1408            .max(1)
1409            .min(context.metadata.policy.max_ready_tool_nodes.max(1));
1410        gauge!("rain_engine.registered_skills").set(self.skills.len() as f64);
1411
1412        loop {
1413            let skipped = self
1414                .skip_blocked_nodes(
1415                    context,
1416                    &graph,
1417                    &mut status_by_call,
1418                    &mut results_by_call,
1419                    &mut new_results,
1420                    step,
1421                )
1422                .await?;
1423            let ready = ready_nodes(&graph, &status_by_call)
1424                .into_iter()
1425                .take(context.metadata.policy.max_ready_tool_nodes.max(1))
1426                .collect::<Vec<_>>();
1427            if ready.is_empty() {
1428                if !skipped {
1429                    break;
1430                }
1431                continue;
1432            }
1433
1434            let mut join_set = JoinSet::new();
1435            for node in ready.into_iter().take(max_parallel) {
1436                let prepared = self
1437                    .prepare_node(context, &graph, &node, step, &mut attempts_by_call)
1438                    .await?;
1439                match prepared {
1440                    PreparedNode::Executable(prepared) => {
1441                        join_set.spawn(run_prepared_call(*prepared));
1442                    }
1443                    PreparedNode::Immediate(result) => {
1444                        let status = final_status_for_result(&result);
1445                        status_by_call.insert(node.call_id.clone(), status);
1446                        results_by_call.insert(result.call_id.clone(), result.clone());
1447                        new_results.push(result);
1448                    }
1449                }
1450            }
1451
1452            while let Some(joined) = join_set.join_next().await {
1453                let result = joined.map_err(|err| EngineError::Blob(err.to_string()))??;
1454                let Some(node) = graph
1455                    .nodes
1456                    .iter()
1457                    .find(|node| node.call_id == result.call_id)
1458                else {
1459                    continue;
1460                };
1461                let status = final_status_for_result(&result);
1462                self.append_tool_checkpoint(
1463                    context,
1464                    &graph,
1465                    node,
1466                    status.clone(),
1467                    *attempts_by_call.get(&node.call_id).unwrap_or(&1),
1468                    result_detail(&result),
1469                )
1470                .await?;
1471                status_by_call.insert(node.call_id.clone(), status);
1472                results_by_call.insert(result.call_id.clone(), result.clone());
1473                new_results.push(result);
1474            }
1475        }
1476
1477        let ordered = graph
1478            .nodes
1479            .iter()
1480            .filter_map(|node| {
1481                new_results
1482                    .iter()
1483                    .find(|result| result.call_id == node.call_id)
1484                    .cloned()
1485            })
1486            .collect::<Vec<_>>();
1487        let any_success = ordered.iter().any(|result| result.output.is_ok());
1488        Ok(BatchExecution::Executed(ExecutedBatch {
1489            results: ordered,
1490            all_failed: !any_success,
1491        }))
1492    }
1493
1494    async fn append_tool_checkpoint(
1495        &self,
1496        context: &AgentContext,
1497        graph: &ToolExecutionGraph,
1498        node: &ToolNode,
1499        status: ToolNodeStatus,
1500        attempt: usize,
1501        detail: Option<String>,
1502    ) -> Result<(), EngineError> {
1503        let record = ToolNodeCheckpointRecord {
1504            checkpoint_id: Uuid::new_v4().to_string(),
1505            graph_id: graph.graph_id.clone(),
1506            call_id: node.call_id.clone(),
1507            skill_name: node.skill_name.clone(),
1508            step: graph.step,
1509            status,
1510            attempt,
1511            occurred_at: SystemTime::now(),
1512            detail,
1513        };
1514        self.memory
1515            .append_tool_node_checkpoint(&context.session_id, record)
1516            .await?;
1517        Ok(())
1518    }
1519
1520    async fn skip_blocked_nodes(
1521        &self,
1522        context: &AgentContext,
1523        graph: &ToolExecutionGraph,
1524        status_by_call: &mut HashMap<String, ToolNodeStatus>,
1525        results_by_call: &mut HashMap<String, ToolResultRecord>,
1526        new_results: &mut Vec<ToolResultRecord>,
1527        step: usize,
1528    ) -> Result<bool, EngineError> {
1529        let mut changed = false;
1530        for node in &graph.nodes {
1531            if is_terminal_status(status_by_call.get(&node.call_id)) {
1532                continue;
1533            }
1534            let blocked_by = node.dependencies.iter().find(|dependency| {
1535                matches!(
1536                    status_by_call.get(&dependency.call_id),
1537                    Some(ToolNodeStatus::Failed)
1538                        | Some(ToolNodeStatus::Skipped)
1539                        | Some(ToolNodeStatus::TimedOut)
1540                )
1541            });
1542            if let Some(blocked_by) = blocked_by {
1543                let message = format!("dependency `{}` did not succeed", blocked_by.call_id);
1544                let result = self.error_result(
1545                    node.call_id.clone(),
1546                    node.skill_name.clone(),
1547                    SkillFailureKind::Internal,
1548                    message.clone(),
1549                );
1550                self.append_tool_checkpoint(
1551                    context,
1552                    graph,
1553                    node,
1554                    ToolNodeStatus::Skipped,
1555                    0,
1556                    Some(message),
1557                )
1558                .await?;
1559                status_by_call.insert(node.call_id.clone(), ToolNodeStatus::Skipped);
1560                results_by_call.insert(node.call_id.clone(), result.clone());
1561                new_results.push(result);
1562                let _ = step;
1563                changed = true;
1564            }
1565        }
1566        Ok(changed)
1567    }
1568
1569    async fn prepare_node(
1570        &self,
1571        context: &AgentContext,
1572        graph: &ToolExecutionGraph,
1573        node: &ToolNode,
1574        step: usize,
1575        attempts_by_call: &mut HashMap<String, usize>,
1576    ) -> Result<PreparedNode, EngineError> {
1577        let Some(skill) = self
1578            .skills
1579            .get(&node.skill_name)
1580            .map(|entry| entry.value().clone())
1581        else {
1582            self.append_validation(
1583                context,
1584                graph,
1585                node,
1586                false,
1587                vec![format!("skill `{}` is not registered", node.skill_name)],
1588            )
1589            .await?;
1590            self.append_tool_checkpoint(
1591                context,
1592                graph,
1593                node,
1594                ToolNodeStatus::Failed,
1595                0,
1596                Some(format!("skill `{}` is not registered", node.skill_name)),
1597            )
1598            .await?;
1599            return Ok(PreparedNode::Immediate(self.error_result(
1600                node.call_id.clone(),
1601                node.skill_name.clone(),
1602                SkillFailureKind::Internal,
1603                format!("skill `{}` is not registered", node.skill_name),
1604            )));
1605        };
1606
1607        if context.metadata.policy.validate_tool_args {
1608            let errors = validate_against_schema(&node.args, &skill.manifest.input_schema);
1609            self.append_validation(context, graph, node, errors.is_empty(), errors.clone())
1610                .await?;
1611            if !errors.is_empty() {
1612                let message = errors.join("; ");
1613                self.append_tool_checkpoint(
1614                    context,
1615                    graph,
1616                    node,
1617                    ToolNodeStatus::Failed,
1618                    0,
1619                    Some(message.clone()),
1620                )
1621                .await?;
1622                return Ok(PreparedNode::Immediate(self.error_result(
1623                    node.call_id.clone(),
1624                    node.skill_name.clone(),
1625                    SkillFailureKind::InvalidArguments,
1626                    message,
1627                )));
1628            }
1629        }
1630        self.append_tool_checkpoint(context, graph, node, ToolNodeStatus::Validated, 0, None)
1631            .await?;
1632
1633        if !skill
1634            .manifest
1635            .required_scopes
1636            .iter()
1637            .all(|scope| context.granted_scopes.contains(scope))
1638        {
1639            counter!("rain_engine.permission_denials_total").increment(1);
1640            self.append_tool_checkpoint(
1641                context,
1642                graph,
1643                node,
1644                ToolNodeStatus::Failed,
1645                0,
1646                Some(format!(
1647                    "missing required scopes for skill `{}`",
1648                    node.skill_name
1649                )),
1650            )
1651            .await?;
1652            return Ok(PreparedNode::Immediate(self.error_result(
1653                node.call_id.clone(),
1654                node.skill_name.clone(),
1655                SkillFailureKind::PermissionDenied,
1656                format!("missing required scopes for skill `{}`", node.skill_name),
1657            )));
1658        }
1659
1660        if matches!(skill.backend, RegisteredSkillBackend::Native(_))
1661            && !context.metadata.policy.allow_native_skills
1662        {
1663            self.append_tool_checkpoint(
1664                context,
1665                graph,
1666                node,
1667                ToolNodeStatus::Failed,
1668                0,
1669                Some("native skills are disabled by policy".to_string()),
1670            )
1671            .await?;
1672            return Ok(PreparedNode::Immediate(self.error_result(
1673                node.call_id.clone(),
1674                node.skill_name.clone(),
1675                SkillFailureKind::PermissionDenied,
1676                "native skills are disabled by policy".to_string(),
1677            )));
1678        }
1679
1680        let mut manifest = skill.manifest.clone();
1681        manifest.resource_policy = manifest.effective_resource_policy(&context.metadata.policy);
1682        if node.dry_run
1683            && (!context.metadata.policy.enable_tool_dry_run
1684                || !manifest.resource_policy.dry_run_supported)
1685        {
1686            self.append_tool_checkpoint(
1687                context,
1688                graph,
1689                node,
1690                ToolNodeStatus::Failed,
1691                0,
1692                Some("dry-run execution is not enabled for this skill".to_string()),
1693            )
1694            .await?;
1695            return Ok(PreparedNode::Immediate(self.error_result(
1696                node.call_id.clone(),
1697                node.skill_name.clone(),
1698                SkillFailureKind::CapabilityDenied,
1699                "dry-run execution is not enabled for this skill".to_string(),
1700            )));
1701        }
1702
1703        if self.is_skill_circuit_broken(&node.skill_name, context) {
1704            counter!("rain_engine.circuit_breaker_trips_total", "skill" => node.skill_name.clone())
1705                .increment(1);
1706            self.append_tool_checkpoint(
1707                context,
1708                graph,
1709                node,
1710                ToolNodeStatus::Failed,
1711                0,
1712                Some(format!(
1713                    "circuit breaker tripped for skill `{}`",
1714                    node.skill_name
1715                )),
1716            )
1717            .await?;
1718            return Ok(PreparedNode::Immediate(self.error_result(
1719                node.call_id.clone(),
1720                node.skill_name.clone(),
1721                SkillFailureKind::CapabilityDenied,
1722                format!("circuit breaker tripped for skill `{}`", node.skill_name),
1723            )));
1724        }
1725
1726        let attempt = attempts_by_call.entry(node.call_id.clone()).or_insert(0);
1727        *attempt += 1;
1728        self.append_tool_checkpoint(
1729            context,
1730            graph,
1731            node,
1732            ToolNodeStatus::Started,
1733            *attempt,
1734            None,
1735        )
1736        .await?;
1737
1738        let call_record = ToolCallRecord {
1739            call_id: node.call_id.clone(),
1740            step,
1741            called_at: SystemTime::now(),
1742            skill_name: skill.manifest.name.clone(),
1743            args: node.args.clone(),
1744            backend_kind: skill.backend.kind(),
1745        };
1746        self.memory
1747            .append_tool_call(&context.session_id, call_record)
1748            .await?;
1749        counter!(
1750            "rain_engine.tool_calls_total",
1751            "skill" => skill.manifest.name.clone(),
1752            "backend" => format!("{:?}", skill.backend.kind())
1753        )
1754        .increment(1);
1755
1756        let mut retry_policy = node.retry_policy.policy.clone();
1757        retry_policy.max_attempts = retry_policy
1758            .max_attempts
1759            .min(manifest.resource_policy.retry_policy.max_attempts)
1760            .min(context.metadata.policy.max_tool_retries_per_step);
1761
1762        Ok(PreparedNode::Executable(Box::new(PreparedCall {
1763            call_id: node.call_id.clone(),
1764            name: node.skill_name.clone(),
1765            args: node.args.clone(),
1766            manifest,
1767            backend: skill.backend.clone(),
1768            context_snapshot: context.to_snapshot(step),
1769            dry_run: node.dry_run,
1770            retry_policy,
1771        })))
1772    }
1773
1774    async fn append_validation(
1775        &self,
1776        context: &AgentContext,
1777        graph: &ToolExecutionGraph,
1778        node: &ToolNode,
1779        valid: bool,
1780        errors: Vec<String>,
1781    ) -> Result<(), EngineError> {
1782        let record = SkillInputValidationRecord {
1783            validation_id: Uuid::new_v4().to_string(),
1784            graph_id: graph.graph_id.clone(),
1785            call_id: node.call_id.clone(),
1786            skill_name: node.skill_name.clone(),
1787            validated_at: SystemTime::now(),
1788            valid,
1789            errors,
1790        };
1791        self.memory
1792            .append_skill_input_validation(&context.session_id, record)
1793            .await?;
1794        Ok(())
1795    }
1796
1797    async fn finish(
1798        &self,
1799        context: &mut AgentContext,
1800        stop_reason: StopReason,
1801        response: Option<String>,
1802        detail: Option<String>,
1803        steps_executed: usize,
1804        resume_token: Option<ResumeToken>,
1805    ) -> Result<EngineOutcome, EngineError> {
1806        let outcome = OutcomeRecord {
1807            trigger_id: context.metadata.trigger_id.clone(),
1808            idempotency_key: context.metadata.idempotency_key.clone(),
1809            finished_at: SystemTime::now(),
1810            stop_reason: stop_reason.clone(),
1811            response: response.clone(),
1812            detail: detail.clone(),
1813            steps_executed,
1814            resume_token: resume_token.clone(),
1815        };
1816        if let Err(err) = self
1817            .memory
1818            .append_outcome(&context.session_id, outcome.clone())
1819            .await
1820        {
1821            warn!(session_id = %context.session_id, "failed to record outcome: {}", err.message);
1822        }
1823
1824        context
1825            .records
1826            .push(SessionRecord::Outcome(outcome.clone()));
1827
1828        let outcome_clone = outcome.clone();
1829        self.run_self_improvement(context, outcome_clone).await?;
1830
1831        // Update the state projection cache AFTER self improvement
1832        let _ = self
1833            .state_cache
1834            .set_projection(
1835                &context.session_id,
1836                SessionSnapshot {
1837                    session_id: context.session_id.clone(),
1838                    records: context.records.clone(),
1839                    last_sequence_no: None,
1840                    latest_outcome: Some(outcome.clone()),
1841                },
1842            )
1843            .await;
1844
1845        Ok(EngineOutcome {
1846            trigger_id: outcome.trigger_id,
1847            stop_reason,
1848            response,
1849            detail,
1850            steps_executed,
1851            idempotent_replay: false,
1852            resume_token,
1853        })
1854    }
1855
1856    #[instrument(
1857        skip(self, context, outcome),
1858        fields(
1859            session_id = %context.session_id,
1860            trigger_id = %context.metadata.trigger_id,
1861            stop_reason = ?outcome.stop_reason
1862        )
1863    )]
1864    async fn run_self_improvement(
1865        &self,
1866        context: &mut AgentContext,
1867        outcome: OutcomeRecord,
1868    ) -> Result<(), EngineError> {
1869        let snapshot = context.to_snapshot(outcome.steps_executed);
1870        let session_id = context.session_id.clone();
1871        let policy = snapshot.policy.self_improvement.clone();
1872        if !policy.enabled {
1873            return Ok(());
1874        }
1875
1876        counter!("rain_engine.self_improvement_reflections_total").increment(1);
1877
1878        let observations = reflection_observations(&snapshot, &outcome);
1879        let reflection = ReflectionRecord {
1880            reflection_id: format!("reflection-{}", Uuid::new_v4()),
1881            created_at: SystemTime::now(),
1882            trigger_id: snapshot.trigger_id.clone(),
1883            summary: format!(
1884                "Observed {:?} after {} step(s); evaluating future policy and strategy.",
1885                outcome.stop_reason, outcome.steps_executed
1886            ),
1887            observations,
1888            confidence: 0.72,
1889        };
1890        self.memory
1891            .append_reflection(&session_id, reflection.clone())
1892            .await?;
1893        context.records.push(SessionRecord::Reflection(reflection));
1894
1895        for performance in summarize_tool_performance(&snapshot.history) {
1896            self.memory
1897                .append_tool_performance(&session_id, performance.clone())
1898                .await?;
1899            context
1900                .records
1901                .push(SessionRecord::ToolPerformance(performance.clone()));
1902
1903            if performance.calls > 0 {
1904                counter!(
1905                    "rain_engine.tool_performance_summaries_total",
1906                    "skill" => performance.skill_name.clone()
1907                )
1908                .increment(1);
1909            }
1910            if performance.failure_rate > 0.5 {
1911                let preference = StrategyPreferenceRecord {
1912                    preference_id: format!("strategy-{}", Uuid::new_v4()),
1913                    created_at: SystemTime::now(),
1914                    skill_name: Some(performance.skill_name.clone()),
1915                    preference: "avoid_when_alternatives_exist".to_string(),
1916                    reason: format!(
1917                        "{} failed in {:.0}% of recent calls",
1918                        performance.skill_name,
1919                        performance.failure_rate * 100.0
1920                    ),
1921                    confidence: 0.68,
1922                };
1923                self.memory
1924                    .append_strategy_preference(&session_id, preference.clone())
1925                    .await?;
1926                context
1927                    .records
1928                    .push(SessionRecord::StrategyPreference(preference));
1929            }
1930        }
1931
1932        if terminal_observation_count(&snapshot.history) < policy.min_observations_before_tuning {
1933            return Ok(());
1934        }
1935
1936        if let Some(rollback) = maybe_rollback_regression(&snapshot, &outcome) {
1937            self.memory
1938                .append_policy_tuning(&session_id, rollback.clone())
1939                .await?;
1940            context.records.push(SessionRecord::PolicyTuning(rollback));
1941            counter!("rain_engine.self_improvement_rollbacks_total").increment(1);
1942            return Ok(());
1943        }
1944
1945        let Some(tuning) = propose_policy_tuning(&snapshot, &outcome) else {
1946            return Ok(());
1947        };
1948        match tuning.action {
1949            PolicyTuningAction::Applied => {
1950                counter!("rain_engine.self_improvement_overlays_applied_total").increment(1)
1951            }
1952            PolicyTuningAction::RejectedUnsafe => {
1953                counter!("rain_engine.self_improvement_rejected_unsafe_total").increment(1)
1954            }
1955            PolicyTuningAction::Proposed | PolicyTuningAction::RolledBack => {}
1956        }
1957        self.memory
1958            .append_policy_tuning(&session_id, tuning.clone())
1959            .await?;
1960        context.records.push(SessionRecord::PolicyTuning(tuning));
1961
1962        let profile_patch = ProfilePatchRecord {
1963            patch_id: format!("profile-patch-{}", Uuid::new_v4()),
1964            created_at: SystemTime::now(),
1965            description: "No capability or scope expansion was applied automatically.".to_string(),
1966            patch: serde_json::json!({"guardrail": "privilege_expansion_requires_approval"}),
1967            requires_approval: false,
1968            applied: true,
1969        };
1970        self.memory
1971            .append_profile_patch(&session_id, profile_patch.clone())
1972            .await?;
1973        context
1974            .records
1975            .push(SessionRecord::ProfilePatch(profile_patch));
1976
1977        Ok(())
1978    }
1979
1980    fn is_skill_circuit_broken(&self, skill_name: &str, context: &AgentContext) -> bool {
1981        let performance = summarize_tool_performance(&context.records);
1982        if let Some(perf) = performance.into_iter().find(|p| p.skill_name == skill_name)
1983            && perf.calls >= 3
1984        {
1985            let threshold = self
1986                .skills
1987                .get(skill_name)
1988                .map(|s| s.value().definition().manifest.circuit_breaker_threshold)
1989                .unwrap_or(0.5);
1990            return perf.failure_rate >= threshold;
1991        }
1992        false
1993    }
1994
1995    fn error_result(
1996        &self,
1997        call_id: String,
1998        skill_name: String,
1999        kind: SkillFailureKind,
2000        message: String,
2001    ) -> ToolResultRecord {
2002        ToolResultRecord {
2003            call_id,
2004            finished_at: SystemTime::now(),
2005            skill_name,
2006            output: Err(SkillFailure { kind, message }),
2007        }
2008    }
2009
2010    async fn summarize_history(
2011        &self,
2012        snapshot: &SessionSnapshot,
2013        policy: &EnginePolicy,
2014        trigger: &AgentTrigger,
2015    ) -> Result<SummaryRecord, EngineError> {
2016        let history_text = snapshot
2017            .records
2018            .iter()
2019            .map(|r| format!("{:?}", r))
2020            .collect::<Vec<_>>()
2021            .join("\n");
2022        let prompt = format!(
2023            "Summarize the following conversation history concisely while preserving all key decisions, facts, and outcomes:\n\n{}",
2024            history_text
2025        );
2026
2027        let context = AgentContextSnapshot {
2028            session_id: snapshot.session_id.clone(),
2029            granted_scopes: Vec::new(),
2030            trigger_id: "internal".to_string(),
2031            idempotency_key: None,
2032            current_step: 0,
2033            max_steps: 0,
2034            history: snapshot.records.clone(),
2035            prior_tool_results: snapshot.tool_results(),
2036            session_cost_usd: 0.0,
2037            state: snapshot.agent_state(),
2038            policy: policy.clone(),
2039            active_execution_plan: snapshot.active_execution_plan(),
2040        };
2041
2042        let request = ProviderRequest {
2043            trigger: trigger.clone(),
2044            context,
2045            available_skills: self.skill_definitions().await,
2046            config: ProviderRequestConfig {
2047                model: None,
2048                temperature: Some(0.0),
2049                max_tokens: Some(500),
2050            },
2051            policy: policy.clone(),
2052            contents: vec![ProviderMessage {
2053                role: ProviderRole::User,
2054                parts: vec![ProviderContentPart::Text(prompt)],
2055            }],
2056        };
2057
2058        let decision = self
2059            .llm
2060            .generate_action(request)
2061            .await
2062            .map_err(|e| EngineError::Provider(e.to_string()))?;
2063
2064        if let AgentAction::Respond { content } = decision.action {
2065            Ok(SummaryRecord {
2066                summary_id: format!("summary-{}", Uuid::new_v4()),
2067                created_at: SystemTime::now(),
2068                content,
2069                original_sequence_range: (0, snapshot.records.len()),
2070            })
2071        } else {
2072            Err(EngineError::Provider(
2073                "Failed to generate summary".to_string(),
2074            ))
2075        }
2076    }
2077}
2078
2079fn classify_trigger_intent(trigger: &AgentTrigger) -> String {
2080    match trigger {
2081        AgentTrigger::ExternalEvent { source, .. } => format!("external_event:{source}"),
2082        AgentTrigger::ScheduledWake { .. } => "scheduled_wake".to_string(),
2083        AgentTrigger::HumanInput { content, .. } | AgentTrigger::Message { content, .. } => {
2084            let lowered = content.to_lowercase();
2085            if lowered.contains("approve") || lowered.contains("permission") {
2086                "approval_or_permission".to_string()
2087            } else if lowered.contains("fix")
2088                || lowered.contains("change")
2089                || lowered.contains("write")
2090            {
2091                "task_execution".to_string()
2092            } else if lowered.contains("what") || lowered.contains("why") || lowered.contains("how")
2093            {
2094                "question_answering".to_string()
2095            } else {
2096                "conversation".to_string()
2097            }
2098        }
2099        AgentTrigger::SystemObservation { source, .. } => format!("system_observation:{source}"),
2100        AgentTrigger::Webhook { source, .. } => format!("webhook:{source}"),
2101        AgentTrigger::RuleTrigger { rule_id, .. } => format!("rule:{rule_id}"),
2102        AgentTrigger::ProactiveHeartbeat { .. } => "heartbeat".to_string(),
2103        AgentTrigger::Approval { decision, .. } => format!("approval:{decision:?}"),
2104        AgentTrigger::DelegationResult { .. } => "delegation_result".to_string(),
2105    }
2106}
2107
2108fn action_metric_label(action: &AgentAction) -> &'static str {
2109    match action {
2110        AgentAction::Plan { .. } => "plan",
2111        AgentAction::Respond { .. } => "respond",
2112        AgentAction::CallSkills(_) => "call_skills",
2113        AgentAction::Continue { .. } => "continue",
2114        AgentAction::Yield { .. } => "yield",
2115        AgentAction::MemorySearch { .. } => "memory_search",
2116        AgentAction::MemoryArchive { .. } => "memory_archive",
2117        AgentAction::Suspend { .. } => "suspend",
2118        AgentAction::Delegate { .. } => "delegate",
2119    }
2120}
2121
2122fn reflection_observations(
2123    snapshot: &AgentContextSnapshot,
2124    outcome: &OutcomeRecord,
2125) -> Vec<String> {
2126    let tool_results = snapshot
2127        .history
2128        .iter()
2129        .filter(|record| matches!(record, SessionRecord::ToolResult(_)))
2130        .count();
2131    let failed_tools = snapshot
2132        .history
2133        .iter()
2134        .filter(|record| match record {
2135            SessionRecord::ToolResult(result) => result.output.is_err(),
2136            _ => false,
2137        })
2138        .count();
2139    let provider_cost = snapshot.session_cost_usd;
2140
2141    vec![
2142        format!("terminal_stop_reason={:?}", outcome.stop_reason),
2143        format!("steps_executed={}", outcome.steps_executed),
2144        format!("tool_results={tool_results}"),
2145        format!("failed_tool_results={failed_tools}"),
2146        format!("estimated_session_cost_usd={provider_cost:.6}"),
2147    ]
2148}
2149
2150fn summarize_tool_performance(records: &[SessionRecord]) -> Vec<ToolPerformanceRecord> {
2151    let calls = records
2152        .iter()
2153        .filter_map(|record| match record {
2154            SessionRecord::ToolCall(call) => Some((call.call_id.clone(), call)),
2155            _ => None,
2156        })
2157        .collect::<HashMap<_, _>>();
2158    let mut grouped = HashMap::<String, (String, usize, usize, usize)>::new();
2159
2160    for record in records {
2161        let SessionRecord::ToolResult(result) = record else {
2162            continue;
2163        };
2164        let backend = calls
2165            .get(&result.call_id)
2166            .map(|call| format!("{:?}", call.backend_kind))
2167            .unwrap_or_else(|| "unknown".to_string());
2168        let entry = grouped
2169            .entry(result.skill_name.clone())
2170            .or_insert((backend, 0, 0, 0));
2171        entry.1 += 1;
2172        if result.output.is_ok() {
2173            entry.2 += 1;
2174        } else {
2175            entry.3 += 1;
2176        }
2177    }
2178
2179    grouped
2180        .into_iter()
2181        .map(
2182            |(skill_name, (backend_kind, calls, successes, failures))| ToolPerformanceRecord {
2183                performance_id: format!("tool-performance-{}", Uuid::new_v4()),
2184                created_at: SystemTime::now(),
2185                skill_name,
2186                backend_kind,
2187                calls,
2188                successes,
2189                failures,
2190                failure_rate: if calls == 0 {
2191                    0.0
2192                } else {
2193                    failures as f64 / calls as f64
2194                },
2195            },
2196        )
2197        .collect()
2198}
2199
2200fn terminal_observation_count(records: &[SessionRecord]) -> usize {
2201    records
2202        .iter()
2203        .filter(|record| matches!(record, SessionRecord::Outcome(_)))
2204        .count()
2205}
2206
2207fn maybe_rollback_regression(
2208    snapshot: &AgentContextSnapshot,
2209    outcome: &OutcomeRecord,
2210) -> Option<PolicyTuningRecord> {
2211    if !snapshot.policy.self_improvement.rollback_on_regression {
2212        return None;
2213    }
2214    // MaxStepsReached is not a regression. Hitting the step limit at a higher budget
2215    // is progress, not a failure of the policy overlay itself.
2216    if !matches!(
2217        outcome.stop_reason,
2218        StopReason::ProviderFailure | StopReason::DeadlineExceeded | StopReason::PolicyAborted
2219    ) {
2220        return None;
2221    }
2222    let active = SessionSnapshot {
2223        session_id: snapshot.session_id.clone(),
2224        records: snapshot.history.clone(),
2225        last_sequence_no: None,
2226        latest_outcome: Some(outcome.clone()),
2227    }
2228    .active_policy_overlay()?;
2229
2230    let mut projected_policy = snapshot.policy.clone();
2231    projected_policy.self_improvement = snapshot.policy.self_improvement.clone();
2232    Some(PolicyTuningRecord {
2233        tuning_id: format!("tuning-{}", Uuid::new_v4()),
2234        created_at: SystemTime::now(),
2235        overlay: PolicyOverlay {
2236            status: PolicyOverlayStatus::RolledBack,
2237            reason: format!(
2238                "Regression detected after overlay {}; rolling back for future advances.",
2239                active.overlay_id
2240            ),
2241            ..active
2242        },
2243        action: PolicyTuningAction::RolledBack,
2244        prior_policy: snapshot.policy.clone(),
2245        projected_policy,
2246    })
2247}
2248
2249fn propose_policy_tuning(
2250    snapshot: &AgentContextSnapshot,
2251    outcome: &OutcomeRecord,
2252) -> Option<PolicyTuningRecord> {
2253    let improvement = &snapshot.policy.self_improvement;
2254    let mut patch = PolicyOverlayPatch::default();
2255    let mut reason = None::<String>;
2256    let delta = improvement.max_policy_delta_percent.clamp(1.0, 100.0);
2257
2258    match outcome.stop_reason {
2259        StopReason::ProviderFailure
2260            if outcome
2261                .detail
2262                .as_deref()
2263                .map(|detail| detail.to_ascii_lowercase().contains("timeout"))
2264                .unwrap_or(false) =>
2265        {
2266            patch.provider_timeout_ms = Some(increase_by_percent(
2267                snapshot.policy.provider_timeout_ms,
2268                delta,
2269            ));
2270            reason = Some(
2271                "Provider timed out; increasing provider timeout within guardrails.".to_string(),
2272            );
2273        }
2274        StopReason::MaxStepsReached => {
2275            // Scale up aggressively if we keep hitting the step limit (delta, then 2x delta, etc.)
2276            // We use the terminal observation count as a multiplier
2277            let multiplier = terminal_observation_count(&snapshot.history).max(1) as f64;
2278            let aggressive_delta = (delta * multiplier).min(200.0); // Cap at 200% increase per try
2279            patch.max_steps = Some(increase_usize_by_percent(
2280                snapshot.policy.max_steps,
2281                aggressive_delta,
2282            ));
2283            reason = Some(
2284                "Session hit max steps; increasing future step budget within guardrails."
2285                    .to_string(),
2286            );
2287        }
2288        StopReason::DeadlineExceeded => {
2289            patch.max_execution_time_ms = Some(increase_by_percent(
2290                snapshot.policy.max_execution_time_ms,
2291                delta,
2292            ));
2293            reason = Some("Execution deadline was reached; increasing future wall-clock budget within guardrails.".to_string());
2294        }
2295        StopReason::PolicyAborted
2296            if outcome
2297                .detail
2298                .as_deref()
2299                .map(|detail| detail.contains("cost"))
2300                .unwrap_or(false) =>
2301        {
2302            reason = Some(
2303                "Cost limit was reached; automatic cost-limit increases are blocked.".to_string(),
2304            );
2305        }
2306        _ => {}
2307    }
2308
2309    let reason = reason?;
2310    let mut overlay = PolicyOverlay {
2311        overlay_id: format!("overlay-{}", Uuid::new_v4()),
2312        created_at: SystemTime::now(),
2313        status: match improvement.mode {
2314            SelfImprovementMode::Advisory => PolicyOverlayStatus::Proposed,
2315            SelfImprovementMode::AutoWithGuardrails => PolicyOverlayStatus::Applied,
2316            SelfImprovementMode::Shadow => PolicyOverlayStatus::Proposed, // Will be elevated to Applied by the shadow task
2317        },
2318        reason,
2319        evidence_window_records: snapshot.history.len(),
2320        patch,
2321        confidence: 0.74,
2322        rollback_condition:
2323            "Rollback if the next terminal outcome regresses to a policy/provider failure."
2324                .to_string(),
2325    };
2326
2327    let action = if outcome
2328        .detail
2329        .as_deref()
2330        .map(|detail| detail.contains("cost"))
2331        .unwrap_or(false)
2332    {
2333        overlay.status = PolicyOverlayStatus::Rejected;
2334        PolicyTuningAction::RejectedUnsafe
2335    } else {
2336        match improvement.mode {
2337            SelfImprovementMode::Advisory => PolicyTuningAction::Proposed,
2338            SelfImprovementMode::Shadow => {
2339                overlay.status = PolicyOverlayStatus::Proposed; // Start as proposed in shadow mode
2340                PolicyTuningAction::Proposed
2341            }
2342            SelfImprovementMode::AutoWithGuardrails => PolicyTuningAction::Applied,
2343        }
2344    };
2345
2346    let mut projected_policy = snapshot.policy.clone();
2347    overlay.apply_to(&mut projected_policy);
2348
2349    Some(PolicyTuningRecord {
2350        tuning_id: format!("tuning-{}", Uuid::new_v4()),
2351        created_at: SystemTime::now(),
2352        overlay,
2353        action,
2354        prior_policy: snapshot.policy.clone(),
2355        projected_policy,
2356    })
2357}
2358
/// Raises `value` by `percent` percent, rounding the increment up to the next whole unit.
///
/// A `value` of 0 is treated as 1 so the result can never stay stuck at zero.
///
/// The increment is computed on its own (`value * percent / 100`) and then added with integer
/// arithmetic. Multiplying by `1.0 + percent / 100.0` instead lets binary rounding leak into the
/// result: `100.0 * 1.1` is `110.00000000000001`, which `ceil` turns into 111.
///
/// Edge cases: the result saturates at `u64::MAX`; a negative `percent` shrinks the value,
/// saturating at 0; a NaN `percent` returns the (zero-floored) value unchanged.
fn increase_by_percent(value: u64, percent: f64) -> u64 {
    let base = value.max(1);
    let increment = (base as f64 * percent / 100.0).ceil();
    if increment >= 0.0 {
        // Float-to-int `as` casts saturate, so an oversized increment becomes `u64::MAX`.
        base.saturating_add(increment as u64)
    } else {
        // Also reached for NaN, which casts to 0 and therefore subtracts nothing.
        base.saturating_sub((-increment) as u64)
    }
}
2362
/// Raises `value` by `percent` percent, rounding the increment up to the next whole unit.
///
/// A `value` of 0 is treated as 1 so the result can never stay stuck at zero.
///
/// The increment is computed on its own (`value * percent / 100`) and then added with integer
/// arithmetic. Multiplying by `1.0 + percent / 100.0` instead lets binary rounding leak into the
/// result: `100.0 * 1.1` is `110.00000000000001`, which `ceil` turns into 111.
///
/// Edge cases: the result saturates at `usize::MAX`; a negative `percent` shrinks the value,
/// saturating at 0; a NaN `percent` returns the (zero-floored) value unchanged.
fn increase_usize_by_percent(value: usize, percent: f64) -> usize {
    let base = value.max(1);
    let increment = (base as f64 * percent / 100.0).ceil();
    if increment >= 0.0 {
        // Float-to-int `as` casts saturate, so an oversized increment becomes `usize::MAX`.
        base.saturating_add(increment as usize)
    } else {
        // Also reached for NaN, which casts to 0 and therefore subtracts nothing.
        base.saturating_sub((-increment) as usize)
    }
}
2366
2367fn existing_or_new_graph(
2368    context: &AgentContext,
2369    step: usize,
2370    calls: &[PlannedSkillCall],
2371) -> ToolExecutionGraph {
2372    let call_ids = calls
2373        .iter()
2374        .map(|call| call.call_id.as_str())
2375        .collect::<BTreeSet<_>>();
2376    if let Some(graph) = context
2377        .records
2378        .iter()
2379        .rev()
2380        .find_map(|record| match record {
2381            SessionRecord::ToolExecutionGraph(graph)
2382                if graph.step == step
2383                    && graph
2384                        .nodes
2385                        .iter()
2386                        .map(|node| node.call_id.as_str())
2387                        .collect::<BTreeSet<_>>()
2388                        == call_ids =>
2389            {
2390                Some(graph.clone())
2391            }
2392            _ => None,
2393        })
2394    {
2395        return graph;
2396    }
2397
2398    ToolExecutionGraph {
2399        graph_id: format!("{}:{step}", context.metadata.trigger_id),
2400        trigger_id: context.metadata.trigger_id.clone(),
2401        step,
2402        created_at: SystemTime::now(),
2403        nodes: calls
2404            .iter()
2405            .enumerate()
2406            .map(|(provider_order, call)| ToolNode {
2407                call_id: call.call_id.clone(),
2408                skill_name: call.name.clone(),
2409                args: call.args.clone(),
2410                priority: call.priority,
2411                dependencies: call
2412                    .depends_on
2413                    .iter()
2414                    .map(|call_id| ToolDependency {
2415                        call_id: call_id.clone(),
2416                    })
2417                    .collect(),
2418                retry_policy: call.retry_policy.clone(),
2419                dry_run: call.dry_run,
2420                provider_order,
2421            })
2422            .collect(),
2423    }
2424}
2425
2426fn latest_tool_statuses(context: &AgentContext, graph_id: &str) -> HashMap<String, ToolNodeStatus> {
2427    let mut statuses = HashMap::new();
2428    for record in &context.records {
2429        if let SessionRecord::ToolNodeCheckpoint(checkpoint) = record
2430            && checkpoint.graph_id == graph_id
2431        {
2432            statuses.insert(checkpoint.call_id.clone(), checkpoint.status.clone());
2433        }
2434    }
2435    statuses
2436}
2437
2438fn started_attempt_counts(context: &AgentContext, graph_id: &str) -> HashMap<String, usize> {
2439    let mut attempts = HashMap::<String, usize>::new();
2440    for record in &context.records {
2441        if let SessionRecord::ToolNodeCheckpoint(checkpoint) = record
2442            && checkpoint.graph_id == graph_id
2443            && checkpoint.status == ToolNodeStatus::Started
2444        {
2445            let current = attempts.entry(checkpoint.call_id.clone()).or_default();
2446            *current = (*current).max(checkpoint.attempt);
2447        }
2448    }
2449    attempts
2450}
2451
2452fn ready_nodes(
2453    graph: &ToolExecutionGraph,
2454    status_by_call: &HashMap<String, ToolNodeStatus>,
2455) -> Vec<ToolNode> {
2456    let mut nodes = graph
2457        .nodes
2458        .iter()
2459        .filter(|node| !is_terminal_status(status_by_call.get(&node.call_id)))
2460        .filter(|node| {
2461            node.dependencies.iter().all(|dependency| {
2462                matches!(
2463                    status_by_call.get(&dependency.call_id),
2464                    Some(ToolNodeStatus::Succeeded)
2465                )
2466            })
2467        })
2468        .cloned()
2469        .collect::<Vec<_>>();
2470    nodes.sort_by(|left, right| {
2471        right
2472            .priority
2473            .cmp(&left.priority)
2474            .then(left.provider_order.cmp(&right.provider_order))
2475            .then(left.call_id.cmp(&right.call_id))
2476    });
2477    nodes
2478}
2479
2480fn is_terminal_status(status: Option<&ToolNodeStatus>) -> bool {
2481    matches!(
2482        status,
2483        Some(
2484            ToolNodeStatus::Succeeded
2485                | ToolNodeStatus::Failed
2486                | ToolNodeStatus::Skipped
2487                | ToolNodeStatus::TimedOut
2488        )
2489    )
2490}
2491
2492fn final_status_for_result(result: &ToolResultRecord) -> ToolNodeStatus {
2493    match &result.output {
2494        Ok(_) => ToolNodeStatus::Succeeded,
2495        Err(error) if error.kind == SkillFailureKind::Timeout => ToolNodeStatus::TimedOut,
2496        Err(_) => ToolNodeStatus::Failed,
2497    }
2498}
2499
2500fn result_detail(result: &ToolResultRecord) -> Option<String> {
2501    match &result.output {
2502        Ok(_) => None,
2503        Err(error) => Some(error.message.clone()),
2504    }
2505}
2506
2507fn validate_against_schema(value: &serde_json::Value, schema: &serde_json::Value) -> Vec<String> {
2508    let schema_type = schema.get("type").and_then(serde_json::Value::as_str);
2509    let mut errors = Vec::new();
2510    if let Some(schema_type) = schema_type
2511        && !json_type_matches(value, schema_type)
2512    {
2513        errors.push(format!("expected root type `{schema_type}`"));
2514        return errors;
2515    }
2516
2517    if schema_type == Some("object") {
2518        let Some(object) = value.as_object() else {
2519            return vec!["expected root object".to_string()];
2520        };
2521        if let Some(required) = schema.get("required").and_then(serde_json::Value::as_array) {
2522            for required_key in required.iter().filter_map(serde_json::Value::as_str) {
2523                if !object.contains_key(required_key) {
2524                    errors.push(format!("missing required property `{required_key}`"));
2525                }
2526            }
2527        }
2528        if let Some(properties) = schema
2529            .get("properties")
2530            .and_then(serde_json::Value::as_object)
2531        {
2532            for (key, property_schema) in properties {
2533                let Some(property_value) = object.get(key) else {
2534                    continue;
2535                };
2536                if let Some(property_type) = property_schema
2537                    .get("type")
2538                    .and_then(serde_json::Value::as_str)
2539                    && !json_type_matches(property_value, property_type)
2540                {
2541                    errors.push(format!("property `{key}` expected type `{property_type}`"));
2542                }
2543            }
2544        }
2545    }
2546    errors
2547}
2548
2549fn json_type_matches(value: &serde_json::Value, expected: &str) -> bool {
2550    match expected {
2551        "object" => value.is_object(),
2552        "array" => value.is_array(),
2553        "string" => value.is_string(),
2554        "number" => value.is_number(),
2555        "integer" => value.as_i64().is_some() || value.as_u64().is_some(),
2556        "boolean" => value.is_boolean(),
2557        "null" => value.is_null(),
2558        _ => true,
2559    }
2560}
2561
2562struct PreparedCall {
2563    call_id: String,
2564    name: String,
2565    args: serde_json::Value,
2566    manifest: SkillManifest,
2567    backend: RegisteredSkillBackend,
2568    context_snapshot: crate::AgentContextSnapshot,
2569    dry_run: bool,
2570    retry_policy: RetryPolicy,
2571}
2572
2573enum PreparedNode {
2574    Executable(Box<PreparedCall>),
2575    Immediate(ToolResultRecord),
2576}
2577
2578#[derive(Debug)]
2579struct ExecutedBatch {
2580    results: Vec<ToolResultRecord>,
2581    all_failed: bool,
2582}
2583
2584#[derive(Debug)]
2585enum BatchExecution {
2586    Executed(ExecutedBatch),
2587    Suspended {
2588        reason: SuspendReason,
2589        pending_calls: Vec<PlannedSkillCall>,
2590        resume_token: ResumeToken,
2591    },
2592}
2593
2594async fn run_prepared_call(prepared: PreparedCall) -> Result<ToolResultRecord, EngineError> {
2595    let started = Instant::now();
2596    let mut output = Err(SkillExecutionError::new(
2597        SkillFailureKind::Internal,
2598        "tool was not attempted",
2599    ));
2600    let mut current_interval_ms = prepared.retry_policy.initial_interval_ms;
2601    for attempt in 0..prepared.retry_policy.max_attempts {
2602        let invocation = SkillInvocation {
2603            call_id: prepared.call_id.clone(),
2604            manifest: prepared.manifest.clone(),
2605            args: prepared.args.clone(),
2606            context: prepared.context_snapshot.clone(),
2607            dry_run: prepared.dry_run,
2608        };
2609        output = match &prepared.backend {
2610            RegisteredSkillBackend::Wasm(executor) => executor.execute(invocation).await,
2611            RegisteredSkillBackend::Native(executor) => executor.execute(invocation).await,
2612        };
2613        if output.is_ok()
2614            || !is_retryable_skill_error(&output)
2615            || attempt + 1 >= prepared.retry_policy.max_attempts
2616        {
2617            break;
2618        }
2619        tokio::time::sleep(std::time::Duration::from_millis(current_interval_ms)).await;
2620        current_interval_ms =
2621            ((current_interval_ms as f64) * prepared.retry_policy.backoff_multiplier) as u64;
2622        current_interval_ms = current_interval_ms.min(prepared.retry_policy.max_interval_ms);
2623    }
2624    histogram!("rain_engine.tool_latency_seconds").record(started.elapsed().as_secs_f64());
2625
2626    let output = match output {
2627        Ok(value) => Ok(value),
2628        Err(err) => {
2629            match err.kind {
2630                SkillFailureKind::PermissionDenied | SkillFailureKind::CapabilityDenied => {
2631                    counter!("rain_engine.permission_denials_total").increment(1);
2632                }
2633                SkillFailureKind::Trap | SkillFailureKind::MemoryLimitExceeded => {
2634                    counter!("rain_engine.wasm_traps_total").increment(1);
2635                }
2636                SkillFailureKind::Timeout => {
2637                    counter!("rain_engine.tool_timeouts_total").increment(1);
2638                }
2639                SkillFailureKind::InvalidArguments
2640                | SkillFailureKind::InvalidResponse
2641                | SkillFailureKind::Internal => {}
2642            }
2643            Err(SkillFailure {
2644                kind: err.kind,
2645                message: err.message,
2646            })
2647        }
2648    };
2649
2650    Ok(ToolResultRecord {
2651        call_id: prepared.call_id,
2652        finished_at: SystemTime::now(),
2653        skill_name: prepared.name,
2654        output,
2655    })
2656}
2657
2658fn is_retryable_skill_error(output: &Result<serde_json::Value, SkillExecutionError>) -> bool {
2659    matches!(
2660        output,
2661        Err(SkillExecutionError {
2662            kind: SkillFailureKind::Timeout | SkillFailureKind::Internal | SkillFailureKind::Trap,
2663            ..
2664        })
2665    )
2666}
2667
2668fn build_advance_result(
2669    outcome: EngineOutcome,
2670    emitted_events: Vec<KernelEventRecord>,
2671) -> AdvanceResult {
2672    let wake_request = emitted_events.iter().find_map(extract_wake_request);
2673    let state_delta = derive_state_delta(&emitted_events);
2674    AdvanceResult {
2675        outcome: Some(outcome),
2676        emitted_events,
2677        state_delta,
2678        wake_request,
2679    }
2680}
2681
2682fn extract_wake_request(event: &KernelEventRecord) -> Option<WakeRequestRecord> {
2683    match &event.event {
2684        KernelEvent::WakeRequested(wake) | KernelEvent::WakeScheduled(wake) => Some(wake.clone()),
2685        _ => None,
2686    }
2687}
2688
2689fn derive_state_delta(events: &[KernelEventRecord]) -> AgentStateDelta {
2690    let mut delta = AgentStateDelta::default();
2691    for event in events {
2692        match &event.event {
2693            KernelEvent::GoalCreated(goal) => delta.created_goal_ids.push(goal.goal_id.clone()),
2694            KernelEvent::TaskPlanned(task) => delta.updated_task_ids.push(task.task_id.clone()),
2695            KernelEvent::TaskClaimed { task_id, .. } => {
2696                delta.updated_task_ids.push(task_id.clone())
2697            }
2698            KernelEvent::TaskBlocked { task_id, .. }
2699            | KernelEvent::TaskCompleted { task_id, .. }
2700            | KernelEvent::TaskFailed { task_id, .. }
2701            | KernelEvent::TaskAbandoned { task_id, .. } => {
2702                delta.updated_task_ids.push(task_id.clone())
2703            }
2704            KernelEvent::HumanInputRequested { task_id, .. } => {
2705                if let Some(task_id) = task_id {
2706                    delta.updated_task_ids.push(task_id.clone());
2707                }
2708            }
2709            KernelEvent::ObservationAppended(observation) => delta
2710                .observation_ids
2711                .push(observation.observation_id.clone()),
2712            KernelEvent::ArtifactProduced(artifact) => {
2713                delta.artifact_ids.push(artifact.artifact_id.clone())
2714            }
2715            KernelEvent::DelegationRequested(record) => delta
2716                .delegation_correlation_ids
2717                .push(record.correlation_id.clone()),
2718            KernelEvent::DelegationResolved { correlation_id, .. } => delta
2719                .delegation_correlation_ids
2720                .push(correlation_id.clone()),
2721            KernelEvent::WakeRequested(_)
2722            | KernelEvent::WakeScheduled(_)
2723            | KernelEvent::WakeCompleted { .. }
2724            | KernelEvent::ResourceRegistered(_)
2725            | KernelEvent::RelationshipObserved(_)
2726            | KernelEvent::MemorySearched { .. }
2727            | KernelEvent::MemoryArchived { .. } => {}
2728        }
2729    }
2730    delta
2731}
2732
2733fn derive_trigger_kernel_events(
2734    trigger_id: &str,
2735    trigger: &AgentTrigger,
2736) -> Vec<KernelEventRecord> {
2737    let mut events = Vec::new();
2738    let observed_at = SystemTime::now();
2739    let mut push_observation = |source: String,
2740                                content: serde_json::Value,
2741                                attachments: Vec<String>| {
2742        events.push(KernelEventRecord {
2743            event_id: format!("observation-{trigger_id}-{}", events.len()),
2744            occurred_at: observed_at,
2745            event: KernelEvent::ObservationAppended(crate::ObservationRecord {
2746                observation_id: crate::ObservationId(format!("{trigger_id}-obs-{}", events.len())),
2747                recorded_at: observed_at,
2748                source,
2749                content,
2750                attachment_ids: attachments,
2751                related_resources: Vec::new(),
2752            }),
2753        });
2754    };
2755
2756    match trigger {
2757        AgentTrigger::ExternalEvent {
2758            source,
2759            payload,
2760            attachments,
2761        } => push_observation(
2762            format!("external:{source}"),
2763            payload.clone(),
2764            attachments
2765                .iter()
2766                .map(|attachment| attachment.attachment_id.clone())
2767                .collect(),
2768        ),
2769        AgentTrigger::HumanInput {
2770            actor_id,
2771            content,
2772            attachments,
2773        } => push_observation(
2774            format!("human:{actor_id}"),
2775            serde_json::json!({ "content": content }),
2776            attachments
2777                .iter()
2778                .map(|attachment| attachment.attachment_id.clone())
2779                .collect(),
2780        ),
2781        AgentTrigger::SystemObservation {
2782            source,
2783            observation,
2784            attachments,
2785        } => push_observation(
2786            format!("system:{source}"),
2787            observation.clone(),
2788            attachments
2789                .iter()
2790                .map(|attachment| attachment.attachment_id.clone())
2791                .collect(),
2792        ),
2793        AgentTrigger::Webhook {
2794            source,
2795            payload,
2796            attachments,
2797        } => push_observation(
2798            format!("webhook:{source}"),
2799            payload.clone(),
2800            attachments
2801                .iter()
2802                .map(|attachment| attachment.attachment_id.clone())
2803                .collect(),
2804        ),
2805        AgentTrigger::RuleTrigger {
2806            rule_id,
2807            context,
2808            attachments,
2809        } => push_observation(
2810            format!("rule:{rule_id}"),
2811            context.clone(),
2812            attachments
2813                .iter()
2814                .map(|attachment| attachment.attachment_id.clone())
2815                .collect(),
2816        ),
2817        AgentTrigger::ProactiveHeartbeat { timestamp, .. } => push_observation(
2818            "heartbeat".to_string(),
2819            serde_json::json!({ "timestamp": timestamp }),
2820            Vec::new(),
2821        ),
2822        AgentTrigger::ScheduledWake {
2823            wake_id, reason, ..
2824        } => events.push(KernelEventRecord {
2825            event_id: format!("wake-completed-{trigger_id}"),
2826            occurred_at: observed_at,
2827            event: KernelEvent::WakeCompleted {
2828                wake_id: wake_id.clone(),
2829                reason: reason.clone(),
2830                completed_at: observed_at,
2831            },
2832        }),
2833        AgentTrigger::Message {
2834            user_id,
2835            content,
2836            attachments,
2837        } => push_observation(
2838            format!("message:{user_id}"),
2839            serde_json::json!({ "content": content }),
2840            attachments
2841                .iter()
2842                .map(|attachment| attachment.attachment_id.clone())
2843                .collect(),
2844        ),
2845        AgentTrigger::DelegationResult {
2846            correlation_id,
2847            payload,
2848            metadata,
2849        } => events.push(KernelEventRecord {
2850            event_id: format!("delegation-resolved-{trigger_id}"),
2851            occurred_at: observed_at,
2852            event: KernelEvent::DelegationResolved {
2853                correlation_id: correlation_id.clone(),
2854                resolved_at: observed_at,
2855                payload: payload.clone(),
2856                metadata: metadata.clone(),
2857            },
2858        }),
2859        AgentTrigger::Approval { .. } => {}
2860    }
2861
2862    events
2863}
2864
2865fn build_provider_contents(
2866    trigger: &AgentTrigger,
2867    history: &[SessionRecord],
2868    active_plan: Option<&ExecutionPlanRecord>,
2869) -> Vec<ProviderMessage> {
2870    let mut messages = Vec::new();
2871    let mut intent_by_trigger = HashMap::<String, String>::new();
2872    for record in history {
2873        match record {
2874            SessionRecord::Trigger(trigger) => {
2875                if let Some(intent) = &trigger.intent {
2876                    intent_by_trigger.insert(trigger.trigger_id.clone(), intent.clone());
2877                }
2878            }
2879            SessionRecord::TriggerIntent(intent) => {
2880                intent_by_trigger.insert(intent.trigger_id.clone(), intent.intent.clone());
2881            }
2882            _ => {}
2883        }
2884    }
2885
2886    // Apply Virtual Context Window constraints
2887    // Max 32 recent items (we can make this dynamic via policy later, hardcoded to 32 for now)
2888    let max_recent_items = 32;
2889    let (truncated_history, truncated) = if history.len() > max_recent_items {
2890        (&history[history.len() - max_recent_items..], true)
2891    } else {
2892        (history, false)
2893    };
2894
2895    if truncated {
2896        messages.push(ProviderMessage {
2897            role: ProviderRole::System,
2898            parts: vec![ProviderContentPart::Text(
2899                "Notice: Older session history has been paged out to Archival Memory to save space. \
2900                 Use the `MemorySearch` action to retrieve specific past interactions if you need them.".to_string(),
2901            )],
2902        });
2903    }
2904
2905    // Map history to messages to provide multi-turn memory
2906    for record in truncated_history {
2907        match record {
2908            SessionRecord::Trigger(t) => {
2909                let mut parts = build_trigger_parts(&t.trigger);
2910                if let Some(intent) = t
2911                    .intent
2912                    .as_ref()
2913                    .or_else(|| intent_by_trigger.get(&t.trigger_id))
2914                {
2915                    parts.push(ProviderContentPart::Text(format!(
2916                        "Classified intent: {intent}"
2917                    )));
2918                }
2919                messages.push(ProviderMessage {
2920                    role: ProviderRole::User,
2921                    parts,
2922                });
2923            }
2924            SessionRecord::Summary(s) => {
2925                messages.push(ProviderMessage {
2926                    role: ProviderRole::Assistant,
2927                    parts: vec![ProviderContentPart::Text(format!(
2928                        "Summary of prior history: {}",
2929                        s.content
2930                    ))],
2931                });
2932            }
2933            SessionRecord::ModelDecision(d) => match &d.action {
2934                AgentAction::Respond { content } => {
2935                    messages.push(ProviderMessage {
2936                        role: ProviderRole::Assistant,
2937                        parts: vec![ProviderContentPart::Text(content.clone())],
2938                    });
2939                }
2940                AgentAction::CallSkills(calls) => {
2941                    // Represent tool calls in history
2942                    messages.push(ProviderMessage {
2943                        role: ProviderRole::Assistant,
2944                        parts: vec![ProviderContentPart::Json(
2945                            serde_json::to_value(calls).unwrap_or_default(),
2946                        )],
2947                    });
2948                }
2949                _ => {}
2950            },
2951            SessionRecord::ToolResult(r) => {
2952                messages.push(ProviderMessage {
2953                    role: ProviderRole::Tool,
2954                    parts: vec![ProviderContentPart::ToolResult(r.clone())],
2955                });
2956            }
2957            _ => {}
2958        }
2959    }
2960
2961    // Add the current trigger that kicked off this step
2962    messages.push(ProviderMessage {
2963        role: ProviderRole::User,
2964        parts: build_trigger_parts(trigger),
2965    });
2966
2967    // Inject Chain of Thought active plan if one exists
2968    if let Some(plan) = active_plan {
2969        messages.push(ProviderMessage {
2970            role: ProviderRole::System,
2971            parts: vec![ProviderContentPart::Text(format!(
2972                "You are currently executing an active, multi-step plan.\n\nOBJECTIVE: {}\n\nYou are on step {} of {}.\n\nPlease stay focused on the objective and execute the necessary actions for this specific step.",
2973                plan.objective,
2974                plan.current_step_index + 1,
2975                plan.steps.len()
2976            ))],
2977        });
2978    }
2979
2980    messages
2981}
2982
2983fn build_trigger_parts(trigger: &AgentTrigger) -> Vec<ProviderContentPart> {
2984    let mut parts = Vec::new();
2985    match trigger {
2986        AgentTrigger::ExternalEvent {
2987            source,
2988            payload,
2989            attachments,
2990        } => {
2991            parts.push(ProviderContentPart::Text(format!(
2992                "external event source: {source}"
2993            )));
2994            parts.push(ProviderContentPart::Json(payload.clone()));
2995            parts.extend(
2996                attachments
2997                    .iter()
2998                    .cloned()
2999                    .map(ProviderContentPart::Attachment),
3000            );
3001        }
3002        AgentTrigger::ScheduledWake {
3003            wake_id,
3004            due_at,
3005            reason,
3006        } => {
3007            parts.push(ProviderContentPart::Text(format!(
3008                "scheduled wake {} due at {:?}: {reason}",
3009                wake_id.0, due_at
3010            )));
3011        }
3012        AgentTrigger::HumanInput {
3013            actor_id,
3014            content,
3015            attachments,
3016        } => {
3017            parts.push(ProviderContentPart::Text(format!(
3018                "human actor: {actor_id}"
3019            )));
3020            parts.push(ProviderContentPart::Text(content.clone()));
3021            parts.extend(
3022                attachments
3023                    .iter()
3024                    .cloned()
3025                    .map(ProviderContentPart::Attachment),
3026            );
3027        }
3028        AgentTrigger::SystemObservation {
3029            source,
3030            observation,
3031            attachments,
3032        } => {
3033            parts.push(ProviderContentPart::Text(format!(
3034                "system observation source: {source}"
3035            )));
3036            parts.push(ProviderContentPart::Json(observation.clone()));
3037            parts.extend(
3038                attachments
3039                    .iter()
3040                    .cloned()
3041                    .map(ProviderContentPart::Attachment),
3042            );
3043        }
3044        AgentTrigger::Webhook {
3045            source,
3046            payload,
3047            attachments,
3048        } => {
3049            parts.push(ProviderContentPart::Text(format!(
3050                "webhook source: {source}"
3051            )));
3052            parts.push(ProviderContentPart::Json(payload.clone()));
3053            parts.extend(
3054                attachments
3055                    .iter()
3056                    .cloned()
3057                    .map(ProviderContentPart::Attachment),
3058            );
3059        }
3060        AgentTrigger::RuleTrigger {
3061            rule_id,
3062            context,
3063            attachments,
3064        } => {
3065            parts.push(ProviderContentPart::Text(format!(
3066                "rule trigger: {rule_id}"
3067            )));
3068            parts.push(ProviderContentPart::Json(context.clone()));
3069            parts.extend(
3070                attachments
3071                    .iter()
3072                    .cloned()
3073                    .map(ProviderContentPart::Attachment),
3074            );
3075        }
3076        AgentTrigger::ProactiveHeartbeat {
3077            timestamp,
3078            attachments,
3079        } => {
3080            parts.push(ProviderContentPart::Text(format!(
3081                "heartbeat timestamp: {timestamp}"
3082            )));
3083            parts.extend(
3084                attachments
3085                    .iter()
3086                    .cloned()
3087                    .map(ProviderContentPart::Attachment),
3088            );
3089        }
3090        AgentTrigger::Message {
3091            user_id,
3092            content,
3093            attachments,
3094        } => {
3095            parts.push(ProviderContentPart::Text(format!("user_id: {user_id}")));
3096            parts.push(ProviderContentPart::Text(content.clone()));
3097            parts.extend(
3098                attachments
3099                    .iter()
3100                    .cloned()
3101                    .map(ProviderContentPart::Attachment),
3102            );
3103        }
3104        AgentTrigger::Approval {
3105            resume_token,
3106            decision,
3107            metadata,
3108        } => {
3109            parts.push(ProviderContentPart::Text(format!(
3110                "approval for resume token {}: {:?}",
3111                resume_token.as_str(),
3112                decision
3113            )));
3114            parts.push(ProviderContentPart::Json(metadata.clone()));
3115        }
3116        AgentTrigger::DelegationResult {
3117            correlation_id,
3118            payload,
3119            metadata,
3120        } => {
3121            parts.push(ProviderContentPart::Text(format!(
3122                "delegation result for correlation {}",
3123                correlation_id.as_str()
3124            )));
3125            parts.push(ProviderContentPart::Json(payload.clone()));
3126            parts.push(ProviderContentPart::Json(metadata.clone()));
3127        }
3128    }
3129    parts
3130}
3131
3132fn storage_failure_outcome(
3133    trigger_id: String,
3134    steps_executed: usize,
3135    detail: String,
3136) -> EngineOutcome {
3137    counter!("rain_engine.storage_failures_total").increment(1);
3138    EngineOutcome {
3139        trigger_id,
3140        stop_reason: StopReason::StorageFailure,
3141        response: None,
3142        detail: Some(detail),
3143        steps_executed,
3144        idempotent_replay: false,
3145        resume_token: None,
3146    }
3147}
3148
3149#[cfg(test)]
3150mod tests {
3151    use super::*;
3152    use crate::{
3153        AttachmentRef, EnginePolicy, InMemoryMemoryStore, MockLlmProvider, ProviderCacheRecord,
3154        ProviderDecision, ProviderError, ProviderErrorKind, ProviderUsageRecord, RecordPageQuery,
3155        ResourcePolicy, SessionListQuery, SessionSnapshot, SkillCapability, StopReason, TaskId,
3156        TaskRecord, TaskStatus, WakeId, WakeRequestRecord,
3157    };
3158    use serde_json::json;
3159    use std::sync::Mutex;
3160
3161    #[derive(Clone)]
3162    struct StubSkillExecutor {
3163        name: &'static str,
3164        responder: Arc<
3165            dyn Fn(SkillInvocation) -> Result<serde_json::Value, SkillExecutionError> + Send + Sync,
3166        >,
3167    }
3168
3169    #[async_trait]
3170    impl SkillExecutor for StubSkillExecutor {
3171        async fn execute(
3172            &self,
3173            invocation: SkillInvocation,
3174        ) -> Result<serde_json::Value, SkillExecutionError> {
3175            (self.responder)(invocation)
3176        }
3177
3178        fn executor_kind(&self) -> &'static str {
3179            self.name
3180        }
3181    }
3182
3183    #[derive(Clone)]
3184    struct StubNativeSkill {
3185        requires_approval: bool,
3186        responder: Arc<
3187            dyn Fn(SkillInvocation) -> Result<serde_json::Value, SkillExecutionError> + Send + Sync,
3188        >,
3189    }
3190
3191    #[async_trait]
3192    impl NativeSkill for StubNativeSkill {
3193        async fn execute(
3194            &self,
3195            invocation: SkillInvocation,
3196        ) -> Result<serde_json::Value, SkillExecutionError> {
3197            (self.responder)(invocation)
3198        }
3199
3200        fn requires_human_approval(&self) -> bool {
3201            self.requires_approval
3202        }
3203    }
3204
3205    fn manifest(name: &str, scopes: &[&str]) -> SkillManifest {
3206        SkillManifest {
3207            name: name.to_string(),
3208            description: format!("{name} description"),
3209            input_schema: json!({"type": "object"}),
3210            required_scopes: scopes.iter().map(|scope| scope.to_string()).collect(),
3211            capability_grants: vec![SkillCapability::StructuredLog],
3212            resource_policy: ResourcePolicy::default_for_tools(),
3213            approval_required: false,
3214            circuit_breaker_threshold: 0.5,
3215        }
3216    }
3217
3218    fn planned(call_id: &str, name: &str, args: serde_json::Value) -> PlannedSkillCall {
3219        PlannedSkillCall {
3220            call_id: call_id.to_string(),
3221            name: name.to_string(),
3222            args,
3223            priority: 0,
3224            depends_on: Vec::new(),
3225            retry_policy: Default::default(),
3226            dry_run: false,
3227        }
3228    }
3229
3230    fn message_trigger(content: &str) -> AgentTrigger {
3231        AgentTrigger::Message {
3232            user_id: "u1".to_string(),
3233            content: content.to_string(),
3234            attachments: Vec::new(),
3235        }
3236    }
3237
3238    async fn session(store: &Arc<InMemoryMemoryStore>, session_id: &str) -> SessionSnapshot {
3239        store
3240            .load_session(session_id)
3241            .await
3242            .expect("session snapshot")
3243    }
3244
3245    async fn run_until_terminal(
3246        engine: &AgentEngine,
3247        request: ProcessRequest,
3248    ) -> Result<EngineOutcome, EngineError> {
3249        let mut next = AdvanceRequest::Trigger(request.clone());
3250        loop {
3251            let result = engine.advance(next).await?;
3252            if let Some(outcome) = result.outcome {
3253                return Ok(outcome);
3254            }
3255            next = AdvanceRequest::Continue(ContinueRequest {
3256                session_id: request.session_id.clone(),
3257                granted_scopes: request.granted_scopes.clone(),
3258                policy: request.policy.clone(),
3259                provider: request.provider.clone(),
3260                cancellation: request.cancellation.clone(),
3261            });
3262        }
3263    }
3264
3265    #[tokio::test]
3266    async fn webhook_trigger_with_attachment_responds() {
3267        let store = Arc::new(InMemoryMemoryStore::new());
3268        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3269            content: "done".to_string(),
3270        }]));
3271        let engine = AgentEngine::new(llm, store.clone());
3272
3273        let outcome = run_until_terminal(
3274            &engine,
3275            ProcessRequest::new(
3276                "session-1",
3277                AgentTrigger::Webhook {
3278                    source: "github".to_string(),
3279                    payload: json!({"issue": 42}),
3280                    attachments: vec![AttachmentRef::inline(
3281                        "att-1",
3282                        "image/png",
3283                        Some("schema.png".to_string()),
3284                        vec![1, 2, 3],
3285                    )],
3286                },
3287            ),
3288        )
3289        .await
3290        .expect("outcome");
3291
3292        assert_eq!(outcome.stop_reason, StopReason::Responded);
3293        let snapshot = session(&store, "session-1").await;
3294        assert!(matches!(
3295            snapshot.records.first(),
3296            Some(SessionRecord::Trigger(_))
3297        ));
3298    }
3299
3300    #[tokio::test]
3301    async fn advance_executes_one_progression_step() {
3302        let store = Arc::new(InMemoryMemoryStore::new());
3303        let llm = Arc::new(MockLlmProvider::scripted(vec![
3304            AgentAction::CallSkills(vec![PlannedSkillCall {
3305                call_id: "call-1".to_string(),
3306                name: "echo".to_string(),
3307                args: json!({"value": 1}),
3308                priority: 0,
3309                depends_on: Vec::new(),
3310                retry_policy: Default::default(),
3311                dry_run: false,
3312            }]),
3313            AgentAction::Respond {
3314                content: "done".to_string(),
3315            },
3316        ]));
3317        let engine = AgentEngine::new(llm, store.clone());
3318        engine.register_wasm_skill(
3319            manifest("echo", &["tool:run"]),
3320            Arc::new(StubSkillExecutor {
3321                name: "stub",
3322                responder: Arc::new(|invocation| Ok(json!({"echo": invocation.args}))),
3323            }),
3324        );
3325        let request =
3326            ProcessRequest::new("step-session", message_trigger("run")).with_scope("tool:run");
3327
3328        let first = engine
3329            .advance(AdvanceRequest::Trigger(request.clone()))
3330            .await
3331            .expect("first advance");
3332        assert!(first.outcome.is_none());
3333        assert_eq!(first.emitted_events.len(), 1);
3334
3335        let second = engine
3336            .advance(AdvanceRequest::Continue(ContinueRequest {
3337                session_id: request.session_id.clone(),
3338                granted_scopes: request.granted_scopes.clone(),
3339                policy: request.policy.clone(),
3340                provider: request.provider.clone(),
3341                cancellation: request.cancellation.clone(),
3342            }))
3343            .await
3344            .expect("second advance");
3345        assert_eq!(
3346            second.outcome.expect("terminal").stop_reason,
3347            StopReason::Responded
3348        );
3349        assert_eq!(
3350            store
3351                .load_session("step-session")
3352                .await
3353                .unwrap()
3354                .tool_results()
3355                .len(),
3356            1
3357        );
3358    }
3359
3360    #[tokio::test]
3361    async fn replay_projects_task_transitions() {
3362        let store = Arc::new(InMemoryMemoryStore::new());
3363        let now = SystemTime::now();
3364        let task_id = TaskId("task-1".to_string());
3365        store
3366            .append_kernel_event(
3367                "projection-session",
3368                KernelEventRecord {
3369                    event_id: "task-planned".to_string(),
3370                    occurred_at: now,
3371                    event: KernelEvent::TaskPlanned(TaskRecord {
3372                        task_id: task_id.clone(),
3373                        goal_id: None,
3374                        parent_task_id: None,
3375                        created_at: now,
3376                        title: "triage".to_string(),
3377                        detail: None,
3378                        status: TaskStatus::Ready,
3379                        assignee: None,
3380                        blocked_by: Vec::new(),
3381                    }),
3382                },
3383            )
3384            .await
3385            .expect("planned");
3386        store
3387            .append_kernel_event(
3388                "projection-session",
3389                KernelEventRecord {
3390                    event_id: "task-done".to_string(),
3391                    occurred_at: now,
3392                    event: KernelEvent::TaskCompleted {
3393                        task_id: task_id.clone(),
3394                        completed_at: now,
3395                        artifact_ids: Vec::new(),
3396                    },
3397                },
3398            )
3399            .await
3400            .expect("completed");
3401
3402        let state = store
3403            .load_session("projection-session")
3404            .await
3405            .expect("snapshot")
3406            .agent_state();
3407        assert_eq!(state.tasks[0].status, TaskStatus::Done);
3408    }
3409
3410    #[tokio::test]
3411    async fn scheduled_wake_trigger_completes_pending_wake() {
3412        let store = Arc::new(InMemoryMemoryStore::new());
3413        let now = SystemTime::now();
3414        let wake_id = WakeId("wake-1".to_string());
3415        store
3416            .append_kernel_event(
3417                "wake-session",
3418                KernelEventRecord {
3419                    event_id: "wake-scheduled".to_string(),
3420                    occurred_at: now,
3421                    event: KernelEvent::WakeScheduled(WakeRequestRecord {
3422                        wake_id: wake_id.clone(),
3423                        requested_at: now,
3424                        due_at: now,
3425                        reason: "check later".to_string(),
3426                        task_id: None,
3427                    }),
3428                },
3429            )
3430            .await
3431            .expect("scheduled");
3432        assert!(
3433            store
3434                .load_session("wake-session")
3435                .await
3436                .expect("snapshot")
3437                .agent_state()
3438                .pending_wake
3439                .is_some()
3440        );
3441
3442        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Yield {
3443            reason: Some("wake handled".to_string()),
3444        }]));
3445        let engine = AgentEngine::new(llm, store.clone());
3446        let outcome = run_until_terminal(
3447            &engine,
3448            ProcessRequest::new(
3449                "wake-session",
3450                AgentTrigger::ScheduledWake {
3451                    wake_id: wake_id.clone(),
3452                    due_at: now,
3453                    reason: "check later".to_string(),
3454                },
3455            ),
3456        )
3457        .await
3458        .expect("outcome");
3459        assert_eq!(outcome.stop_reason, StopReason::Yielded);
3460        let snapshot = store.load_session("wake-session").await.expect("snapshot");
3461        assert!(snapshot.agent_state().pending_wake.is_none());
3462        assert!(snapshot.records.iter().any(|record| matches!(
3463            record,
3464            SessionRecord::KernelEvent(KernelEventRecord {
3465                event: KernelEvent::WakeCompleted { wake_id: completed, .. },
3466                ..
3467            }) if completed == &wake_id
3468        )));
3469    }
3470
3471    #[tokio::test]
3472    async fn duplicate_idempotency_key_reuses_prior_outcome() {
3473        let store = Arc::new(InMemoryMemoryStore::new());
3474        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3475            content: "first".to_string(),
3476        }]));
3477        let engine = AgentEngine::new(llm.clone(), store.clone());
3478        let request = ProcessRequest::new(
3479            "idempotent-session",
3480            AgentTrigger::Webhook {
3481                source: "github".to_string(),
3482                payload: json!({"action": "sync"}),
3483                attachments: Vec::new(),
3484            },
3485        )
3486        .with_idempotency_key("abc");
3487        let first = run_until_terminal(&engine, request.clone())
3488            .await
3489            .expect("first");
3490        let second = run_until_terminal(&engine, request).await.expect("second");
3491        assert_eq!(first.response, second.response);
3492        assert!(second.idempotent_replay);
3493        assert_eq!(llm.observed_inputs().len(), 1);
3494    }
3495
3496    #[tokio::test]
3497    async fn parallel_tool_calls_execute_and_aggregate() {
3498        let store = Arc::new(InMemoryMemoryStore::new());
3499        let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3500            if input.context.prior_tool_results.is_empty() {
3501                Ok(ProviderDecision {
3502                    action: AgentAction::CallSkills(vec![
3503                        PlannedSkillCall {
3504                            call_id: "call-1".to_string(),
3505                            name: "first".to_string(),
3506                            args: json!({"value": 1}),
3507                            priority: 0,
3508                            depends_on: Vec::new(),
3509                            retry_policy: Default::default(),
3510                            dry_run: false,
3511                        },
3512                        PlannedSkillCall {
3513                            call_id: "call-2".to_string(),
3514                            name: "second".to_string(),
3515                            args: json!({"value": 2}),
3516                            priority: 0,
3517                            depends_on: Vec::new(),
3518                            retry_policy: Default::default(),
3519                            dry_run: false,
3520                        },
3521                    ]),
3522                    usage: None,
3523                    cache: None,
3524                })
3525            } else {
3526                Ok(ProviderDecision {
3527                    action: AgentAction::Respond {
3528                        content: "complete".to_string(),
3529                    },
3530                    usage: None,
3531                    cache: None,
3532                })
3533            }
3534        }));
3535        let engine = AgentEngine::new(llm, store.clone());
3536        let order = Arc::new(Mutex::new(Vec::<String>::new()));
3537
3538        for skill_name in ["first", "second"] {
3539            let local = order.clone();
3540            engine.register_wasm_skill(
3541                manifest(skill_name, &["tool:run"]),
3542                Arc::new(StubSkillExecutor {
3543                    name: "stub",
3544                    responder: Arc::new(move |invocation| {
3545                        local
3546                            .lock()
3547                            .expect("order lock")
3548                            .push(invocation.call_id.clone());
3549                        Ok(json!({"echo": invocation.args}))
3550                    }),
3551                }),
3552            );
3553        }
3554
3555        let outcome = run_until_terminal(
3556            &engine,
3557            ProcessRequest::new("session-2", message_trigger("run")).with_scope("tool:run"),
3558        )
3559        .await
3560        .expect("outcome");
3561
3562        assert_eq!(outcome.stop_reason, StopReason::Responded);
3563        let snapshot = session(&store, "session-2").await;
3564        let tool_results = snapshot.tool_results();
3565        assert_eq!(tool_results.len(), 2);
3566        assert_eq!(order.lock().expect("order lock").len(), 2);
3567    }
3568
3569    #[tokio::test]
3570    async fn provider_metadata_records_are_persisted() {
3571        let store = Arc::new(InMemoryMemoryStore::new());
3572        let llm = Arc::new(MockLlmProvider::dynamic(|_| {
3573            Ok(ProviderDecision {
3574                action: AgentAction::Yield {
3575                    reason: Some("done".to_string()),
3576                },
3577                usage: Some(ProviderUsageRecord {
3578                    provider_name: "gemini".to_string(),
3579                    recorded_at: SystemTime::now(),
3580                    input_tokens: 100,
3581                    output_tokens: 20,
3582                    estimated_cost_usd: 0.25,
3583                    cached_content_id: Some("cache-1".to_string()),
3584                }),
3585                cache: Some(ProviderCacheRecord {
3586                    provider_name: "gemini".to_string(),
3587                    cached_content_id: "cache-1".to_string(),
3588                    token_count: 45_000,
3589                    cached_at: SystemTime::now(),
3590                }),
3591            })
3592        }));
3593        let engine = AgentEngine::new(llm, store.clone());
3594
3595        let outcome = run_until_terminal(
3596            &engine,
3597            ProcessRequest::new("session-usage", message_trigger("hi")),
3598        )
3599        .await
3600        .expect("outcome");
3601        assert_eq!(outcome.stop_reason, StopReason::Yielded);
3602
3603        let snapshot = session(&store, "session-usage").await;
3604        assert!(
3605            snapshot
3606                .records
3607                .iter()
3608                .any(|record| matches!(record, SessionRecord::ProviderUsage(_)))
3609        );
3610        assert!(
3611            snapshot
3612                .records
3613                .iter()
3614                .any(|record| matches!(record, SessionRecord::ProviderCache(_)))
3615        );
3616    }
3617
3618    #[tokio::test]
3619    async fn approval_trigger_resumes_native_skill() {
3620        let store = Arc::new(InMemoryMemoryStore::new());
3621        let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3622            if input
3623                .context
3624                .history
3625                .iter()
3626                .any(|record| matches!(record, SessionRecord::ToolResult(_)))
3627            {
3628                Ok(ProviderDecision {
3629                    action: AgentAction::Respond {
3630                        content: "approved-run".to_string(),
3631                    },
3632                    usage: None,
3633                    cache: None,
3634                })
3635            } else {
3636                Ok(ProviderDecision {
3637                    action: AgentAction::CallSkills(vec![PlannedSkillCall {
3638                        call_id: "native-1".to_string(),
3639                        name: "db_fix".to_string(),
3640                        args: json!({"apply": true}),
3641                        priority: 0,
3642                        depends_on: Vec::new(),
3643                        retry_policy: Default::default(),
3644                        dry_run: false,
3645                    }]),
3646                    usage: None,
3647                    cache: None,
3648                })
3649            }
3650        }));
3651        let engine = AgentEngine::new(llm, store.clone());
3652        engine.register_native_skill(
3653            SkillManifest {
3654                approval_required: true,
3655                ..manifest("db_fix", &["db:write"])
3656            },
3657            Arc::new(StubNativeSkill {
3658                requires_approval: true,
3659                responder: Arc::new(|_| Ok(json!({"status": "fixed"}))),
3660            }),
3661        );
3662
3663        let suspended = run_until_terminal(
3664            &engine,
3665            ProcessRequest::new("approval-session", message_trigger("fix")).with_scope("db:write"),
3666        )
3667        .await
3668        .expect("suspended");
3669        assert_eq!(suspended.stop_reason, StopReason::Suspended);
3670        let token = suspended.resume_token.expect("resume token");
3671
3672        let resumed = run_until_terminal(
3673            &engine,
3674            ProcessRequest::new(
3675                "approval-session",
3676                AgentTrigger::Approval {
3677                    resume_token: token,
3678                    decision: ApprovalDecision::Approved,
3679                    metadata: json!({"approved_by": "human"}),
3680                },
3681            ),
3682        )
3683        .await
3684        .expect("resumed");
3685        assert_eq!(resumed.stop_reason, StopReason::Responded);
3686        assert_eq!(resumed.response.as_deref(), Some("approved-run"));
3687    }
3688
3689    #[tokio::test]
3690    async fn provider_failure_stops_with_explicit_reason() {
3691        let store = Arc::new(InMemoryMemoryStore::new());
3692        let llm = Arc::new(MockLlmProvider::dynamic(|_| {
3693            Err(ProviderError::new(
3694                ProviderErrorKind::Transport,
3695                "upstream unavailable",
3696                true,
3697            ))
3698        }));
3699        let engine = AgentEngine::new(llm, store);
3700
3701        let outcome = run_until_terminal(
3702            &engine,
3703            ProcessRequest::new("provider-failure", message_trigger("hello")),
3704        )
3705        .await
3706        .expect("outcome");
3707
3708        assert_eq!(outcome.stop_reason, StopReason::ProviderFailure);
3709    }
3710
3711    #[tokio::test]
3712    async fn self_improvement_applies_bounded_max_step_overlay() {
3713        let store = Arc::new(InMemoryMemoryStore::new());
3714        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3715            content: "unused".to_string(),
3716        }]));
3717        let engine = AgentEngine::new(llm, store.clone());
3718        let mut policy = EnginePolicy {
3719            max_steps: 0,
3720            ..EnginePolicy::default()
3721        };
3722        policy.self_improvement.min_observations_before_tuning = 1;
3723        policy.self_improvement.max_policy_delta_percent = 50.0;
3724
3725        let outcome = run_until_terminal(
3726            &engine,
3727            ProcessRequest::new("learning-max-steps", message_trigger("continue"))
3728                .with_policy(policy),
3729        )
3730        .await
3731        .expect("outcome");
3732
3733        assert_eq!(outcome.stop_reason, StopReason::MaxStepsReached);
3734        let snapshot = session(&store, "learning-max-steps").await;
3735        let overlay = snapshot.active_policy_overlay().expect("active overlay");
3736        assert_eq!(overlay.patch.max_steps, Some(2));
3737        assert!(
3738            snapshot
3739                .records
3740                .iter()
3741                .any(|record| matches!(record, SessionRecord::Reflection(_)))
3742        );
3743        assert!(
3744            snapshot
3745                .records
3746                .iter()
3747                .any(|record| matches!(record, SessionRecord::PolicyTuning(_)))
3748        );
3749    }
3750
3751    #[tokio::test]
3752    async fn self_improvement_rolls_back_overlay_after_regression() {
3753        let store = Arc::new(InMemoryMemoryStore::new());
3754        let llm = Arc::new(MockLlmProvider::dynamic(|_| {
3755            Err(ProviderError::new(
3756                ProviderErrorKind::Transport,
3757                "upstream unavailable",
3758                true,
3759            ))
3760        }));
3761        let engine = AgentEngine::new(llm, store.clone());
3762        let mut policy = EnginePolicy {
3763            max_steps: 0,
3764            ..EnginePolicy::default()
3765        };
3766        policy.self_improvement.min_observations_before_tuning = 1;
3767
3768        let first = run_until_terminal(
3769            &engine,
3770            ProcessRequest::new("learning-rollback", message_trigger("first"))
3771                .with_policy(policy.clone()),
3772        )
3773        .await
3774        .expect("first");
3775        assert_eq!(first.stop_reason, StopReason::MaxStepsReached);
3776        assert!(
3777            session(&store, "learning-rollback")
3778                .await
3779                .active_policy_overlay()
3780                .is_some()
3781        );
3782
3783        let second = run_until_terminal(
3784            &engine,
3785            ProcessRequest::new("learning-rollback", message_trigger("second")).with_policy(policy),
3786        )
3787        .await
3788        .expect("second");
3789        assert_eq!(second.stop_reason, StopReason::ProviderFailure);
3790
3791        let snapshot = session(&store, "learning-rollback").await;
3792        assert!(snapshot.active_policy_overlay().is_none());
3793        assert!(snapshot.records.iter().any(|record| matches!(
3794            record,
3795            SessionRecord::PolicyTuning(tuning)
3796                if tuning.action == PolicyTuningAction::RolledBack
3797        )));
3798    }
3799
3800    #[tokio::test]
3801    async fn self_improvement_records_tool_performance_and_strategy_preferences() {
3802        let store = Arc::new(InMemoryMemoryStore::new());
3803        let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3804            if input.context.prior_tool_results.is_empty() {
3805                Ok(ProviderDecision {
3806                    action: AgentAction::CallSkills(vec![PlannedSkillCall {
3807                        call_id: "unstable-call".to_string(),
3808                        name: "unstable".to_string(),
3809                        args: json!({}),
3810                        priority: 0,
3811                        depends_on: Vec::new(),
3812                        retry_policy: Default::default(),
3813                        dry_run: false,
3814                    }]),
3815                    usage: None,
3816                    cache: None,
3817                })
3818            } else {
3819                Ok(ProviderDecision {
3820                    action: AgentAction::Respond {
3821                        content: "finished".to_string(),
3822                    },
3823                    usage: None,
3824                    cache: None,
3825                })
3826            }
3827        }));
3828        let engine = AgentEngine::new(llm, store.clone());
3829        engine.register_wasm_skill(
3830            manifest("unstable", &["tool:run"]),
3831            Arc::new(StubSkillExecutor {
3832                name: "unstable",
3833                responder: Arc::new(|_| {
3834                    Err(SkillExecutionError::new(
3835                        SkillFailureKind::Internal,
3836                        "simulated failure",
3837                    ))
3838                }),
3839            }),
3840        );
3841
3842        let mut policy = EnginePolicy::default();
3843        policy.self_improvement.min_observations_before_tuning = 1;
3844
3845        let outcome = run_until_terminal(
3846            &engine,
3847            ProcessRequest::new("learning-tools", message_trigger("run"))
3848                .with_scope("tool:run")
3849                .with_policy(policy),
3850        )
3851        .await
3852        .expect("outcome");
3853        assert_eq!(outcome.stop_reason, StopReason::Responded);
3854
3855        let snapshot = session(&store, "learning-tools").await;
3856        assert!(snapshot.records.iter().any(|record| matches!(
3857            record,
3858            SessionRecord::ToolPerformance(performance)
3859                if performance.skill_name == "unstable" && performance.failures >= 1
3860        )));
3861        assert!(snapshot.records.iter().any(|record| matches!(
3862            record,
3863            SessionRecord::StrategyPreference(preference)
3864                if preference.skill_name.as_deref() == Some("unstable")
3865        )));
3866    }
3867
3868    #[tokio::test]
3869    async fn list_sessions_and_record_pages_work() {
3870        let store = Arc::new(InMemoryMemoryStore::new());
3871        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3872            content: "ok".to_string(),
3873        }]));
3874        let engine = AgentEngine::new(llm, store.clone());
3875        for session_id in ["a", "b"] {
3876            run_until_terminal(
3877                &engine,
3878                ProcessRequest::new(session_id, message_trigger("x")),
3879            )
3880            .await
3881            .expect("outcome");
3882        }
3883
3884        let sessions = store
3885            .list_sessions(SessionListQuery::default())
3886            .await
3887            .expect("sessions");
3888        assert_eq!(sessions.len(), 2);
3889        let page = store
3890            .list_records(RecordPageQuery::new("a"))
3891            .await
3892            .expect("page");
3893        assert!(!page.records.is_empty());
3894    }
3895
3896    #[tokio::test]
3897    async fn invalid_tool_arguments_are_persisted_without_executor_call() {
3898        let store = Arc::new(InMemoryMemoryStore::new());
3899        let llm = Arc::new(MockLlmProvider::dynamic(|input| {
3900            if input.context.prior_tool_results.is_empty() {
3901                Ok(ProviderDecision {
3902                    action: AgentAction::CallSkills(vec![planned(
3903                        "bad-args",
3904                        "typed_tool",
3905                        json!({"value": 1}),
3906                    )]),
3907                    usage: None,
3908                    cache: None,
3909                })
3910            } else {
3911                Ok(ProviderDecision {
3912                    action: AgentAction::Respond {
3913                        content: "validated".to_string(),
3914                    },
3915                    usage: None,
3916                    cache: None,
3917                })
3918            }
3919        }));
3920        let engine = AgentEngine::new(llm, store.clone());
3921        let calls = Arc::new(Mutex::new(0usize));
3922        let calls_for_executor = calls.clone();
3923        engine.register_wasm_skill(
3924            SkillManifest {
3925                input_schema: json!({
3926                    "type": "object",
3927                    "required": ["value"],
3928                    "properties": {"value": {"type": "string"}}
3929                }),
3930                ..manifest("typed_tool", &["tool:run"])
3931            },
3932            Arc::new(StubSkillExecutor {
3933                name: "typed",
3934                responder: Arc::new(move |_| {
3935                    *calls_for_executor.lock().expect("lock") += 1;
3936                    Ok(json!({}))
3937                }),
3938            }),
3939        );
3940
3941        let outcome = run_until_terminal(
3942            &engine,
3943            ProcessRequest::new("schema-session", message_trigger("run")).with_scope("tool:run"),
3944        )
3945        .await
3946        .expect("outcome");
3947
3948        assert_eq!(outcome.stop_reason, StopReason::Responded);
3949        assert_eq!(*calls.lock().expect("lock"), 0);
3950        let snapshot = session(&store, "schema-session").await;
3951        assert!(snapshot.records.iter().any(|record| matches!(
3952            record,
3953            SessionRecord::SkillInputValidation(validation)
3954                if !validation.valid && validation.call_id == "bad-args"
3955        )));
3956        assert!(snapshot.records.iter().any(|record| matches!(
3957            record,
3958            SessionRecord::ToolResult(result)
3959                if result.call_id == "bad-args"
3960                    && matches!(
3961                        result.output,
3962                        Err(SkillFailure {
3963                            kind: SkillFailureKind::InvalidArguments,
3964                            ..
3965                        })
3966                    )
3967        )));
3968    }
3969
3970    #[tokio::test]
3971    async fn checkpointed_graph_resume_executes_only_unfinished_nodes() {
3972        let store = Arc::new(InMemoryMemoryStore::new());
3973        let llm = Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3974            content: "done".to_string(),
3975        }]));
3976        let engine = AgentEngine::new(llm, store.clone());
3977        let c1_calls = Arc::new(Mutex::new(0usize));
3978        let c2_calls = Arc::new(Mutex::new(0usize));
3979        let c1_counter = c1_calls.clone();
3980        let c2_counter = c2_calls.clone();
3981        engine.register_wasm_skill(
3982            manifest("first", &["tool:run"]),
3983            Arc::new(StubSkillExecutor {
3984                name: "first",
3985                responder: Arc::new(move |_| {
3986                    *c1_counter.lock().expect("lock") += 1;
3987                    Ok(json!({"first": true}))
3988                }),
3989            }),
3990        );
3991        engine.register_wasm_skill(
3992            manifest("second", &["tool:run"]),
3993            Arc::new(StubSkillExecutor {
3994                name: "second",
3995                responder: Arc::new(move |_| {
3996                    *c2_counter.lock().expect("lock") += 1;
3997                    Ok(json!({"second": true}))
3998                }),
3999            }),
4000        );
4001
4002        let trigger_id = "trigger-checkpoint".to_string();
4003        let session_id = "checkpoint-session";
4004        let trigger = TriggerRecord {
4005            trigger_id: trigger_id.clone(),
4006            session_id: session_id.to_string(),
4007            idempotency_key: None,
4008            recorded_at: SystemTime::now(),
4009            trigger: message_trigger("resume"),
4010            intent: None,
4011        };
4012        store.append_trigger(trigger).await.expect("trigger");
4013        let calls = vec![
4014            planned("call-1", "first", json!({})),
4015            planned("call-2", "second", json!({})),
4016        ];
4017        store
4018            .append_model_decision(
4019                session_id,
4020                ModelDecisionRecord {
4021                    step: 0,
4022                    decided_at: SystemTime::now(),
4023                    action: AgentAction::CallSkills(calls.clone()),
4024                },
4025            )
4026            .await
4027            .expect("decision");
4028        let graph = existing_or_new_graph(
4029            &AgentContext {
4030                session_id: session_id.to_string(),
4031                records: Vec::new(),
4032                prior_tool_results: Vec::new(),
4033                granted_scopes: ["tool:run".to_string()].into_iter().collect(),
4034                metadata: ExecutionMetadata {
4035                    trigger_id: trigger_id.clone(),
4036                    idempotency_key: None,
4037                    started_at: SystemTime::now(),
4038                    deadline: Instant::now() + EnginePolicy::default().max_execution_time(),
4039                    policy: EnginePolicy::default(),
4040                    provider: Default::default(),
4041                    cancellation: Default::default(),
4042                },
4043            },
4044            0,
4045            &calls,
4046        );
4047        store
4048            .append_tool_execution_graph(session_id, graph.clone())
4049            .await
4050            .expect("graph");
4051        let first_node = graph.nodes.first().expect("first node");
4052        store
4053            .append_tool_node_checkpoint(
4054                session_id,
4055                ToolNodeCheckpointRecord {
4056                    checkpoint_id: "cp-start".to_string(),
4057                    graph_id: graph.graph_id.clone(),
4058                    call_id: first_node.call_id.clone(),
4059                    skill_name: first_node.skill_name.clone(),
4060                    step: graph.step,
4061                    status: ToolNodeStatus::Started,
4062                    attempt: 1,
4063                    occurred_at: SystemTime::now(),
4064                    detail: None,
4065                },
4066            )
4067            .await
4068            .expect("started");
4069        store
4070            .append_tool_node_checkpoint(
4071                session_id,
4072                ToolNodeCheckpointRecord {
4073                    checkpoint_id: "cp-ok".to_string(),
4074                    graph_id: graph.graph_id.clone(),
4075                    call_id: first_node.call_id.clone(),
4076                    skill_name: first_node.skill_name.clone(),
4077                    step: graph.step,
4078                    status: ToolNodeStatus::Succeeded,
4079                    attempt: 1,
4080                    occurred_at: SystemTime::now(),
4081                    detail: None,
4082                },
4083            )
4084            .await
4085            .expect("succeeded");
4086        store
4087            .append_tool_result(
4088                session_id,
4089                ToolResultRecord {
4090                    call_id: "call-1".to_string(),
4091                    finished_at: SystemTime::now(),
4092                    skill_name: "first".to_string(),
4093                    output: Ok(json!({"already": "done"})),
4094                },
4095            )
4096            .await
4097            .expect("result");
4098
4099        let result = engine
4100            .advance(AdvanceRequest::Continue(
4101                ContinueRequest::new(session_id).with_scope("tool:run"),
4102            ))
4103            .await
4104            .expect("advance");
4105
4106        // The engine now returns an outcome because we altered it to fall through to `perform_single_step`
4107        // which makes the LLM run again, returning `Responded`.
4108        assert!(result.outcome.is_some());
4109        assert_eq!(result.outcome.unwrap().stop_reason, StopReason::Responded);
4110        assert_eq!(*c1_calls.lock().expect("lock"), 0);
4111        assert_eq!(*c2_calls.lock().expect("lock"), 1);
4112        let snapshot = session(&store, session_id).await;
4113        assert!(snapshot.records.iter().any(|record| matches!(
4114            record,
4115            SessionRecord::ToolResult(result) if result.call_id == "call-2"
4116        )));
4117    }
4118
4119    #[tokio::test]
4120    async fn priority_ordering_runs_high_priority_first_when_serialized() {
4121        let store = Arc::new(InMemoryMemoryStore::new());
4122        let llm = Arc::new(MockLlmProvider::dynamic(|input| {
4123            if input.context.prior_tool_results.is_empty() {
4124                let mut low = planned("low", "low", json!({}));
4125                low.priority = 0;
4126                let mut high = planned("high", "high", json!({}));
4127                high.priority = 10;
4128                Ok(ProviderDecision {
4129                    action: AgentAction::CallSkills(vec![low, high]),
4130                    usage: None,
4131                    cache: None,
4132                })
4133            } else {
4134                Ok(ProviderDecision {
4135                    action: AgentAction::Respond {
4136                        content: "ordered".to_string(),
4137                    },
4138                    usage: None,
4139                    cache: None,
4140                })
4141            }
4142        }));
4143        let engine = AgentEngine::new(llm, store.clone());
4144        let order = Arc::new(Mutex::new(Vec::<String>::new()));
4145        for skill_name in ["low", "high"] {
4146            let order = order.clone();
4147            engine.register_wasm_skill(
4148                manifest(skill_name, &["tool:run"]),
4149                Arc::new(StubSkillExecutor {
4150                    name: "ordered",
4151                    responder: Arc::new(move |invocation| {
4152                        order.lock().expect("lock").push(invocation.call_id);
4153                        Ok(json!({}))
4154                    }),
4155                }),
4156            );
4157        }
4158
4159        let mut policy = EnginePolicy {
4160            max_parallel_skill_calls: 1,
4161            ..EnginePolicy::default()
4162        };
4163        policy.self_improvement.enabled = false;
4164        run_until_terminal(
4165            &engine,
4166            ProcessRequest::new("priority-session", message_trigger("run"))
4167                .with_scope("tool:run")
4168                .with_policy(policy),
4169        )
4170        .await
4171        .expect("outcome");
4172
4173        assert_eq!(&*order.lock().expect("lock"), &["high", "low"]);
4174    }
4175
4176    #[tokio::test]
4177    async fn plan_action_persists_deliberation_record() {
4178        let store = Arc::new(InMemoryMemoryStore::new());
4179        let llm = Arc::new(MockLlmProvider::scripted(vec![
4180            AgentAction::Plan {
4181                summary: "inspect then act".to_string(),
4182                candidate_actions: vec!["search memory".to_string(), "call tool".to_string()],
4183                confidence: 0.82,
4184            },
4185            AgentAction::Respond {
4186                content: "planned".to_string(),
4187            },
4188        ]));
4189        let engine = AgentEngine::new(llm, store.clone());
4190
4191        let outcome = run_until_terminal(
4192            &engine,
4193            ProcessRequest::new("plan-session", message_trigger("think")),
4194        )
4195        .await
4196        .expect("outcome");
4197
4198        assert_eq!(outcome.stop_reason, StopReason::Responded);
4199        let snapshot = session(&store, "plan-session").await;
4200        assert!(snapshot.records.iter().any(|record| matches!(
4201            record,
4202            SessionRecord::Deliberation(deliberation)
4203                if deliberation.summary == "inspect then act"
4204                    && deliberation.outcome == DeliberationOutcome::ReadyToAct
4205        )));
4206    }
4207}