Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
4//! It functions as a reusable Typed Decision flow (Axon<In, Out, E>).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
/// Configuration for Execution Mode.
///
/// Stored on every `Axon` (`Axon::execution_mode`) and set through
/// `Axon::with_execution_mode`; newly started Axons use [`ExecutionMode::Local`].
#[derive(Clone)]
pub enum ExecutionMode {
    /// Normal, unpartitioned local execution.
    Local,
    /// Singleton execution, ensures only one instance runs across the entire cluster.
    Singleton {
        /// Key naming the distributed lock.
        /// NOTE(review): presumably the key handed to `lock_provider` when the lock is
        /// acquired — the acquisition code is not in this part of the file; confirm.
        lock_key: String,
        /// Lock time-to-live; the `_ms` suffix indicates milliseconds.
        ttl_ms: u64,
        /// Shared handle to the distributed lock implementation.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
57
/// Type alias for async boxed futures used in Axon execution.
///
/// A pinned, heap-allocated, `Send` future that yields `T` and may borrow data
/// for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
60
/// Executor type for Axon steps.
///
/// A shared (`Arc`) closure that takes an input state `In`, a borrowed resource
/// bundle `Res` and a mutably borrowed `Bus`, and returns a boxed future resolving
/// to an `Outcome<Out, E>`. The future may borrow the resources and the Bus for
/// the same lifetime `'a`.
/// Must be Send + Sync to be reusable across threads and clones.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
66
/// Manual intervention jump command injected into the Bus.
///
/// Chained executors read this from the Bus before running; a step whose node id
/// or label matches `target_node` skips all earlier steps and runs directly.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Node id or node label of the step to jump to (both are compared).
    pub target_node: String,
    /// JSON value deserialized into the target step's input type. When `None`,
    /// the target step reports a missing-payload outcome instead of running.
    pub payload_override: Option<serde_json::Value>,
}
73
/// Start step index for resumption, injected into the Bus.
///
/// Chained executors compare the wrapped value against their own step index
/// (the position of their node in `schematic.nodes`) to decide whether they are
/// the step at which a resumed run should start.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
77
/// Persisted state for resumption, injected into the Bus.
///
/// Its presence (together with a matching `StartStep`) switches a chained
/// executor into resumption mode.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// Checkpointed JSON payload; used as a fallback input for the resumed step
    /// when the freshly supplied input cannot be converted to the step's input type.
    payload: Option<serde_json::Value>,
}
83
/// Helper to extract a readable type name from a type.
///
/// Takes the output of [`std::any::type_name`] and removes the module prefix
/// from every path it contains, including paths nested inside generic
/// arguments, tuples, references and slices. The structure of the type is kept
/// intact, e.g. `core::option::Option<alloc::string::String>` becomes
/// `Option<String>`.
///
/// Splitting the whole string on `::` and keeping the last piece is not enough:
/// for generic types it returns a fragment such as `String>`.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` at which the path currently being read begins.
    let mut path_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch == ':' && chars.peek() == Some(&':') {
            // `::` separator: everything collected for the current path so far
            // is a module prefix, so discard it and read the next segment.
            chars.next();
            short.truncate(path_start);
        } else {
            short.push(ch);
            if !(ch.is_alphanumeric() || ch == '_') {
                // Punctuation (`<`, `>`, `,`, space, `&`, `[`, ...) ends a path;
                // the next path starts right after it.
                path_start = short.len();
            }
        }
    }

    short
}
89
/// The Axon Builder and Runtime.
///
/// `Axon` represents an executable decision tree.
/// It is reusable and thread-safe.
///
/// ## Type parameters
///
/// * `In`  – input type accepted by the first step
/// * `Out` – output type produced by the last chained step
/// * `E`   – error type shared by all steps
/// * `Res` – resource bundle passed by reference to every step (defaults to `()`)
///
/// ## Example
///
/// ```rust,ignore
/// use ranvier_core::prelude::*;
/// // ...
/// // Start with an identity Axon (In -> In)
/// let axon = Axon::<(), (), _>::new("My Axon")
///     .then(StepA)
///     .then(StepB);
///
/// // Execute multiple times
/// let res1 = axon.execute((), &mut bus1).await;
/// let res2 = axon.execute((), &mut bus2).await;
/// ```
pub struct Axon<In, Out, E, Res = ()> {
    /// The static structure (for visualization/analysis)
    pub schematic: Schematic,
    /// The runtime executor (the composed closure chain built by the `then*` methods)
    executor: Executor<In, Out, E, Res>,
    /// How this Axon is executed across the cluster
    pub execution_mode: ExecutionMode,
    /// Optional persistence store for state inspection
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Optional audit sink for tamper-evident logging of interventions
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Optional dead-letter queue sink for storing failed events
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Policy for handling event failures
    pub dlq_policy: DlqPolicy,
    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Policy for automated saga compensation
    pub saga_policy: SagaPolicy,
    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Registry for Saga compensation handlers (shared between clones via `Arc`)
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// Optional IAM handle for identity verification at the Schematic boundary
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
136
/// Schematic export request derived from command-line args/env.
///
/// Carries only the destination of the export; see `output`.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Optional output file path. If omitted, schematic is written to stdout.
    pub output: Option<PathBuf>,
}
143
144impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
145    fn clone(&self) -> Self {
146        Self {
147            schematic: self.schematic.clone(),
148            executor: self.executor.clone(),
149            execution_mode: self.execution_mode.clone(),
150            persistence_store: self.persistence_store.clone(),
151            audit_sink: self.audit_sink.clone(),
152            dlq_sink: self.dlq_sink.clone(),
153            dlq_policy: self.dlq_policy.clone(),
154            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
155            saga_policy: self.saga_policy.clone(),
156            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
157            saga_compensation_registry: self.saga_compensation_registry.clone(),
158            iam_handle: self.iam_handle.clone(),
159        }
160    }
161}
162
163impl<In, E, Res> Axon<In, In, E, Res>
164where
165    In: Send + Sync + Serialize + DeserializeOwned + 'static,
166    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
167    Res: ranvier_core::transition::ResourceRequirement,
168{
169    /// Create a new Axon flow with the given label.
170    /// This is the preferred entry point per Flat API guidelines.
171    #[track_caller]
172    pub fn new(label: &str) -> Self {
173        let caller = Location::caller();
174        Self::start_with_source(label, caller)
175    }
176
177    /// Start defining a new Axon flow.
178    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
179    #[track_caller]
180    pub fn start(label: &str) -> Self {
181        let caller = Location::caller();
182        Self::start_with_source(label, caller)
183    }
184
185    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
186        let node_id = uuid::Uuid::new_v4().to_string();
187        let node = Node {
188            id: node_id,
189            kind: NodeKind::Ingress,
190            label: label.to_string(),
191            description: None,
192            input_type: "void".to_string(),
193            output_type: type_name_of::<In>(),
194            resource_type: type_name_of::<Res>(),
195            metadata: Default::default(),
196            bus_capability: None,
197            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
198            position: None,
199            compensation_node_id: None,
200        };
201
202        let mut schematic = Schematic::new(label);
203        schematic.nodes.push(node);
204
205        let executor: Executor<In, In, E, Res> =
206            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
207
208        Self {
209            schematic,
210            executor,
211            execution_mode: ExecutionMode::Local,
212            persistence_store: None,
213            audit_sink: None,
214            dlq_sink: None,
215            dlq_policy: DlqPolicy::default(),
216            dynamic_dlq_policy: None,
217            saga_policy: SagaPolicy::default(),
218            dynamic_saga_policy: None,
219            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
220                ranvier_core::saga::SagaCompensationRegistry::new(),
221            )),
222            iam_handle: None,
223        }
224    }
225}
226
227impl<In, Out, E, Res> Axon<In, Out, E, Res>
228where
229    In: Send + Sync + Serialize + DeserializeOwned + 'static,
230    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
231    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
232    Res: ranvier_core::transition::ResourceRequirement,
233{
234    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
235    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
236        self.execution_mode = mode;
237        self
238    }
239
240    /// Set the schematic version for this Axon.
241    pub fn with_version(mut self, version: impl Into<String>) -> Self {
242        self.schematic.schema_version = version.into();
243        self
244    }
245
246    /// Attach a persistence store to enable state inspection via the Inspector.
247    pub fn with_persistence_store<S>(mut self, store: S) -> Self
248    where
249        S: crate::persistence::PersistenceStore + 'static,
250    {
251        self.persistence_store = Some(Arc::new(store));
252        self
253    }
254
255    /// Attach an audit sink for tamper-evident logging.
256    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
257    where
258        S: AuditSink + 'static,
259    {
260        self.audit_sink = Some(Arc::new(sink));
261        self
262    }
263
264    /// Set the Dead Letter Queue sink for this Axon.
265    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
266    where
267        S: DlqSink + 'static,
268    {
269        self.dlq_sink = Some(Arc::new(sink));
270        self
271    }
272
273    /// Set the Dead Letter Queue policy for this Axon.
274    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
275        self.dlq_policy = policy;
276        self
277    }
278
279    /// Set the Saga compensation policy for this Axon.
280    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
281        self.saga_policy = policy;
282        self
283    }
284
285    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
286    /// current value is read at each execution, overriding the static `dlq_policy`.
287    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
288        self.dynamic_dlq_policy = Some(dynamic);
289        self
290    }
291
292    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
293    /// current value is read at each execution, overriding the static `saga_policy`.
294    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
295        self.dynamic_saga_policy = Some(dynamic);
296        self
297    }
298
299    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
300    ///
301    /// When set, `execute()` will:
302    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
303    /// 2. Verify the token using the provided verifier
304    /// 3. Enforce the policy against the verified identity
305    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
306    pub fn with_iam(
307        mut self,
308        policy: ranvier_core::iam::IamPolicy,
309        verifier: impl ranvier_core::iam::IamVerifier + 'static,
310    ) -> Self {
311        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
312            policy,
313            Arc::new(verifier),
314        ));
315        self
316    }
317}
318
319#[async_trait]
320impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
321where
322    In: Send + Sync + Serialize + DeserializeOwned + 'static,
323    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
324    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
325    Res: ranvier_core::transition::ResourceRequirement,
326{
327    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
328        let store = self.persistence_store.as_ref()?;
329        let trace = store.load(trace_id).await.ok().flatten()?;
330        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
331    }
332
333    async fn force_resume(
334        &self,
335        trace_id: &str,
336        target_node: &str,
337        payload_override: Option<Value>,
338    ) -> Result<(), String> {
339        let store = self
340            .persistence_store
341            .as_ref()
342            .ok_or("No persistence store attached to Axon")?;
343
344        let intervention = crate::persistence::Intervention {
345            target_node: target_node.to_string(),
346            payload_override,
347            timestamp_ms: now_ms(),
348        };
349
350        store
351            .save_intervention(trace_id, intervention)
352            .await
353            .map_err(|e| format!("Failed to save intervention: {}", e))?;
354
355        if let Some(sink) = self.audit_sink.as_ref() {
356            let event = AuditEvent::new(
357                uuid::Uuid::new_v4().to_string(),
358                "Inspector".to_string(),
359                "ForceResume".to_string(),
360                trace_id.to_string(),
361            )
362            .with_metadata("target_node", target_node);
363
364            let _ = sink.append(&event).await;
365        }
366
367        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
368        Ok(())
369    }
370}
371
372impl<In, Out, E, Res> Axon<In, Out, E, Res>
373where
374    In: Send + Sync + Serialize + DeserializeOwned + 'static,
375    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
376    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
377    Res: ranvier_core::transition::ResourceRequirement,
378{
    /// Chain a transition to this Axon.
    ///
    /// Requires the transition to use the SAME resource bundle as the previous steps.
    ///
    /// Structurally, this appends an `Atom` node for `transition` to the schematic
    /// (recording the caller's file/line as its source location) and links it to
    /// the previously last node with a linear edge labelled `"Next"`.
    ///
    /// At runtime, the returned Axon's executor wraps the previous executor and
    /// takes one of three paths:
    ///
    /// 1. **Manual jump** – the Bus holds a [`ManualJump`] whose `target_node` equals
    ///    this node's id or label: earlier steps are skipped and the transition runs
    ///    on the value deserialized from `payload_override`. If the override is
    ///    missing or cannot be deserialized into `Out`, the executor returns
    ///    `Outcome::emit("execution.jump.missing_payload", ..)` or
    ///    `Outcome::emit("execution.jump.payload_error", ..)` respectively.
    /// 2. **Resumption** – the Bus holds a `StartStep` equal to this step's index
    ///    together with a `ResumptionState`: earlier steps are skipped and the
    ///    transition runs on the fresh input converted to `Out` via JSON, falling
    ///    back to the persisted payload. If neither converts, the executor returns
    ///    `Outcome::emit("execution.resumption.payload_error", ..)`.
    /// 3. **Normal flow** – the previous executor runs first; only an
    ///    `Outcome::Next` result is fed into this transition, any other outcome is
    ///    returned without running it.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        // Decompose self to avoid partial move issues
        let Axon {
            mut schematic,
            executor: prev_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        } = self;

        // Update Schematic
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
            position: transition
                .position()
                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
            compensation_node_id: None,
        };

        // The new node is linked from whatever node is currently last; an empty
        // schematic yields an empty `from` id.
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Compose Executor
        // Values captured by the executor closure; they are cloned again on every
        // invocation because the closure is `Fn` and each future needs owned copies.
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let bus_policy_clone = bus_policy_for_exec.clone();
        // Step index = position of the node just pushed into `schematic.nodes`.
        let current_step_idx = schematic.nodes.len() as u64 - 1;
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_clone.clone();
                let step_idx = current_step_idx;

                Box::pin(async move {
                    // Check for manual intervention jump
                    if let Some(jump) = bus.read::<ManualJump>()
                        && (jump.target_node == timeline_node_id
                            || jump.target_node == timeline_node_label)
                    {
                        tracing::info!(
                            node_id = %timeline_node_id,
                            node_label = %timeline_node_label,
                            "Manual jump target reached; skipping previous steps"
                        );

                        let state = if let Some(ow) = jump.payload_override.clone() {
                            match serde_json::from_value::<Out>(ow) {
                                Ok(s) => s,
                                Err(e) => {
                                    tracing::error!(
                                        "Payload override deserialization failed: {}",
                                        e
                                    );
                                    return Outcome::emit(
                                        "execution.jump.payload_error",
                                        Some(serde_json::json!({"error": e.to_string()})),
                                    )
                                    .map(|_: ()| unreachable!());
                                }
                            }
                        } else {
                            // No payload override was supplied, so there is no `Out` value to
                            // feed this step: the jump is reported as a missing-payload outcome
                            // rather than guessing an input.
                            // A better implementation could load the last persisted `Out` of
                            // the previous step instead.
                            return Outcome::emit(
                                "execution.jump.missing_payload",
                                Some(serde_json::json!({"node_id": timeline_node_id})),
                            );
                        };

                        // Skip prev() and continue with trans.run(state, ...)
                        return run_this_step::<Out, Next, E, Res>(
                            &trans,
                            state,
                            res,
                            bus,
                            &timeline_node_id,
                            &timeline_node_label,
                            &transition_bus_policy,
                            step_idx,
                        )
                        .await;
                    }

                    // Handle resumption skip
                    if let Some(start) = bus.read::<StartStep>()
                        && step_idx == start.0
                        && bus.read::<ResumptionState>().is_some()
                    {
                        // Prefer fresh input (In → Out via JSON round-trip).
                        // The caller provides updated state (e.g., corrected data after a fault).
                        // Falls back to persisted checkpoint state when types are incompatible.
                        let fresh_state = serde_json::to_value(&input)
                            .ok()
                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
                        let persisted_state = bus
                            .read::<ResumptionState>()
                            .and_then(|r| r.payload.clone())
                            .and_then(|p| serde_json::from_value::<Out>(p).ok());

                        if let Some(s) = fresh_state.or(persisted_state) {
                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
                            return run_this_step::<Out, Next, E, Res>(
                                &trans,
                                s,
                                res,
                                bus,
                                &timeline_node_id,
                                &timeline_node_label,
                                &transition_bus_policy,
                                step_idx,
                            )
                            .await;
                        }

                        // Neither the fresh input nor the persisted payload converts to `Out`.
                        return Outcome::emit(
                            "execution.resumption.payload_error",
                            Some(serde_json::json!({"error": "no compatible resumption state"})),
                        )
                        .map(|_: ()| unreachable!());
                    }

                    // Run previous step
                    let prev_result = prev(input, res, bus).await;

                    // Unpack
                    // Only `Next` carries a value for this step; every other outcome ends
                    // the chain here. NOTE(review): presumably `map` only touches the `Next`
                    // payload, which is why the closure is `unreachable!()` — confirm
                    // against `Outcome::map`.
                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        other => return other.map(|_| unreachable!()),
                    };

                    run_this_step::<Out, Next, E, Res>(
                        &trans,
                        state,
                        res,
                        bus,
                        &timeline_node_id,
                        &timeline_node_label,
                        &transition_bus_policy,
                        step_idx,
                    )
                    .await
                })
            },
        );
        // Reassemble the Axon with the extended schematic and the wrapping executor;
        // all other configuration is carried over unchanged.
        Axon {
            schematic,
            executor: next_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        }
    }
580
581    /// Chain a transition to this Axon with a Saga compensation step.
582    ///
583    /// If the transition fails, the compensation transition will be executed
584    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
585    #[track_caller]
586    pub fn then_compensated<Next, Trans, Comp>(
587        self,
588        transition: Trans,
589        compensation: Comp,
590    ) -> Axon<In, Next, E, Res>
591    where
592        Out: Clone,
593        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
594        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
595        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
596    {
597        let caller = Location::caller();
598        let Axon {
599            mut schematic,
600            executor: prev_executor,
601            execution_mode,
602            persistence_store,
603            audit_sink,
604            dlq_sink,
605            dlq_policy,
606            dynamic_dlq_policy,
607            saga_policy,
608            dynamic_saga_policy,
609            saga_compensation_registry,
610            iam_handle,
611        } = self;
612
613        // 1. Add Primary Node
614        let next_node_id = uuid::Uuid::new_v4().to_string();
615        let comp_node_id = uuid::Uuid::new_v4().to_string();
616
617        let next_node = Node {
618            id: next_node_id.clone(),
619            kind: NodeKind::Atom,
620            label: transition.label(),
621            description: transition.description(),
622            input_type: type_name_of::<Out>(),
623            output_type: type_name_of::<Next>(),
624            resource_type: type_name_of::<Res>(),
625            metadata: Default::default(),
626            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
627            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
628            position: transition
629                .position()
630                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
631            compensation_node_id: Some(comp_node_id.clone()),
632        };
633
634        // 2. Add Compensation Node
635        let comp_node = Node {
636            id: comp_node_id.clone(),
637            kind: NodeKind::Atom,
638            label: format!("Compensate: {}", compensation.label()),
639            description: compensation.description(),
640            input_type: type_name_of::<Out>(),
641            output_type: "void".to_string(),
642            resource_type: type_name_of::<Res>(),
643            metadata: Default::default(),
644            bus_capability: None,
645            source_location: None,
646            position: compensation
647                .position()
648                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
649            compensation_node_id: None,
650        };
651
652        let last_node_id = schematic
653            .nodes
654            .last()
655            .map(|n| n.id.clone())
656            .unwrap_or_default();
657
658        schematic.nodes.push(next_node);
659        schematic.nodes.push(comp_node);
660        schematic.edges.push(Edge {
661            from: last_node_id,
662            to: next_node_id.clone(),
663            kind: EdgeType::Linear,
664            label: Some("Next".to_string()),
665        });
666
667        // 3. Compose Executor with Compensation Logic
668        let node_id_for_exec = next_node_id.clone();
669        let comp_id_for_exec = comp_node_id.clone();
670        let node_label_for_exec = transition.label();
671        let bus_policy_for_exec = transition.bus_access_policy();
672        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
673        let comp_for_exec = compensation.clone();
674        let bus_policy_for_executor = bus_policy_for_exec.clone();
675        let bus_policy_for_registry = bus_policy_for_exec.clone();
676        let next_executor: Executor<In, Next, E, Res> = Arc::new(
677            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
678                let prev = prev_executor.clone();
679                let trans = transition.clone();
680                let comp = comp_for_exec.clone();
681                let timeline_node_id = node_id_for_exec.clone();
682                let timeline_comp_id = comp_id_for_exec.clone();
683                let timeline_node_label = node_label_for_exec.clone();
684                let transition_bus_policy = bus_policy_for_executor.clone();
685                let step_idx = step_idx_for_exec;
686
687                Box::pin(async move {
688                    // Check for manual intervention jump
689                    if let Some(jump) = bus.read::<ManualJump>()
690                        && (jump.target_node == timeline_node_id
691                            || jump.target_node == timeline_node_label)
692                    {
693                        tracing::info!(
694                            node_id = %timeline_node_id,
695                            node_label = %timeline_node_label,
696                            "Manual jump target reached (compensated); skipping previous steps"
697                        );
698
699                        let state = if let Some(ow) = jump.payload_override.clone() {
700                            match serde_json::from_value::<Out>(ow) {
701                                Ok(s) => s,
702                                Err(e) => {
703                                    tracing::error!(
704                                        "Payload override deserialization failed: {}",
705                                        e
706                                    );
707                                    return Outcome::emit(
708                                        "execution.jump.payload_error",
709                                        Some(serde_json::json!({"error": e.to_string()})),
710                                    )
711                                    .map(|_: ()| unreachable!());
712                                }
713                            }
714                        } else {
715                            return Outcome::emit(
716                                "execution.jump.missing_payload",
717                                Some(serde_json::json!({"node_id": timeline_node_id})),
718                            )
719                            .map(|_: ()| unreachable!());
720                        };
721
722                        // Skip prev() and continue with trans.run(state, ...)
723                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
724                            &trans,
725                            &comp,
726                            state,
727                            res,
728                            bus,
729                            &timeline_node_id,
730                            &timeline_comp_id,
731                            &timeline_node_label,
732                            &transition_bus_policy,
733                            step_idx,
734                        )
735                        .await;
736                    }
737
738                    // Handle resumption skip
739                    if let Some(start) = bus.read::<StartStep>()
740                        && step_idx == start.0
741                        && bus.read::<ResumptionState>().is_some()
742                    {
743                        let fresh_state = serde_json::to_value(&input)
744                            .ok()
745                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
746                        let persisted_state = bus
747                            .read::<ResumptionState>()
748                            .and_then(|r| r.payload.clone())
749                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
750
751                        if let Some(s) = fresh_state.or(persisted_state) {
752                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
753                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
754                                &trans,
755                                &comp,
756                                s,
757                                res,
758                                bus,
759                                &timeline_node_id,
760                                &timeline_comp_id,
761                                &timeline_node_label,
762                                &transition_bus_policy,
763                                step_idx,
764                            )
765                            .await;
766                        }
767
768                        return Outcome::emit(
769                            "execution.resumption.payload_error",
770                            Some(serde_json::json!({"error": "no compatible resumption state"})),
771                        )
772                        .map(|_: ()| unreachable!());
773                    }
774
775                    // Run previous step
776                    let prev_result = prev(input, res, bus).await;
777
778                    // Unpack
779                    let state = match prev_result {
780                        Outcome::Next(t) => t,
781                        other => return other.map(|_| unreachable!()),
782                    };
783
784                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
785                        &trans,
786                        &comp,
787                        state,
788                        res,
789                        bus,
790                        &timeline_node_id,
791                        &timeline_comp_id,
792                        &timeline_node_label,
793                        &transition_bus_policy,
794                        step_idx,
795                    )
796                    .await
797                })
798            },
799        );
800        // 4. Register Saga Compensation if enabled
801        {
802            let mut registry = saga_compensation_registry.write().unwrap();
803            let comp_fn = compensation.clone();
804            let transition_bus_policy = bus_policy_for_registry.clone();
805
806            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
807                Arc::new(move |input_data, res, bus| {
808                    let comp = comp_fn.clone();
809                    let bus_policy = transition_bus_policy.clone();
810                    Box::pin(async move {
811                        let input: Out = serde_json::from_slice(&input_data).unwrap();
812                        bus.set_access_policy(comp.label(), bus_policy);
813                        let res = comp.run(input, res, bus).await;
814                        bus.clear_access_policy();
815                        res
816                    })
817                });
818            registry.register(next_node_id.clone(), handler);
819        }
820
821        Axon {
822            schematic,
823            executor: next_executor,
824            execution_mode,
825            persistence_store,
826            audit_sink,
827            dlq_sink,
828            dlq_policy,
829            dynamic_dlq_policy,
830            saga_policy,
831            dynamic_saga_policy,
832            saga_compensation_registry,
833            iam_handle,
834        }
835    }
836
837    /// Attach a compensation transition to the previously added node.
838    /// This establishes a Schematic-level Saga compensation mapping.
839    #[track_caller]
840    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
841    where
842        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
843    {
844        // NOTE: This currently only updates the Schematic.
845        // For runtime compensation behavior, use `then_compensated`.
846        let caller = Location::caller();
847        let comp_node_id = uuid::Uuid::new_v4().to_string();
848
849        let comp_node = Node {
850            id: comp_node_id.clone(),
851            kind: NodeKind::Atom,
852            label: transition.label(),
853            description: transition.description(),
854            input_type: type_name_of::<Out>(),
855            output_type: "void".to_string(),
856            resource_type: type_name_of::<Res>(),
857            metadata: Default::default(),
858            bus_capability: None,
859            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
860            position: transition
861                .position()
862                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
863            compensation_node_id: None,
864        };
865
866        if let Some(last_node) = self.schematic.nodes.last_mut() {
867            last_node.compensation_node_id = Some(comp_node_id.clone());
868        }
869
870        self.schematic.nodes.push(comp_node);
871        self
872    }
873
874    /// Add a branch point
875    #[track_caller]
876    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
877        let caller = Location::caller();
878        let branch_id_str = branch_id.into();
879        let last_node_id = self
880            .schematic
881            .nodes
882            .last()
883            .map(|n| n.id.clone())
884            .unwrap_or_default();
885
886        let branch_node = Node {
887            id: uuid::Uuid::new_v4().to_string(),
888            kind: NodeKind::Synapse,
889            label: label.to_string(),
890            description: None,
891            input_type: type_name_of::<Out>(),
892            output_type: type_name_of::<Out>(),
893            resource_type: type_name_of::<Res>(),
894            metadata: Default::default(),
895            bus_capability: None,
896            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
897            position: None,
898            compensation_node_id: None,
899        };
900
901        self.schematic.nodes.push(branch_node);
902        self.schematic.edges.push(Edge {
903            from: last_node_id,
904            to: branch_id_str.clone(),
905            kind: EdgeType::Branch(branch_id_str),
906            label: Some("Branch".to_string()),
907        });
908
909        self
910    }
911
912    /// Execute the Axon with the given input and resources.
913    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
914        if let ExecutionMode::Singleton {
915            lock_key,
916            ttl_ms,
917            lock_provider,
918        } = &self.execution_mode
919        {
920            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
921            let _enter = trace_span.enter();
922            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
923                Ok(true) => {
924                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
925                }
926                Ok(false) => {
927                    tracing::debug!(
928                        "Singleton lock {} already held, aborting execution.",
929                        lock_key
930                    );
931                    // Emit a specific event indicating skip
932                    return Outcome::emit(
933                        "execution.skipped.lock_held",
934                        Some(serde_json::json!({
935                            "lock_key": lock_key
936                        })),
937                    );
938                }
939                Err(e) => {
940                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
941                    return Outcome::emit(
942                        "execution.skipped.lock_error",
943                        Some(serde_json::json!({
944                            "error": e.to_string()
945                        })),
946                    );
947                }
948            }
949        }
950
951        // ── IAM Boundary Check ────────────────────────────────────
952        if let Some(iam) = &self.iam_handle {
953            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
954
955            if matches!(iam.policy, IamPolicy::None) {
956                // No verification required — skip
957            } else {
958                let token = bus.read::<IamToken>().map(|t| t.0.clone());
959
960                match token {
961                    Some(raw_token) => {
962                        match iam.verifier.verify(&raw_token).await {
963                            Ok(identity) => {
964                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
965                                    tracing::warn!(
966                                        policy = ?iam.policy,
967                                        subject = %identity.subject,
968                                        "IAM policy enforcement failed: {}",
969                                        e
970                                    );
971                                    return Outcome::emit(
972                                        "iam.policy_denied",
973                                        Some(serde_json::json!({
974                                            "error": e.to_string(),
975                                            "subject": identity.subject,
976                                        })),
977                                    );
978                                }
979                                // Insert verified identity into Bus for downstream access
980                                bus.insert(identity);
981                            }
982                            Err(e) => {
983                                tracing::warn!("IAM token verification failed: {}", e);
984                                return Outcome::emit(
985                                    "iam.verification_failed",
986                                    Some(serde_json::json!({
987                                        "error": e.to_string()
988                                    })),
989                                );
990                            }
991                        }
992                    }
993                    None => {
994                        tracing::warn!("IAM policy requires token but none found in Bus");
995                        return Outcome::emit("iam.missing_token", None);
996                    }
997                }
998            }
999        }
1000
1001        let trace_id = persistence_trace_id(bus);
1002        let label = self.schematic.name.clone();
1003
1004        // Inject DLQ config into Bus for step-level reporting
1005        if let Some(sink) = &self.dlq_sink {
1006            bus.insert(sink.clone());
1007        }
1008        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1009        let effective_dlq_policy = self
1010            .dynamic_dlq_policy
1011            .as_ref()
1012            .map(|d| d.current())
1013            .unwrap_or_else(|| self.dlq_policy.clone());
1014        bus.insert(effective_dlq_policy);
1015        bus.insert(self.schematic.clone());
1016        let effective_saga_policy = self
1017            .dynamic_saga_policy
1018            .as_ref()
1019            .map(|d| d.current())
1020            .unwrap_or_else(|| self.saga_policy.clone());
1021        bus.insert(effective_saga_policy.clone());
1022
1023        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1024        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1025            bus.insert(SagaStack::new());
1026        }
1027
1028        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1029        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1030        let compensation_retry_policy = compensation_retry_policy(bus);
1031        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1032        let version = self.schematic.schema_version.clone();
1033        let migration_registry = bus
1034            .read::<ranvier_core::schematic::MigrationRegistry>()
1035            .cloned();
1036
1037        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1038            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1039                load_persistence_version(handle, &trace_id).await;
1040
1041            if let Some(interv) = intervention {
1042                tracing::info!(
1043                    trace_id = %trace_id,
1044                    target_node = %interv.target_node,
1045                    "Applying manual intervention command"
1046                );
1047
1048                // Find the step index for the target node
1049                if let Some(target_idx) = self
1050                    .schematic
1051                    .nodes
1052                    .iter()
1053                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1054                {
1055                    tracing::info!(
1056                        trace_id = %trace_id,
1057                        target_node = %interv.target_node,
1058                        target_step = target_idx,
1059                        "Intervention: Jumping to target node"
1060                    );
1061                    start_step = target_idx as u64;
1062
1063                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1064                    bus.insert(ManualJump {
1065                        target_node: interv.target_node.clone(),
1066                        payload_override: interv.payload_override.clone(),
1067                    });
1068
1069                    // Log audit event for intervention application
1070                    if let Some(sink) = self.audit_sink.as_ref() {
1071                        let event = AuditEvent::new(
1072                            uuid::Uuid::new_v4().to_string(),
1073                            "System".to_string(),
1074                            "ApplyIntervention".to_string(),
1075                            trace_id.to_string(),
1076                        )
1077                        .with_metadata("target_node", interv.target_node.clone())
1078                        .with_metadata("target_step", target_idx);
1079
1080                        let _ = sink.append(&event).await;
1081                    }
1082                } else {
1083                    tracing::warn!(
1084                        trace_id = %trace_id,
1085                        target_node = %interv.target_node,
1086                        "Intervention target node not found in schematic; ignoring jump"
1087                    );
1088                }
1089            }
1090
1091            if let Some(old_version) = trace_version
1092                && old_version != version
1093            {
1094                tracing::info!(
1095                    trace_id = %trace_id,
1096                    old_version = %old_version,
1097                    current_version = %version,
1098                    "Version mismatch detected during resumption"
1099                );
1100
1101                // Try multi-hop migration path first, fall back to direct lookup
1102                let migration_path = migration_registry
1103                    .as_ref()
1104                    .and_then(|r| r.find_migration_path(&old_version, &version));
1105
1106                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1107                    if path.is_empty() {
1108                        (None, last_payload.clone())
1109                    } else {
1110                        // Apply payload mappers along the migration chain
1111                        let mut payload = last_payload.clone();
1112                        for hop in &path {
1113                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1114                            {
1115                                match mapper.map_state(p) {
1116                                    Ok(mapped) => payload = Some(mapped),
1117                                    Err(e) => {
1118                                        tracing::error!(
1119                                            trace_id = %trace_id,
1120                                            from = %hop.from_version,
1121                                            to = %hop.to_version,
1122                                            error = %e,
1123                                            "Payload migration mapper failed"
1124                                        );
1125                                        return Outcome::emit(
1126                                            "execution.resumption.payload_migration_failed",
1127                                            Some(serde_json::json!({
1128                                                "trace_id": trace_id,
1129                                                "from": hop.from_version,
1130                                                "to": hop.to_version,
1131                                                "error": e.to_string()
1132                                            })),
1133                                        );
1134                                    }
1135                                }
1136                            }
1137                        }
1138                        let hops: Vec<String> = path
1139                            .iter()
1140                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
1141                            .collect();
1142                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1143                        (path.last().copied(), payload)
1144                    }
1145                } else {
1146                    (None, last_payload.clone())
1147                };
1148
1149                // Use the final migration in the path to determine strategy
1150                let migration = final_migration.or_else(|| {
1151                    migration_registry
1152                        .as_ref()
1153                        .and_then(|r| r.find_migration(&old_version, &version))
1154                });
1155
1156                // Update last_payload with mapped version
1157                if mapped_payload.is_some() {
1158                    last_payload = mapped_payload;
1159                }
1160
1161                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1162                {
1163                    m.node_mapping
1164                        .get(node_id)
1165                        .cloned()
1166                        .unwrap_or(m.default_strategy.clone())
1167                } else {
1168                    migration
1169                        .map(|m| m.default_strategy.clone())
1170                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1171                };
1172
1173                match strategy {
1174                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1175                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1176                        start_step = 0;
1177                    }
1178                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1179                        new_node_id,
1180                        ..
1181                    } => {
1182                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1183                        if let Some(target_idx) = self
1184                            .schematic
1185                            .nodes
1186                            .iter()
1187                            .position(|n| n.id == new_node_id || n.label == new_node_id)
1188                        {
1189                            start_step = target_idx as u64;
1190                        } else {
1191                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1192                            return Outcome::emit(
1193                                "execution.resumption.migration_target_not_found",
1194                                Some(serde_json::json!({ "node_id": new_node_id })),
1195                            );
1196                        }
1197                    }
1198                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1199                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1200                        if let Some(target_idx) = self
1201                            .schematic
1202                            .nodes
1203                            .iter()
1204                            .position(|n| n.id == node_id || n.label == node_id)
1205                        {
1206                            start_step = target_idx as u64;
1207                        } else {
1208                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1209                            return Outcome::emit(
1210                                "execution.resumption.migration_target_not_found",
1211                                Some(serde_json::json!({ "node_id": node_id })),
1212                            );
1213                        }
1214                    }
1215                    ranvier_core::schematic::MigrationStrategy::Fail => {
1216                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1217                        return Outcome::emit(
1218                            "execution.resumption.version_mismatch_failed",
1219                            Some(serde_json::json!({
1220                                "trace_id": trace_id,
1221                                "old_version": old_version,
1222                                "current_version": version
1223                            })),
1224                        );
1225                    }
1226                    _ => {
1227                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1228                        return Outcome::emit(
1229                            "execution.resumption.unsupported_migration",
1230                            Some(serde_json::json!({
1231                                "trace_id": trace_id,
1232                                "strategy": format!("{:?}", strategy)
1233                            })),
1234                        );
1235                    }
1236                }
1237            }
1238
1239            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1240            persist_execution_event(
1241                handle,
1242                &trace_id,
1243                &label,
1244                &version,
1245                start_step,
1246                ingress_node_id,
1247                "Enter",
1248                None,
1249            )
1250            .await;
1251
1252            bus.insert(StartStep(start_step));
1253            if start_step > 0 {
1254                bus.insert(ResumptionState {
1255                    payload: last_payload,
1256                });
1257            }
1258
1259            Some(start_step)
1260        } else {
1261            None
1262        };
1263
1264        let should_capture = should_attach_timeline(bus);
1265        let inserted_timeline = if should_capture {
1266            ensure_timeline(bus)
1267        } else {
1268            false
1269        };
1270        let ingress_started = std::time::Instant::now();
1271        let ingress_enter_ts = now_ms();
1272        if should_capture
1273            && let (Some(timeline), Some(ingress)) =
1274                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1275        {
1276            timeline.push(TimelineEvent::NodeEnter {
1277                node_id: ingress.id.clone(),
1278                node_label: ingress.label.clone(),
1279                timestamp: ingress_enter_ts,
1280            });
1281        }
1282
1283        let circuit_span = tracing::info_span!(
1284            "Circuit",
1285            ranvier.circuit = %label,
1286            ranvier.outcome_kind = tracing::field::Empty,
1287            ranvier.outcome_target = tracing::field::Empty
1288        );
1289        let outcome = (self.executor)(input, resources, bus)
1290            .instrument(circuit_span.clone())
1291            .await;
1292        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1293        if let Some(target) = outcome_target(&outcome) {
1294            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1295        }
1296
1297        // Automated Saga Rollback (LIFO)
1298        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1299            while let Some(task) = {
1300                let mut stack = bus.read_mut::<SagaStack>();
1301                stack.as_mut().and_then(|s| s.pop())
1302            } {
1303                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1304
1305                let handler = {
1306                    let registry = self.saga_compensation_registry.read().unwrap();
1307                    registry.get(&task.node_id)
1308                };
1309                if let Some(handler) = handler {
1310                    let res = handler(task.input_snapshot, resources, bus).await;
1311                    if let Outcome::Fault(e) = res {
1312                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1313                    }
1314                } else {
1315                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1316                }
1317            }
1318            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1319        }
1320
1321        let ingress_exit_ts = now_ms();
1322        if should_capture
1323            && let (Some(timeline), Some(ingress)) =
1324                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1325        {
1326            timeline.push(TimelineEvent::NodeExit {
1327                node_id: ingress.id.clone(),
1328                outcome_type: outcome_type_name(&outcome),
1329                duration_ms: ingress_started.elapsed().as_millis() as u64,
1330                timestamp: ingress_exit_ts,
1331            });
1332        }
1333
1334        if let Some(handle) = persistence_handle.as_ref() {
1335            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1336            persist_execution_event(
1337                handle,
1338                &trace_id,
1339                &label,
1340                &version,
1341                fault_step,
1342                None, // Outcome-level events might not have a single node_id context here
1343                outcome_kind_name(&outcome),
1344                Some(outcome.to_json_value()),
1345            )
1346            .await;
1347
1348            let mut completion = completion_from_outcome(&outcome);
1349            if matches!(outcome, Outcome::Fault(_))
1350                && let Some(compensation) = compensation_handle.as_ref()
1351                && compensation_auto_trigger(bus)
1352            {
1353                let context = CompensationContext {
1354                    trace_id: trace_id.clone(),
1355                    circuit: label.clone(),
1356                    fault_kind: outcome_kind_name(&outcome).to_string(),
1357                    fault_step,
1358                    timestamp_ms: now_ms(),
1359                };
1360
1361                if run_compensation(
1362                    compensation,
1363                    context,
1364                    compensation_retry_policy,
1365                    compensation_idempotency.clone(),
1366                )
1367                .await
1368                {
1369                    persist_execution_event(
1370                        handle,
1371                        &trace_id,
1372                        &label,
1373                        &version,
1374                        fault_step.saturating_add(1),
1375                        None,
1376                        "Compensated",
1377                        None,
1378                    )
1379                    .await;
1380                    completion = CompletionState::Compensated;
1381                }
1382            }
1383
1384            if persistence_auto_complete(bus) {
1385                persist_completion(handle, &trace_id, completion).await;
1386            }
1387        }
1388
1389        if should_capture {
1390            maybe_export_timeline(bus, &outcome);
1391        }
1392        if inserted_timeline {
1393            let _ = bus.remove::<Timeline>();
1394        }
1395
1396        outcome
1397    }
1398
1399    /// Starts the Ranvier Inspector for this Axon on the specified port.
1400    /// This spawns a background task to serve the Schematic.
1401    pub fn serve_inspector(self, port: u16) -> Self {
1402        if !inspector_dev_mode_from_env() {
1403            tracing::info!("Inspector disabled because RANVIER_MODE is production");
1404            return self;
1405        }
1406        if !inspector_enabled_from_env() {
1407            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
1408            return self;
1409        }
1410
1411        let schematic = self.schematic.clone();
1412        let axon_inspector = Arc::new(self.clone());
1413        tokio::spawn(async move {
1414            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
1415                .with_projection_files_from_env()
1416                .with_mode_from_env()
1417                .with_auth_policy_from_env()
1418                .with_state_inspector(axon_inspector)
1419                .serve()
1420                .await
1421            {
1422                tracing::error!("Inspector server failed: {}", e);
1423            }
1424        });
1425        self
1426    }
1427
1428    /// Get a reference to the Schematic (structural view).
1429    pub fn schematic(&self) -> &Schematic {
1430        &self.schematic
1431    }
1432
1433    /// Consume and return the Schematic.
1434    pub fn into_schematic(self) -> Schematic {
1435        self.schematic
1436    }
1437
1438    /// Detect schematic export mode from runtime flags.
1439    ///
1440    /// Supported triggers:
1441    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
1442    /// - `--schematic`
1443    ///
1444    /// Optional output path:
1445    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
1446    /// - `--schematic-output <path>` / `--schematic-output=<path>`
1447    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
1448    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
1449        schematic_export_request_from_process()
1450    }
1451
1452    /// Export schematic and return `true` when schematic mode is active.
1453    ///
1454    /// Use this once after circuit construction and before server/custom loops:
1455    ///
1456    /// ```rust,ignore
1457    /// let axon = build_axon();
1458    /// if axon.maybe_export_and_exit()? {
1459    ///     return Ok(());
1460    /// }
1461    /// // Normal runtime path...
1462    /// ```
1463    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
1464        self.maybe_export_and_exit_with(|_| ())
1465    }
1466
1467    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
1468    ///
1469    /// This is useful when your app has custom loop/bootstrap behavior and you want
1470    /// to skip or cleanup that logic in schematic mode.
1471    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
1472    where
1473        F: FnOnce(&SchematicExportRequest),
1474    {
1475        let Some(request) = self.schematic_export_request() else {
1476            return Ok(false);
1477        };
1478        on_before_exit(&request);
1479        self.export_schematic(&request)?;
1480        Ok(true)
1481    }
1482
1483    /// Export schematic according to the provided request.
1484    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
1485        let json = serde_json::to_string_pretty(self.schematic())?;
1486        if let Some(path) = &request.output {
1487            if let Some(parent) = path.parent()
1488                && !parent.as_os_str().is_empty()
1489            {
1490                fs::create_dir_all(parent)?;
1491            }
1492            fs::write(path, json.as_bytes())?;
1493            return Ok(());
1494        }
1495        println!("{}", json);
1496        Ok(())
1497    }
1498}
1499
1500fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
1501    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
1502    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
1503    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
1504
1505    let mut i = 0;
1506    while i < args.len() {
1507        let arg = args[i].to_string_lossy();
1508
1509        if arg == "--schematic" {
1510            enabled = true;
1511            i += 1;
1512            continue;
1513        }
1514
1515        if arg == "--schematic-output" || arg == "--output" {
1516            if let Some(next) = args.get(i + 1) {
1517                output = Some(PathBuf::from(next));
1518                i += 2;
1519                continue;
1520            }
1521        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
1522            output = Some(PathBuf::from(value));
1523        } else if let Some(value) = arg.strip_prefix("--output=") {
1524            output = Some(PathBuf::from(value));
1525        }
1526
1527        i += 1;
1528    }
1529
1530    if enabled {
1531        Some(SchematicExportRequest { output })
1532    } else {
1533        None
1534    }
1535}
1536
/// Return `true` when the environment variable `key` holds a truthy value.
///
/// Accepted values (ASCII case-insensitive): `1`, `true`, `on`, `yes`.
/// An unset variable, a non-Unicode value or any other text yields `false`.
fn env_flag_is_true(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];

    std::env::var(key)
        .map(|raw| TRUTHY.iter().any(|accepted| raw.eq_ignore_ascii_case(accepted)))
        .unwrap_or(false)
}
1543
1544fn inspector_enabled_from_env() -> bool {
1545    let raw = std::env::var("RANVIER_INSPECTOR").ok();
1546    inspector_enabled_from_value(raw.as_deref())
1547}
1548
/// Interpret an optional `RANVIER_INSPECTOR` value.
///
/// No value means enabled. A value enables the inspector only when it is one of
/// `1`, `true`, `on`, `yes` (ASCII case-insensitive); anything else disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];

    match value {
        None => true,
        Some(setting) => TRUTHY
            .iter()
            .any(|accepted| setting.eq_ignore_ascii_case(accepted)),
    }
}
1555
1556fn inspector_dev_mode_from_env() -> bool {
1557    let raw = std::env::var("RANVIER_MODE").ok();
1558    inspector_dev_mode_from_value(raw.as_deref())
1559}
1560
/// Interpret an optional `RANVIER_MODE` value.
///
/// Everything counts as development mode except `prod` and `production`
/// (ASCII case-insensitive). No value therefore means development mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        None => true,
        Some(mode) => {
            let is_production =
                mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production");
            !is_production
        }
    }
}
1567
1568fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
1569    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
1570        Ok(v) if !v.trim().is_empty() => v,
1571        _ => return,
1572    };
1573
1574    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
1575    let policy = timeline_adaptive_policy();
1576    let forced = should_force_export(outcome, &policy);
1577    let should_export = sampled || forced;
1578    if !should_export {
1579        record_sampling_stats(false, sampled, forced, "none", &policy);
1580        return;
1581    }
1582
1583    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
1584    timeline.sort();
1585
1586    let mode = std::env::var("RANVIER_TIMELINE_MODE")
1587        .unwrap_or_else(|_| "overwrite".to_string())
1588        .to_ascii_lowercase();
1589
1590    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
1591        tracing::warn!(
1592            "Failed to persist timeline file {} (mode={}): {}",
1593            path,
1594            mode,
1595            err
1596        );
1597        record_sampling_stats(false, sampled, forced, &mode, &policy);
1598    } else {
1599        record_sampling_stats(true, sampled, forced, &mode, &policy);
1600    }
1601}
1602
1603fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
1604    match outcome {
1605        Outcome::Next(_) => "Next".to_string(),
1606        Outcome::Branch(id, _) => format!("Branch:{}", id),
1607        Outcome::Jump(id, _) => format!("Jump:{}", id),
1608        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
1609        Outcome::Fault(_) => "Fault".to_string(),
1610    }
1611}
1612
1613fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
1614    match outcome {
1615        Outcome::Next(_) => "Next",
1616        Outcome::Branch(_, _) => "Branch",
1617        Outcome::Jump(_, _) => "Jump",
1618        Outcome::Emit(_, _) => "Emit",
1619        Outcome::Fault(_) => "Fault",
1620    }
1621}
1622
1623fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
1624    match outcome {
1625        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
1626        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
1627        Outcome::Emit(event_type, _) => Some(event_type.clone()),
1628        Outcome::Next(_) | Outcome::Fault(_) => None,
1629    }
1630}
1631
1632fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
1633    match outcome {
1634        Outcome::Fault(_) => CompletionState::Fault,
1635        _ => CompletionState::Success,
1636    }
1637}
1638
1639fn persistence_trace_id(bus: &Bus) -> String {
1640    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
1641        explicit.0.clone()
1642    } else {
1643        format!("{}:{}", bus.id, now_ms())
1644    }
1645}
1646
1647fn persistence_auto_complete(bus: &Bus) -> bool {
1648    bus.read::<PersistenceAutoComplete>()
1649        .map(|v| v.0)
1650        .unwrap_or(true)
1651}
1652
1653fn compensation_auto_trigger(bus: &Bus) -> bool {
1654    bus.read::<CompensationAutoTrigger>()
1655        .map(|v| v.0)
1656        .unwrap_or(true)
1657}
1658
1659fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
1660    bus.read::<CompensationRetryPolicy>()
1661        .copied()
1662        .unwrap_or_default()
1663}
1664
1665/// Unwrap the Outcome enum layer from a persisted event payload.
1666///
1667/// Events are stored with `outcome.to_json_value()` which serializes the full
1668/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
1669/// handler expects the raw inner value, so we extract it here.
1670fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
1671    payload.map(|p| {
1672        p.get("Next")
1673            .or_else(|| p.get("Branch"))
1674            .or_else(|| p.get("Jump"))
1675            .cloned()
1676            .unwrap_or_else(|| p.clone())
1677    })
1678}
1679
1680async fn load_persistence_version(
1681    handle: &PersistenceHandle,
1682    trace_id: &str,
1683) -> (
1684    u64,
1685    Option<String>,
1686    Option<crate::persistence::Intervention>,
1687    Option<String>,
1688    Option<serde_json::Value>,
1689) {
1690    let store = handle.store();
1691    match store.load(trace_id).await {
1692        Ok(Some(trace)) => {
1693            let (next_step, last_node_id, last_payload) =
1694                if let Some(resume_from_step) = trace.resumed_from_step {
1695                    let anchor_event = trace
1696                        .events
1697                        .iter()
1698                        .rev()
1699                        .find(|event| {
1700                            event.step <= resume_from_step
1701                                && event.outcome_kind == "Next"
1702                                && event.payload.is_some()
1703                        })
1704                        .or_else(|| {
1705                            trace.events.iter().rev().find(|event| {
1706                                event.step <= resume_from_step
1707                                    && event.outcome_kind != "Fault"
1708                                    && event.payload.is_some()
1709                            })
1710                        })
1711                        .or_else(|| {
1712                            trace.events.iter().rev().find(|event| {
1713                                event.step <= resume_from_step && event.payload.is_some()
1714                            })
1715                        })
1716                        .or_else(|| trace.events.last());
1717
1718                    (
1719                        resume_from_step.saturating_add(1),
1720                        anchor_event.and_then(|event| event.node_id.clone()),
1721                        anchor_event.and_then(|event| {
1722                            unwrap_outcome_payload(event.payload.as_ref())
1723                        }),
1724                    )
1725                } else {
1726                    let last_event = trace.events.last();
1727                    (
1728                        last_event
1729                            .map(|event| event.step.saturating_add(1))
1730                            .unwrap_or(0),
1731                        last_event.and_then(|event| event.node_id.clone()),
1732                        last_event.and_then(|event| {
1733                            unwrap_outcome_payload(event.payload.as_ref())
1734                        }),
1735                    )
1736                };
1737
1738            (
1739                next_step,
1740                Some(trace.schematic_version),
1741                trace.interventions.last().cloned(),
1742                last_node_id,
1743                last_payload,
1744            )
1745        }
1746        Ok(None) => (0, None, None, None, None),
1747        Err(err) => {
1748            tracing::warn!(
1749                trace_id = %trace_id,
1750                error = %err,
1751                "Failed to load persistence trace; falling back to step=0"
1752            );
1753            (0, None, None, None, None)
1754        }
1755    }
1756}
1757
1758#[allow(clippy::too_many_arguments)]
1759async fn run_this_step<In, Out, E, Res>(
1760    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
1761    state: In,
1762    res: &Res,
1763    bus: &mut Bus,
1764    node_id: &str,
1765    node_label: &str,
1766    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
1767    step_idx: u64,
1768) -> Outcome<Out, E>
1769where
1770    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1771    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1772    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
1773    Res: ranvier_core::transition::ResourceRequirement,
1774{
1775    let label = trans.label();
1776    let res_type = std::any::type_name::<Res>()
1777        .split("::")
1778        .last()
1779        .unwrap_or("unknown");
1780
1781    // Debug pausing
1782    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
1783        debug.should_pause(node_id)
1784    } else {
1785        false
1786    };
1787
1788    if should_pause {
1789        let trace_id = persistence_trace_id(bus);
1790        tracing::event!(
1791            target: "ranvier.debugger",
1792            tracing::Level::INFO,
1793            trace_id = %trace_id,
1794            node_id = %node_id,
1795            "Node paused"
1796        );
1797
1798        if let Some(timeline) = bus.read_mut::<Timeline>() {
1799            timeline.push(TimelineEvent::NodePaused {
1800                node_id: node_id.to_string(),
1801                timestamp: now_ms(),
1802            });
1803        }
1804        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
1805            debug.wait().await;
1806        }
1807    }
1808
1809    let enter_ts = now_ms();
1810    if let Some(timeline) = bus.read_mut::<Timeline>() {
1811        timeline.push(TimelineEvent::NodeEnter {
1812            node_id: node_id.to_string(),
1813            node_label: node_label.to_string(),
1814            timestamp: enter_ts,
1815        });
1816    }
1817
1818    // Check DLQ retry policy and pre-serialize state for potential retries
1819    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
1820        if let DlqPolicy::RetryThenDlq {
1821            max_attempts,
1822            backoff_ms,
1823        } = p
1824        {
1825            Some((*max_attempts, *backoff_ms))
1826        } else {
1827            None
1828        }
1829    });
1830    let retry_state_snapshot = if dlq_retry_config.is_some() {
1831        serde_json::to_value(&state).ok()
1832    } else {
1833        None
1834    };
1835
1836    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
1837    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
1838        Some(serde_json::to_vec(&state).unwrap_or_default())
1839    } else {
1840        None
1841    };
1842
1843    let node_span = tracing::info_span!(
1844        "Node",
1845        ranvier.node = %label,
1846        ranvier.resource_type = %res_type,
1847        ranvier.outcome_kind = tracing::field::Empty,
1848        ranvier.outcome_target = tracing::field::Empty
1849    );
1850    let started = std::time::Instant::now();
1851    bus.set_access_policy(label.clone(), bus_policy.clone());
1852    let result = trans
1853        .run(state, res, bus)
1854        .instrument(node_span.clone())
1855        .await;
1856    bus.clear_access_policy();
1857
1858    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
1859    // retry with exponential backoff before giving up.
1860    let result = if let Outcome::Fault(_) = &result {
1861        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
1862            (dlq_retry_config, &retry_state_snapshot)
1863        {
1864            let mut final_result = result;
1865            // attempt 1 already done; retry from 2..=max_attempts
1866            for attempt in 2..=max_attempts {
1867                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
1868
1869                tracing::info!(
1870                    ranvier.node = %label,
1871                    attempt = attempt,
1872                    max_attempts = max_attempts,
1873                    backoff_ms = delay,
1874                    "Retrying faulted node"
1875                );
1876
1877                if let Some(timeline) = bus.read_mut::<Timeline>() {
1878                    timeline.push(TimelineEvent::NodeRetry {
1879                        node_id: node_id.to_string(),
1880                        attempt,
1881                        max_attempts,
1882                        backoff_ms: delay,
1883                        timestamp: now_ms(),
1884                    });
1885                }
1886
1887                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
1888
1889                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
1890                    bus.set_access_policy(label.clone(), bus_policy.clone());
1891                    let retry_result = trans
1892                        .run(retry_state, res, bus)
1893                        .instrument(tracing::info_span!(
1894                            "NodeRetry",
1895                            ranvier.node = %label,
1896                            attempt = attempt
1897                        ))
1898                        .await;
1899                    bus.clear_access_policy();
1900
1901                    match &retry_result {
1902                        Outcome::Fault(_) => {
1903                            final_result = retry_result;
1904                        }
1905                        _ => {
1906                            final_result = retry_result;
1907                            break;
1908                        }
1909                    }
1910                }
1911            }
1912            final_result
1913        } else {
1914            result
1915        }
1916    } else {
1917        result
1918    };
1919
1920    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
1921    if let Some(target) = outcome_target(&result) {
1922        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
1923    }
1924    let duration_ms = started.elapsed().as_millis() as u64;
1925    let exit_ts = now_ms();
1926
1927    if let Some(timeline) = bus.read_mut::<Timeline>() {
1928        timeline.push(TimelineEvent::NodeExit {
1929            node_id: node_id.to_string(),
1930            outcome_type: outcome_type_name(&result),
1931            duration_ms,
1932            timestamp: exit_ts,
1933        });
1934
1935        if let Outcome::Branch(branch_id, _) = &result {
1936            timeline.push(TimelineEvent::Branchtaken {
1937                branch_id: branch_id.clone(),
1938                timestamp: exit_ts,
1939            });
1940        }
1941    }
1942
1943    // Push to Saga Stack if Next outcome and snapshot taken
1944    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
1945        && let Some(stack) = bus.read_mut::<SagaStack>()
1946    {
1947        stack.push(node_id.to_string(), label.clone(), snapshot);
1948    }
1949
1950    if let Some(handle) = bus.read::<PersistenceHandle>() {
1951        let trace_id = persistence_trace_id(bus);
1952        let circuit = bus
1953            .read::<ranvier_core::schematic::Schematic>()
1954            .map(|s| s.name.clone())
1955            .unwrap_or_default();
1956        let version = bus
1957            .read::<ranvier_core::schematic::Schematic>()
1958            .map(|s| s.schema_version.clone())
1959            .unwrap_or_default();
1960
1961        persist_execution_event(
1962            handle,
1963            &trace_id,
1964            &circuit,
1965            &version,
1966            step_idx,
1967            Some(node_id.to_string()),
1968            outcome_kind_name(&result),
1969            Some(result.to_json_value()),
1970        )
1971        .await;
1972    }
1973
1974    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
1975    // or immediately (SendToDlq). Drop policy skips entirely.
1976    if let Outcome::Fault(f) = &result {
1977        // Read policy and sink, then drop the borrows before mutable timeline access
1978        let dlq_action = {
1979            let policy = bus.read::<DlqPolicy>();
1980            let sink = bus.read::<Arc<dyn DlqSink>>();
1981            match (sink, policy) {
1982                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
1983                _ => None,
1984            }
1985        };
1986
1987        if let Some(sink) = dlq_action {
1988            if let Some((max_attempts, _)) = dlq_retry_config
1989                && let Some(timeline) = bus.read_mut::<Timeline>()
1990            {
1991                timeline.push(TimelineEvent::DlqExhausted {
1992                    node_id: node_id.to_string(),
1993                    total_attempts: max_attempts,
1994                    timestamp: now_ms(),
1995                });
1996            }
1997
1998            let trace_id = persistence_trace_id(bus);
1999            let circuit = bus
2000                .read::<ranvier_core::schematic::Schematic>()
2001                .map(|s| s.name.clone())
2002                .unwrap_or_default();
2003            let _ = sink
2004                .store_dead_letter(
2005                    &trace_id,
2006                    &circuit,
2007                    node_id,
2008                    &format!("{:?}", f),
2009                    &serde_json::to_vec(&f).unwrap_or_default(),
2010                )
2011                .await;
2012        }
2013    }
2014
2015    result
2016}
2017
2018#[allow(clippy::too_many_arguments)]
2019async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2020    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2021    comp: &Comp,
2022    state: Out,
2023    res: &Res,
2024    bus: &mut Bus,
2025    node_id: &str,
2026    comp_node_id: &str,
2027    node_label: &str,
2028    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2029    step_idx: u64,
2030) -> Outcome<Next, E>
2031where
2032    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2033    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2034    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2035    Res: ranvier_core::transition::ResourceRequirement,
2036    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2037{
2038    let label = trans.label();
2039
2040    // Debug pausing
2041    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2042        debug.should_pause(node_id)
2043    } else {
2044        false
2045    };
2046
2047    if should_pause {
2048        let trace_id = persistence_trace_id(bus);
2049        tracing::event!(
2050            target: "ranvier.debugger",
2051            tracing::Level::INFO,
2052            trace_id = %trace_id,
2053            node_id = %node_id,
2054            "Node paused (compensated)"
2055        );
2056
2057        if let Some(timeline) = bus.read_mut::<Timeline>() {
2058            timeline.push(TimelineEvent::NodePaused {
2059                node_id: node_id.to_string(),
2060                timestamp: now_ms(),
2061            });
2062        }
2063        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2064            debug.wait().await;
2065        }
2066    }
2067
2068    let enter_ts = now_ms();
2069    if let Some(timeline) = bus.read_mut::<Timeline>() {
2070        timeline.push(TimelineEvent::NodeEnter {
2071            node_id: node_id.to_string(),
2072            node_label: node_label.to_string(),
2073            timestamp: enter_ts,
2074        });
2075    }
2076
2077    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2078    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2079        Some(serde_json::to_vec(&state).unwrap_or_default())
2080    } else {
2081        None
2082    };
2083
2084    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2085    bus.set_access_policy(label.clone(), bus_policy.clone());
2086    let result = trans
2087        .run(state.clone(), res, bus)
2088        .instrument(node_span)
2089        .await;
2090    bus.clear_access_policy();
2091
2092    let duration_ms = 0; // Simplified
2093    let exit_ts = now_ms();
2094
2095    if let Some(timeline) = bus.read_mut::<Timeline>() {
2096        timeline.push(TimelineEvent::NodeExit {
2097            node_id: node_id.to_string(),
2098            outcome_type: outcome_type_name(&result),
2099            duration_ms,
2100            timestamp: exit_ts,
2101        });
2102    }
2103
2104    // Automated Compensation Trigger
2105    if let Outcome::Fault(ref err) = result {
2106        if compensation_auto_trigger(bus) {
2107            tracing::info!(
2108                ranvier.node = %label,
2109                ranvier.compensation.trigger = "saga",
2110                error = ?err,
2111                "Saga compensation triggered"
2112            );
2113
2114            if let Some(timeline) = bus.read_mut::<Timeline>() {
2115                timeline.push(TimelineEvent::NodeEnter {
2116                    node_id: comp_node_id.to_string(),
2117                    node_label: format!("Compensate: {}", comp.label()),
2118                    timestamp: exit_ts,
2119                });
2120            }
2121
2122            // Run compensation
2123            let _ = comp.run(state, res, bus).await;
2124
2125            if let Some(timeline) = bus.read_mut::<Timeline>() {
2126                timeline.push(TimelineEvent::NodeExit {
2127                    node_id: comp_node_id.to_string(),
2128                    outcome_type: "Compensated".to_string(),
2129                    duration_ms: 0,
2130                    timestamp: now_ms(),
2131                });
2132            }
2133
2134            if let Some(handle) = bus.read::<PersistenceHandle>() {
2135                let trace_id = persistence_trace_id(bus);
2136                let circuit = bus
2137                    .read::<ranvier_core::schematic::Schematic>()
2138                    .map(|s| s.name.clone())
2139                    .unwrap_or_default();
2140                let version = bus
2141                    .read::<ranvier_core::schematic::Schematic>()
2142                    .map(|s| s.schema_version.clone())
2143                    .unwrap_or_default();
2144
2145                persist_execution_event(
2146                    handle,
2147                    &trace_id,
2148                    &circuit,
2149                    &version,
2150                    step_idx + 1, // Compensation node index
2151                    Some(comp_node_id.to_string()),
2152                    "Compensated",
2153                    None,
2154                )
2155                .await;
2156            }
2157        }
2158    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2159        // Push to Saga Stack if Next outcome and snapshot taken
2160        if let Some(stack) = bus.read_mut::<SagaStack>() {
2161            stack.push(node_id.to_string(), label.clone(), snapshot);
2162        }
2163
2164        if let Some(handle) = bus.read::<PersistenceHandle>() {
2165            let trace_id = persistence_trace_id(bus);
2166            let circuit = bus
2167                .read::<ranvier_core::schematic::Schematic>()
2168                .map(|s| s.name.clone())
2169                .unwrap_or_default();
2170            let version = bus
2171                .read::<ranvier_core::schematic::Schematic>()
2172                .map(|s| s.schema_version.clone())
2173                .unwrap_or_default();
2174
2175            persist_execution_event(
2176                handle,
2177                &trace_id,
2178                &circuit,
2179                &version,
2180                step_idx,
2181                Some(node_id.to_string()),
2182                outcome_kind_name(&result),
2183                Some(result.to_json_value()),
2184            )
2185            .await;
2186        }
2187    }
2188
2189    // DLQ reporting for compensated steps
2190    if let Outcome::Fault(f) = &result
2191        && let (Some(sink), Some(policy)) =
2192            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2193    {
2194        let should_dlq = !matches!(policy, DlqPolicy::Drop);
2195        if should_dlq {
2196            let trace_id = persistence_trace_id(bus);
2197            let circuit = bus
2198                .read::<ranvier_core::schematic::Schematic>()
2199                .map(|s| s.name.clone())
2200                .unwrap_or_default();
2201            let _ = sink
2202                .store_dead_letter(
2203                    &trace_id,
2204                    &circuit,
2205                    node_id,
2206                    &format!("{:?}", f),
2207                    &serde_json::to_vec(&f).unwrap_or_default(),
2208                )
2209                .await;
2210        }
2211    }
2212
2213    result
2214}
2215
2216#[allow(clippy::too_many_arguments)]
2217pub async fn persist_execution_event(
2218    handle: &PersistenceHandle,
2219    trace_id: &str,
2220    circuit: &str,
2221    schematic_version: &str,
2222    step: u64,
2223    node_id: Option<String>,
2224    outcome_kind: &str,
2225    payload: Option<serde_json::Value>,
2226) {
2227    let store = handle.store();
2228    let envelope = PersistenceEnvelope {
2229        trace_id: trace_id.to_string(),
2230        circuit: circuit.to_string(),
2231        schematic_version: schematic_version.to_string(),
2232        step,
2233        node_id,
2234        outcome_kind: outcome_kind.to_string(),
2235        timestamp_ms: now_ms(),
2236        payload_hash: None,
2237        payload,
2238    };
2239
2240    if let Err(err) = store.append(envelope).await {
2241        tracing::warn!(
2242            trace_id = %trace_id,
2243            circuit = %circuit,
2244            step,
2245            outcome_kind = %outcome_kind,
2246            error = %err,
2247            "Failed to append persistence envelope"
2248        );
2249    }
2250}
2251
2252async fn persist_completion(
2253    handle: &PersistenceHandle,
2254    trace_id: &str,
2255    completion: CompletionState,
2256) {
2257    let store = handle.store();
2258    if let Err(err) = store.complete(trace_id, completion).await {
2259        tracing::warn!(
2260            trace_id = %trace_id,
2261            error = %err,
2262            "Failed to complete persistence trace"
2263        );
2264    }
2265}
2266
2267fn compensation_idempotency_key(context: &CompensationContext) -> String {
2268    format!(
2269        "{}:{}:{}",
2270        context.trace_id, context.circuit, context.fault_kind
2271    )
2272}
2273
2274async fn run_compensation(
2275    handle: &CompensationHandle,
2276    context: CompensationContext,
2277    retry_policy: CompensationRetryPolicy,
2278    idempotency: Option<CompensationIdempotencyHandle>,
2279) -> bool {
2280    let hook = handle.hook();
2281    let key = compensation_idempotency_key(&context);
2282
2283    if let Some(store_handle) = idempotency.as_ref() {
2284        let store = store_handle.store();
2285        match store.was_compensated(&key).await {
2286            Ok(true) => {
2287                tracing::info!(
2288                    trace_id = %context.trace_id,
2289                    circuit = %context.circuit,
2290                    key = %key,
2291                    "Compensation already recorded; skipping duplicate hook execution"
2292                );
2293                return true;
2294            }
2295            Ok(false) => {}
2296            Err(err) => {
2297                tracing::warn!(
2298                    trace_id = %context.trace_id,
2299                    key = %key,
2300                    error = %err,
2301                    "Failed to check compensation idempotency state"
2302                );
2303            }
2304        }
2305    }
2306
2307    let max_attempts = retry_policy.max_attempts.max(1);
2308    for attempt in 1..=max_attempts {
2309        match hook.compensate(context.clone()).await {
2310            Ok(()) => {
2311                if let Some(store_handle) = idempotency.as_ref() {
2312                    let store = store_handle.store();
2313                    if let Err(err) = store.mark_compensated(&key).await {
2314                        tracing::warn!(
2315                            trace_id = %context.trace_id,
2316                            key = %key,
2317                            error = %err,
2318                            "Failed to mark compensation idempotency state"
2319                        );
2320                    }
2321                }
2322                return true;
2323            }
2324            Err(err) => {
2325                let is_last = attempt == max_attempts;
2326                tracing::warn!(
2327                    trace_id = %context.trace_id,
2328                    circuit = %context.circuit,
2329                    fault_kind = %context.fault_kind,
2330                    fault_step = context.fault_step,
2331                    attempt,
2332                    max_attempts,
2333                    error = %err,
2334                    "Compensation hook attempt failed"
2335                );
2336                if !is_last && retry_policy.backoff_ms > 0 {
2337                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2338                        .await;
2339                }
2340            }
2341        }
2342    }
2343    false
2344}
2345
2346fn ensure_timeline(bus: &mut Bus) -> bool {
2347    if bus.has::<Timeline>() {
2348        false
2349    } else {
2350        bus.insert(Timeline::new());
2351        true
2352    }
2353}
2354
2355fn should_attach_timeline(bus: &Bus) -> bool {
2356    // Respect explicitly provided timeline collector from caller.
2357    if bus.has::<Timeline>() {
2358        return true;
2359    }
2360
2361    // Attach timeline when runtime export path exists.
2362    has_timeline_output_path()
2363}
2364
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(value) if !value.trim().is_empty()
    )
}
2371
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64`, and clamped to `0.0..=1.0`. When the variable is
/// unset, unparsable, or parses to NaN, the rate defaults to `1.0`.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and passes through `clamp` unchanged; a NaN rate would
        // compare false against every bucket, so treat it like any other invalid value.
        .filter(|rate| !rate.is_nan())
        .map_or(1.0, |rate| rate.clamp(0.0, 1.0))
}
2379
2380fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
2381    if rate <= 0.0 {
2382        return false;
2383    }
2384    if rate >= 1.0 {
2385        return true;
2386    }
2387    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
2388    bucket < rate
2389}
2390
/// Reads the adaptive export policy name from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is ASCII-lowercased; when the variable cannot be read the policy is
/// `"fault_branch"`.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => "fault_branch".to_string(),
    }
}
2396
2397fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
2398    match policy {
2399        "off" => false,
2400        "fault_only" => matches!(outcome, Outcome::Fault(_)),
2401        "fault_branch_emit" => {
2402            matches!(
2403                outcome,
2404                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
2405            )
2406        }
2407        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
2408    }
2409}
2410
/// Running counters describing timeline export decisions, updated by `record_sampling_stats`.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not result in an export.
    skipped: u64,
    /// Exports for which the sampling flag was set.
    sampled_exports: u64,
    /// Exports for which the forced flag was set.
    forced_exports: u64,
    /// Mode string passed with the most recent decision.
    last_mode: String,
    /// Policy string passed with the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent decision, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
2422
2423static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
2424
2425fn stats_cell() -> &'static Mutex<SamplingStats> {
2426    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
2427}
2428
2429fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
2430    let snapshot = {
2431        let mut stats = match stats_cell().lock() {
2432            Ok(guard) => guard,
2433            Err(_) => return,
2434        };
2435
2436        stats.total_decisions += 1;
2437        if exported {
2438            stats.exported += 1;
2439        } else {
2440            stats.skipped += 1;
2441        }
2442        if sampled && exported {
2443            stats.sampled_exports += 1;
2444        }
2445        if forced && exported {
2446            stats.forced_exports += 1;
2447        }
2448        stats.last_mode = mode.to_string();
2449        stats.last_policy = policy.to_string();
2450        stats.last_updated_ms = now_ms();
2451        stats.clone()
2452    };
2453
2454    tracing::debug!(
2455        ranvier.timeline.total_decisions = snapshot.total_decisions,
2456        ranvier.timeline.exported = snapshot.exported,
2457        ranvier.timeline.skipped = snapshot.skipped,
2458        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
2459        ranvier.timeline.forced_exports = snapshot.forced_exports,
2460        ranvier.timeline.mode = %snapshot.last_mode,
2461        ranvier.timeline.policy = %snapshot.last_policy,
2462        "Timeline sampling stats updated"
2463    );
2464
2465    if let Some(path) = timeline_stats_output_path() {
2466        let payload = serde_json::json!({
2467            "total_decisions": snapshot.total_decisions,
2468            "exported": snapshot.exported,
2469            "skipped": snapshot.skipped,
2470            "sampled_exports": snapshot.sampled_exports,
2471            "forced_exports": snapshot.forced_exports,
2472            "last_mode": snapshot.last_mode,
2473            "last_policy": snapshot.last_policy,
2474            "last_updated_ms": snapshot.last_updated_ms
2475        });
2476        if let Some(parent) = Path::new(&path).parent() {
2477            let _ = fs::create_dir_all(parent);
2478        }
2479        if let Err(err) = fs::write(&path, payload.to_string()) {
2480            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
2481        }
2482    }
2483}
2484
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set and not blank.
///
/// The value is returned as-is (not trimmed).
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
2490
2491fn write_timeline_with_policy(
2492    path: &str,
2493    mode: &str,
2494    mut timeline: Timeline,
2495) -> Result<(), String> {
2496    match mode {
2497        "append" => {
2498            if Path::new(path).exists() {
2499                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
2500                match serde_json::from_str::<Timeline>(&content) {
2501                    Ok(mut existing) => {
2502                        existing.events.append(&mut timeline.events);
2503                        existing.sort();
2504                        if let Some(max_events) = max_events_limit() {
2505                            truncate_timeline_events(&mut existing, max_events);
2506                        }
2507                        write_timeline_json(path, &existing)
2508                    }
2509                    Err(_) => {
2510                        // Fallback: if existing is invalid, replace with current timeline
2511                        if let Some(max_events) = max_events_limit() {
2512                            truncate_timeline_events(&mut timeline, max_events);
2513                        }
2514                        write_timeline_json(path, &timeline)
2515                    }
2516                }
2517            } else {
2518                if let Some(max_events) = max_events_limit() {
2519                    truncate_timeline_events(&mut timeline, max_events);
2520                }
2521                write_timeline_json(path, &timeline)
2522            }
2523        }
2524        "rotate" => {
2525            let rotated_path = rotated_path(path, now_ms());
2526            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
2527            if let Some(keep) = rotate_keep_limit() {
2528                cleanup_rotated_files(path, keep)?;
2529            }
2530            Ok(())
2531        }
2532        _ => write_timeline_json(path, &timeline),
2533    }
2534}
2535
2536fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
2537    if let Some(parent) = Path::new(path).parent()
2538        && !parent.as_os_str().is_empty()
2539    {
2540        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2541    }
2542    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
2543    fs::write(path, json).map_err(|e| e.to_string())
2544}
2545
/// Derives the rotated file path `<parent>/<stem>_<suffix>.<ext>` from `path`.
///
/// Falls back to the stem `timeline` and the extension `json` when `path` lacks them (or they
/// are not valid UTF-8).
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let source = Path::new(path);
    let stem = source
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = source
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");
    match source.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
2553
/// Reads the maximum number of timeline events to keep from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Surrounding whitespace is ignored. Returns `None` when the variable is unset, is not a
/// valid `usize`, or is zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    // Trim first: `str::parse` rejects padded numbers, which would silently disable the limit.
    match raw.trim().parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
2560
/// Reads how many rotated timeline files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Surrounding whitespace is ignored. Returns `None` when the variable is unset, is not a
/// valid `usize`, or is zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    // Trim first: `str::parse` rejects padded numbers, which would silently disable pruning.
    match raw.trim().parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
2567
2568fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
2569    let len = timeline.events.len();
2570    if len > max_events {
2571        let keep_from = len - max_events;
2572        timeline.events = timeline.events.split_off(keep_from);
2573    }
2574}
2575
/// Deletes old rotated timeline files next to `base_path`, keeping the `keep` newest.
///
/// Only files named `<stem>_<digits>.<ext>` (the shape produced by `rotated_path`) are
/// considered, so unrelated files that merely share the prefix and extension are never
/// removed. Candidates are ordered by modification time, newest first; files whose
/// modification time cannot be read sort as oldest. Individual deletions are best effort.
///
/// # Errors
///
/// Returns the stringified I/O error if the containing directory cannot be listed.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and listing "" fails; treat both
    // a missing and an empty parent as the current directory.
    let parent = match p.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    };
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut files = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter(|entry| {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            // The part between prefix and suffix must be the numeric rotation stamp.
            name.strip_prefix(prefix.as_str())
                .and_then(|rest| rest.strip_suffix(suffix.as_str()))
                .is_some_and(|stamp| {
                    !stamp.is_empty() && stamp.bytes().all(|b| b.is_ascii_digit())
                })
        })
        .map(|entry| {
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            (entry.path(), modified)
        })
        .collect::<Vec<_>>();

    // Newest first, so everything past the first `keep` entries is stale.
    files.sort_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in files.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
2608
2609fn bus_capability_schema_from_policy(
2610    policy: Option<ranvier_core::bus::BusAccessPolicy>,
2611) -> Option<BusCapabilitySchema> {
2612    let policy = policy?;
2613
2614    let mut allow = policy
2615        .allow
2616        .unwrap_or_default()
2617        .into_iter()
2618        .map(|entry| entry.type_name.to_string())
2619        .collect::<Vec<_>>();
2620    let mut deny = policy
2621        .deny
2622        .into_iter()
2623        .map(|entry| entry.type_name.to_string())
2624        .collect::<Vec<_>>();
2625    allow.sort();
2626    deny.sort();
2627
2628    if allow.is_empty() && deny.is_empty() {
2629        return None;
2630    }
2631
2632    Some(BusCapabilitySchema { allow, deny })
2633}
2634
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
2641
2642#[cfg(test)]
2643mod tests {
2644    use super::{
2645        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
2646        should_force_export,
2647    };
2648    use crate::persistence::{
2649        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
2650        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
2651        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
2652        PersistenceHandle, PersistenceStore, PersistenceTraceId,
2653    };
2654    use anyhow::Result;
2655    use async_trait::async_trait;
2656    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
2657    use ranvier_core::event::{DlqPolicy, DlqSink};
2658    use ranvier_core::saga::SagaStack;
2659    use ranvier_core::timeline::{Timeline, TimelineEvent};
2660    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
2661    use serde::{Deserialize, Serialize};
2662    use std::sync::Arc;
2663    use tokio::sync::Mutex;
2664    use uuid::Uuid;
2665
2666    struct MockAuditSink {
2667        events: Arc<Mutex<Vec<AuditEvent>>>,
2668    }
2669
2670    #[async_trait]
2671    impl AuditSink for MockAuditSink {
2672        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
2673            self.events.lock().await.push(event.clone());
2674            Ok(())
2675        }
2676    }
2677
    /// Checks that a `force_resume` call followed by `execute` on the same trace appends
    /// exactly two audit events, `ForceResume` then `ApplyIntervention`, both targeting the
    /// trace id.
    #[tokio::test]
    async fn execute_logs_audit_events_for_intervention() {
        use ranvier_inspector::StateInspector;

        let trace_id = "test-audit-trace";
        let store_impl = InMemoryPersistenceStore::new();
        let events = Arc::new(Mutex::new(Vec::new()));
        let sink = MockAuditSink {
            events: events.clone(),
        };

        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
            .then(AddOne)
            .with_persistence_store(store_impl.clone())
            .with_audit_sink(sink);

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
        bus.insert(PersistenceTraceId::new(trace_id));
        let target_node_id = axon.schematic.nodes[0].id.clone();

        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
        store_impl
            .append(crate::persistence::PersistenceEnvelope {
                trace_id: trace_id.to_string(),
                circuit: "AuditTest".to_string(),
                schematic_version: "v1.0".to_string(),
                step: 0,
                node_id: None,
                outcome_kind: "Next".to_string(),
                timestamp_ms: 0,
                payload_hash: None,
                payload: None,
            })
            .await
            .unwrap();

        // 1. Trigger force_resume (should log ForceResume)
        axon.force_resume(trace_id, &target_node_id, None)
            .await
            .unwrap();

        // 2. Execute (should log ApplyIntervention)
        axon.execute(10, &(), &mut bus).await;

        let recorded = events.lock().await;
        assert_eq!(
            recorded.len(),
            2,
            "Should have 2 audit events: ForceResume and ApplyIntervention"
        );
        assert_eq!(recorded[0].action, "ForceResume");
        assert_eq!(recorded[0].target, trace_id);
        assert_eq!(recorded[1].action, "ApplyIntervention");
        assert_eq!(recorded[1].target, trace_id);
    }
2734
    /// Error type with no variants, used by test transitions that never produce a fault.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    pub enum TestInfallible {}
2737
2738    #[test]
2739    fn inspector_enabled_flag_matrix() {
2740        assert!(inspector_enabled_from_value(None));
2741        assert!(inspector_enabled_from_value(Some("1")));
2742        assert!(inspector_enabled_from_value(Some("true")));
2743        assert!(inspector_enabled_from_value(Some("on")));
2744        assert!(!inspector_enabled_from_value(Some("0")));
2745        assert!(!inspector_enabled_from_value(Some("false")));
2746    }
2747
2748    #[test]
2749    fn inspector_dev_mode_matrix() {
2750        assert!(inspector_dev_mode_from_value(None));
2751        assert!(inspector_dev_mode_from_value(Some("dev")));
2752        assert!(inspector_dev_mode_from_value(Some("staging")));
2753        assert!(!inspector_dev_mode_from_value(Some("prod")));
2754        assert!(!inspector_dev_mode_from_value(Some("production")));
2755    }
2756
2757    #[test]
2758    fn adaptive_policy_force_export_matrix() {
2759        let next = Outcome::<(), &'static str>::Next(());
2760        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
2761        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
2762        let fault = Outcome::<(), &'static str>::Fault("boom");
2763
2764        assert!(!should_force_export(&next, "off"));
2765        assert!(!should_force_export(&fault, "off"));
2766
2767        assert!(!should_force_export(&branch, "fault_only"));
2768        assert!(should_force_export(&fault, "fault_only"));
2769
2770        assert!(should_force_export(&branch, "fault_branch"));
2771        assert!(!should_force_export(&emit, "fault_branch"));
2772        assert!(should_force_export(&fault, "fault_branch"));
2773
2774        assert!(should_force_export(&branch, "fault_branch_emit"));
2775        assert!(should_force_export(&emit, "fault_branch_emit"));
2776        assert!(should_force_export(&fault, "fault_branch_emit"));
2777    }
2778
2779    #[test]
2780    fn sampling_and_adaptive_combination_decisions() {
2781        let bus_id = Uuid::nil();
2782        let next = Outcome::<(), &'static str>::Next(());
2783        let fault = Outcome::<(), &'static str>::Fault("boom");
2784
2785        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
2786        assert!(!sampled_never);
2787        assert!(!(sampled_never || should_force_export(&next, "off")));
2788        assert!(sampled_never || should_force_export(&fault, "fault_only"));
2789
2790        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
2791        assert!(sampled_always);
2792        assert!(sampled_always || should_force_export(&next, "off"));
2793        assert!(sampled_always || should_force_export(&fault, "off"));
2794    }
2795
2796    #[derive(Clone)]
2797    struct AddOne;
2798
2799    #[async_trait]
2800    impl Transition<i32, i32> for AddOne {
2801        type Error = TestInfallible;
2802        type Resources = ();
2803
2804        async fn run(
2805            &self,
2806            state: i32,
2807            _resources: &Self::Resources,
2808            _bus: &mut Bus,
2809        ) -> Outcome<i32, Self::Error> {
2810            Outcome::Next(state + 1)
2811        }
2812    }
2813
2814    #[derive(Clone)]
2815    struct AlwaysFault;
2816
2817    #[async_trait]
2818    impl Transition<i32, i32> for AlwaysFault {
2819        type Error = String;
2820        type Resources = ();
2821
2822        async fn run(
2823            &self,
2824            _state: i32,
2825            _resources: &Self::Resources,
2826            _bus: &mut Bus,
2827        ) -> Outcome<i32, Self::Error> {
2828            Outcome::Fault("boom".to_string())
2829        }
2830    }
2831
2832    #[derive(Clone)]
2833    struct CapabilityGuarded;
2834
2835    #[async_trait]
2836    impl Transition<(), ()> for CapabilityGuarded {
2837        type Error = String;
2838        type Resources = ();
2839
2840        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
2841            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
2842        }
2843
2844        async fn run(
2845            &self,
2846            _state: (),
2847            _resources: &Self::Resources,
2848            bus: &mut Bus,
2849        ) -> Outcome<(), Self::Error> {
2850            match bus.get::<String>() {
2851                Ok(_) => Outcome::Next(()),
2852                Err(err) => Outcome::Fault(err.to_string()),
2853            }
2854        }
2855    }
2856
2857    #[derive(Clone)]
2858    struct RecordingCompensationHook {
2859        calls: Arc<Mutex<Vec<CompensationContext>>>,
2860        should_fail: bool,
2861    }
2862
2863    #[async_trait]
2864    impl CompensationHook for RecordingCompensationHook {
2865        async fn compensate(&self, context: CompensationContext) -> Result<()> {
2866            self.calls.lock().await.push(context);
2867            if self.should_fail {
2868                return Err(anyhow::anyhow!("compensation failed"));
2869            }
2870            Ok(())
2871        }
2872    }
2873
2874    #[derive(Clone)]
2875    struct FlakyCompensationHook {
2876        calls: Arc<Mutex<u32>>,
2877        failures_remaining: Arc<Mutex<u32>>,
2878    }
2879
2880    #[async_trait]
2881    impl CompensationHook for FlakyCompensationHook {
2882        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
2883            {
2884                let mut calls = self.calls.lock().await;
2885                *calls += 1;
2886            }
2887            let mut failures_remaining = self.failures_remaining.lock().await;
2888            if *failures_remaining > 0 {
2889                *failures_remaining -= 1;
2890                return Err(anyhow::anyhow!("transient compensation failure"));
2891            }
2892            Ok(())
2893        }
2894    }
2895
2896    #[derive(Clone)]
2897    struct FailingCompensationIdempotencyStore {
2898        read_calls: Arc<Mutex<u32>>,
2899        write_calls: Arc<Mutex<u32>>,
2900    }
2901
2902    #[async_trait]
2903    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
2904        async fn was_compensated(&self, _key: &str) -> Result<bool> {
2905            let mut read_calls = self.read_calls.lock().await;
2906            *read_calls += 1;
2907            Err(anyhow::anyhow!("forced idempotency read failure"))
2908        }
2909
2910        async fn mark_compensated(&self, _key: &str) -> Result<()> {
2911            let mut write_calls = self.write_calls.lock().await;
2912            *write_calls += 1;
2913            Err(anyhow::anyhow!("forced idempotency write failure"))
2914        }
2915    }
2916
    /// With a persistence handle and trace id on the bus, a successful run stores three
    /// events (`Enter`, `Next`, `Next`) and marks the trace as `Success`.
    #[tokio::test]
    async fn execute_persists_success_trace_when_handle_exists() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-success"));

        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
        let outcome = axon.execute(41, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Next(42)));

        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
        assert_eq!(persisted.events[0].outcome_kind, "Enter");
        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
        assert_eq!(persisted.completion, Some(CompletionState::Success));
    }
2937
    /// A faulting run stores the step-level and final `Fault` events and marks the trace as
    /// `Fault`.
    #[tokio::test]
    async fn execute_persists_fault_completion_state() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-fault"));

        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
        let outcome = axon.execute(41, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));

        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
        assert_eq!(persisted.completion, Some(CompletionState::Fault));
    }
2957
    /// With `PersistenceAutoComplete(false)` on the bus, events are still stored but the
    /// trace completion state stays unset.
    #[tokio::test]
    async fn execute_respects_persistence_auto_complete_off() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-no-complete"));
        bus.insert(PersistenceAutoComplete(false));

        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
        let outcome = axon.execute(1, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Next(2)));

        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
        assert_eq!(persisted.completion, None);
    }
2976
    /// A fault with a succeeding compensation hook on the bus invokes the hook once, appends a
    /// `Compensated` event after the fault events, and completes the trace as `Compensated`.
    #[tokio::test]
    async fn fault_triggers_compensation_and_marks_compensated() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
        let calls = Arc::new(Mutex::new(Vec::new()));
        let compensation = RecordingCompensationHook {
            calls: calls.clone(),
            should_fail: false,
        };

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-compensated"));
        bus.insert(CompensationHandle::from_hook(compensation));

        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
        let outcome = axon.execute(7, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));

        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
        assert_eq!(persisted.events[0].outcome_kind, "Enter");
        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
        assert_eq!(persisted.completion, Some(CompletionState::Compensated));

        let recorded = calls.lock().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].trace_id, "trace-compensated");
        assert_eq!(recorded[0].fault_kind, "Fault");
    }
3009
    /// When the compensation hook fails, no `Compensated` event is appended and the trace
    /// stays completed as `Fault`; the hook is still invoked once.
    #[tokio::test]
    async fn failed_compensation_keeps_fault_completion() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
        let calls = Arc::new(Mutex::new(Vec::new()));
        let compensation = RecordingCompensationHook {
            calls: calls.clone(),
            should_fail: true,
        };

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
        bus.insert(CompensationHandle::from_hook(compensation));

        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
        let outcome = axon.execute(7, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));

        let persisted = store_impl
            .load("trace-compensation-failed")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
        assert_eq!(persisted.completion, Some(CompletionState::Fault));

        let recorded = calls.lock().await;
        assert_eq!(recorded.len(), 1);
    }
3041
    /// With a retry policy of three attempts and a hook that fails twice, the third attempt
    /// succeeds: the trace ends as `Compensated` and the hook was called three times.
    #[tokio::test]
    async fn compensation_retry_policy_succeeds_after_retries() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
        let calls = Arc::new(Mutex::new(0u32));
        let failures_remaining = Arc::new(Mutex::new(2u32));
        let compensation = FlakyCompensationHook {
            calls: calls.clone(),
            failures_remaining,
        };

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-retry-success"));
        bus.insert(CompensationHandle::from_hook(compensation));
        bus.insert(CompensationRetryPolicy {
            max_attempts: 3,
            backoff_ms: 0,
        });

        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
        let outcome = axon.execute(7, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));

        let persisted = store_impl
            .load("trace-retry-success")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
        assert_eq!(
            persisted.events.last().map(|e| e.outcome_kind.as_str()),
            Some("Compensated")
        );

        let attempt_count = calls.lock().await;
        assert_eq!(*attempt_count, 3);
    }
3080
    /// Runs the same faulting circuit twice on one bus with an idempotency store: the hook is
    /// invoked only once, while a `Compensated` event is persisted for each execution.
    #[tokio::test]
    async fn compensation_idempotency_skips_duplicate_hook_execution() {
        let store_impl = Arc::new(InMemoryPersistenceStore::new());
        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
        let calls = Arc::new(Mutex::new(Vec::new()));
        let compensation = RecordingCompensationHook {
            calls: calls.clone(),
            should_fail: false,
        };
        let idempotency = InMemoryCompensationIdempotencyStore::new();

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(store_dyn));
        bus.insert(PersistenceTraceId::new("trace-idempotent"));
        bus.insert(PersistenceAutoComplete(false));
        bus.insert(CompensationHandle::from_hook(compensation));
        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));

        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);

        let outcome1 = axon.execute(7, &(), &mut bus).await;
        let outcome2 = axon.execute(8, &(), &mut bus).await;
        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));

        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
        assert_eq!(persisted.completion, None);
        // Verify that "Compensated" events are present for both executions
        let compensated_count = persisted
            .events
            .iter()
            .filter(|e| e.outcome_kind == "Compensated")
            .count();
        assert_eq!(
            compensated_count, 2,
            "Should have 2 Compensated events (one per execution)"
        );

        let recorded = calls.lock().await;
        assert_eq!(recorded.len(), 1);
    }
3122
3123    #[tokio::test]
3124    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3125        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3126        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3127        let calls = Arc::new(Mutex::new(Vec::new()));
3128        let read_calls = Arc::new(Mutex::new(0u32));
3129        let write_calls = Arc::new(Mutex::new(0u32));
3130        let compensation = RecordingCompensationHook {
3131            calls: calls.clone(),
3132            should_fail: false,
3133        };
3134        let idempotency = FailingCompensationIdempotencyStore {
3135            read_calls: read_calls.clone(),
3136            write_calls: write_calls.clone(),
3137        };
3138
3139        let mut bus = Bus::new();
3140        bus.insert(PersistenceHandle::from_arc(store_dyn));
3141        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3142        bus.insert(CompensationHandle::from_hook(compensation));
3143        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3144
3145        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3146        let outcome = axon.execute(9, &(), &mut bus).await;
3147        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3148
3149        let persisted = store_impl
3150            .load("trace-idempotency-store-failure")
3151            .await
3152            .unwrap()
3153            .unwrap();
3154        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3155        assert_eq!(
3156            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3157            Some("Compensated")
3158        );
3159
3160        let recorded = calls.lock().await;
3161        assert_eq!(recorded.len(), 1);
3162        assert_eq!(*read_calls.lock().await, 1);
3163        assert_eq!(*write_calls.lock().await, 1);
3164    }
3165
3166    #[tokio::test]
3167    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3168        let mut bus = Bus::new();
3169        bus.insert(1_i32);
3170        bus.insert("secret".to_string());
3171
3172        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3173        let outcome = axon.execute((), &(), &mut bus).await;
3174
3175        match outcome {
3176            Outcome::Fault(msg) => {
3177                assert!(msg.contains("Bus access denied"), "{msg}");
3178                assert!(msg.contains("CapabilityGuarded"), "{msg}");
3179                assert!(msg.contains("alloc::string::String"), "{msg}");
3180            }
3181            other => panic!("expected fault, got {other:?}"),
3182        }
3183    }
3184
3185    #[tokio::test]
3186    async fn execute_fails_on_version_mismatch_without_migration() {
3187        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3188        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3189
3190        let trace_id = "v-mismatch";
3191        // Create an existing trace with an older version
3192        let old_envelope = crate::persistence::PersistenceEnvelope {
3193            trace_id: trace_id.to_string(),
3194            circuit: "TestCircuit".to_string(),
3195            schematic_version: "0.9".to_string(),
3196            step: 0,
3197            node_id: None,
3198            outcome_kind: "Enter".to_string(),
3199            timestamp_ms: 0,
3200            payload_hash: None,
3201            payload: None,
3202        };
3203        store_impl.append(old_envelope).await.unwrap();
3204
3205        let mut bus = Bus::new();
3206        bus.insert(PersistenceHandle::from_arc(store_dyn));
3207        bus.insert(PersistenceTraceId::new(trace_id));
3208
3209        // Current axon is version 1.0
3210        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3211        let outcome = axon.execute(10, &(), &mut bus).await;
3212
3213        if let Outcome::Emit(kind, _) = outcome {
3214            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3215        } else {
3216            panic!("Expected version mismatch emission, got {:?}", outcome);
3217        }
3218    }
3219
3220    #[tokio::test]
3221    async fn execute_resumes_from_start_on_migration_strategy() {
3222        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3223        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3224
3225        let trace_id = "v-migration";
3226        // Create an existing trace with an older version at step 5
3227        let old_envelope = crate::persistence::PersistenceEnvelope {
3228            trace_id: trace_id.to_string(),
3229            circuit: "TestCircuit".to_string(),
3230            schematic_version: "0.9".to_string(),
3231            step: 5,
3232            node_id: None,
3233            outcome_kind: "Next".to_string(),
3234            timestamp_ms: 0,
3235            payload_hash: None,
3236            payload: None,
3237        };
3238        store_impl.append(old_envelope).await.unwrap();
3239
3240        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3241        registry.register(ranvier_core::schematic::SnapshotMigration {
3242            name: Some("v0.9 to v1.0".to_string()),
3243            from_version: "0.9".to_string(),
3244            to_version: "1.0".to_string(),
3245            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3246            node_mapping: std::collections::HashMap::new(),
3247            payload_mapper: None,
3248        });
3249
3250        let mut bus = Bus::new();
3251        bus.insert(PersistenceHandle::from_arc(store_dyn));
3252        bus.insert(PersistenceTraceId::new(trace_id));
3253        bus.insert(registry);
3254
3255        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3256        let outcome = axon.execute(10, &(), &mut bus).await;
3257
3258        // Should have resumed from start (step 0), resulting in 11
3259        assert!(matches!(outcome, Outcome::Next(11)));
3260
3261        // Verify new event has current version
3262        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3263        assert_eq!(persisted.schematic_version, "1.0");
3264    }
3265
3266    #[tokio::test]
3267    async fn execute_applies_manual_intervention_jump_and_payload() {
3268        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3269        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3270
3271        let trace_id = "intervention-test";
3272        // 1. Run a normal trace part-way
3273        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3274            .then(AddOne)
3275            .then(AddOne);
3276
3277        let mut bus = Bus::new();
3278        bus.insert(PersistenceHandle::from_arc(store_dyn));
3279        bus.insert(PersistenceTraceId::new(trace_id));
3280
3281        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
3282        // with a payload override of 100.
3283        // The first node is 'AddOne', the second is ALSO 'AddOne'.
3284        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
3285        let _target_node_label = "AddOne";
3286        // To be precise, let's find the ID of the second node.
3287        let target_node_id = axon.schematic.nodes[2].id.clone();
3288
3289        // Pre-seed an initial trace entry so save_intervention doesn't fail
3290        store_impl
3291            .append(crate::persistence::PersistenceEnvelope {
3292                trace_id: trace_id.to_string(),
3293                circuit: "TestCircuit".to_string(),
3294                schematic_version: "1.0".to_string(),
3295                step: 0,
3296                node_id: None,
3297                outcome_kind: "Enter".to_string(),
3298                timestamp_ms: 0,
3299                payload_hash: None,
3300                payload: None,
3301            })
3302            .await
3303            .unwrap();
3304
3305        store_impl
3306            .save_intervention(
3307                trace_id,
3308                crate::persistence::Intervention {
3309                    target_node: target_node_id.clone(),
3310                    payload_override: Some(serde_json::json!(100)),
3311                    timestamp_ms: 0,
3312                },
3313            )
3314            .await
3315            .unwrap();
3316
3317        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
3318        // Result should be 100 + 1 = 101.
3319        let outcome = axon.execute(10, &(), &mut bus).await;
3320
3321        match outcome {
3322            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3323            other => panic!("Expected Outcome::Next(101), got {:?}", other),
3324        }
3325
3326        // Verify the jump was logged in trace
3327        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3328        // The last event should be from the jump target's execution.
3329        assert_eq!(persisted.interventions.len(), 1);
3330        assert_eq!(persisted.interventions[0].target_node, target_node_id);
3331    }
3332
3333    // ── DLQ Retry Tests ──────────────────────────────────────────────
3334
3335    /// A transition that fails a configurable number of times before succeeding.
3336    #[derive(Clone)]
3337    struct FailNThenSucceed {
3338        remaining: Arc<tokio::sync::Mutex<u32>>,
3339    }
3340
3341    #[async_trait]
3342    impl Transition<i32, i32> for FailNThenSucceed {
3343        type Error = String;
3344        type Resources = ();
3345
3346        async fn run(
3347            &self,
3348            state: i32,
3349            _resources: &Self::Resources,
3350            _bus: &mut Bus,
3351        ) -> Outcome<i32, Self::Error> {
3352            let mut rem = self.remaining.lock().await;
3353            if *rem > 0 {
3354                *rem -= 1;
3355                Outcome::Fault("transient failure".to_string())
3356            } else {
3357                Outcome::Next(state + 1)
3358            }
3359        }
3360    }
3361
3362    /// A mock DLQ sink that records all dead letters.
3363    #[derive(Clone)]
3364    struct MockDlqSink {
3365        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3366    }
3367
3368    #[async_trait]
3369    impl DlqSink for MockDlqSink {
3370        async fn store_dead_letter(
3371            &self,
3372            workflow_id: &str,
3373            _circuit_label: &str,
3374            node_id: &str,
3375            error_msg: &str,
3376            _payload: &[u8],
3377        ) -> Result<(), String> {
3378            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
3379            self.letters.lock().await.push(entry);
3380            Ok(())
3381        }
3382    }
3383
3384    #[tokio::test]
3385    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
3386        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
3387        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
3388        let trans = FailNThenSucceed { remaining };
3389
3390        let dlq_sink = MockDlqSink {
3391            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3392        };
3393
3394        let mut bus = Bus::new();
3395        bus.insert(Timeline::new());
3396
3397        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
3398            .then(trans)
3399            .with_dlq_policy(DlqPolicy::RetryThenDlq {
3400                max_attempts: 5,
3401                backoff_ms: 1,
3402            })
3403            .with_dlq_sink(dlq_sink.clone());
3404        let outcome = axon.execute(10, &(), &mut bus).await;
3405
3406        // Should succeed (10 + 1 = 11)
3407        assert!(
3408            matches!(outcome, Outcome::Next(11)),
3409            "Expected Next(11), got {:?}",
3410            outcome
3411        );
3412
3413        // No dead letters since it recovered
3414        let letters = dlq_sink.letters.lock().await;
3415        assert!(
3416            letters.is_empty(),
3417            "Should have 0 dead letters, got {}",
3418            letters.len()
3419        );
3420
3421        // Timeline should contain NodeRetry events
3422        let timeline = bus.read::<Timeline>().unwrap();
3423        let retry_count = timeline
3424            .events
3425            .iter()
3426            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3427            .count();
3428        assert_eq!(retry_count, 2, "Should have 2 retry events");
3429    }
3430
3431    #[tokio::test]
3432    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
3433        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
3434        let mut bus = Bus::new();
3435        bus.insert(Timeline::new());
3436
3437        let dlq_sink = MockDlqSink {
3438            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3439        };
3440
3441        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
3442            .then(AlwaysFault)
3443            .with_dlq_policy(DlqPolicy::RetryThenDlq {
3444                max_attempts: 3,
3445                backoff_ms: 1,
3446            })
3447            .with_dlq_sink(dlq_sink.clone());
3448        let outcome = axon.execute(42, &(), &mut bus).await;
3449
3450        assert!(
3451            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
3452            "Expected Fault(boom), got {:?}",
3453            outcome
3454        );
3455
3456        // Should have exactly 1 dead letter
3457        let letters = dlq_sink.letters.lock().await;
3458        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
3459
3460        // Timeline should have 2 retry events and 1 DlqExhausted event
3461        let timeline = bus.read::<Timeline>().unwrap();
3462        let retry_count = timeline
3463            .events
3464            .iter()
3465            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3466            .count();
3467        let dlq_count = timeline
3468            .events
3469            .iter()
3470            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
3471            .count();
3472        assert_eq!(
3473            retry_count, 2,
3474            "Should have 2 retry events (attempts 2 and 3)"
3475        );
3476        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
3477    }
3478
3479    #[tokio::test]
3480    async fn send_to_dlq_policy_sends_immediately_without_retry() {
3481        let mut bus = Bus::new();
3482        bus.insert(Timeline::new());
3483
3484        let dlq_sink = MockDlqSink {
3485            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3486        };
3487
3488        let axon = Axon::<i32, i32, String>::start("SendDlq")
3489            .then(AlwaysFault)
3490            .with_dlq_policy(DlqPolicy::SendToDlq)
3491            .with_dlq_sink(dlq_sink.clone());
3492        let outcome = axon.execute(1, &(), &mut bus).await;
3493
3494        assert!(matches!(outcome, Outcome::Fault(_)));
3495
3496        // Should have exactly 1 dead letter (immediate, no retries)
3497        let letters = dlq_sink.letters.lock().await;
3498        assert_eq!(letters.len(), 1);
3499
3500        // No retry or DlqExhausted events
3501        let timeline = bus.read::<Timeline>().unwrap();
3502        let retry_count = timeline
3503            .events
3504            .iter()
3505            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3506            .count();
3507        assert_eq!(retry_count, 0);
3508    }
3509
3510    #[tokio::test]
3511    async fn drop_policy_does_not_send_to_dlq() {
3512        let mut bus = Bus::new();
3513
3514        let dlq_sink = MockDlqSink {
3515            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3516        };
3517
3518        let axon = Axon::<i32, i32, String>::start("DropDlq")
3519            .then(AlwaysFault)
3520            .with_dlq_policy(DlqPolicy::Drop)
3521            .with_dlq_sink(dlq_sink.clone());
3522        let outcome = axon.execute(1, &(), &mut bus).await;
3523
3524        assert!(matches!(outcome, Outcome::Fault(_)));
3525
3526        // No dead letters
3527        let letters = dlq_sink.letters.lock().await;
3528        assert!(letters.is_empty());
3529    }
3530
3531    #[tokio::test]
3532    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
3533        use ranvier_core::policy::DynamicPolicy;
3534
3535        // Start with Drop policy (no DLQ)
3536        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
3537        let dlq_sink = MockDlqSink {
3538            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3539        };
3540
3541        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
3542            .then(AlwaysFault)
3543            .with_dynamic_dlq_policy(dynamic)
3544            .with_dlq_sink(dlq_sink.clone());
3545
3546        // First execution: Drop policy → no dead letters
3547        let mut bus = Bus::new();
3548        let outcome = axon.execute(1, &(), &mut bus).await;
3549        assert!(matches!(outcome, Outcome::Fault(_)));
3550        assert!(
3551            dlq_sink.letters.lock().await.is_empty(),
3552            "Drop policy should produce no DLQ entries"
3553        );
3554
3555        // Hot-reload: switch to SendToDlq
3556        tx.send(DlqPolicy::SendToDlq).unwrap();
3557
3558        // Second execution: SendToDlq policy → dead letter captured
3559        let mut bus2 = Bus::new();
3560        let outcome2 = axon.execute(2, &(), &mut bus2).await;
3561        assert!(matches!(outcome2, Outcome::Fault(_)));
3562        assert_eq!(
3563            dlq_sink.letters.lock().await.len(),
3564            1,
3565            "SendToDlq policy should produce 1 DLQ entry"
3566        );
3567    }
3568
3569    #[tokio::test]
3570    async fn dynamic_saga_policy_hot_reload() {
3571        use ranvier_core::policy::DynamicPolicy;
3572        use ranvier_core::saga::SagaPolicy;
3573
3574        // Start with Disabled saga
3575        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
3576
3577        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
3578            .then(AddOne)
3579            .with_dynamic_saga_policy(dynamic);
3580
3581        // First execution: Disabled → no SagaStack in bus
3582        let mut bus = Bus::new();
3583        let _outcome = axon.execute(1, &(), &mut bus).await;
3584        assert!(
3585            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
3586            "SagaStack should be absent or empty when disabled"
3587        );
3588
3589        // Hot-reload: enable saga
3590        tx.send(SagaPolicy::Enabled).unwrap();
3591
3592        // Second execution: Enabled → SagaStack populated
3593        let mut bus2 = Bus::new();
3594        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
3595        assert!(
3596            bus2.read::<SagaStack>().is_some(),
3597            "SagaStack should exist when saga is enabled"
3598        );
3599    }
3600
3601    // ── IAM Boundary Tests ──────────────────────────────────────
3602
3603    mod iam_tests {
3604        use super::*;
3605        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
3606
3607        /// Mock IamVerifier that returns a fixed identity.
3608        #[derive(Clone)]
3609        struct MockVerifier {
3610            identity: IamIdentity,
3611            should_fail: bool,
3612        }
3613
3614        #[async_trait]
3615        impl IamVerifier for MockVerifier {
3616            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
3617                if self.should_fail {
3618                    Err(IamError::InvalidToken("mock verification failure".into()))
3619                } else {
3620                    Ok(self.identity.clone())
3621                }
3622            }
3623        }
3624
3625        #[tokio::test]
3626        async fn iam_require_identity_passes_with_valid_token() {
3627            let verifier = MockVerifier {
3628                identity: IamIdentity::new("alice").with_role("user"),
3629                should_fail: false,
3630            };
3631
3632            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
3633                .with_iam(IamPolicy::RequireIdentity, verifier)
3634                .then(AddOne);
3635
3636            let mut bus = Bus::new();
3637            bus.insert(IamToken("valid-token".to_string()));
3638            let outcome = axon.execute(10, &(), &mut bus).await;
3639
3640            assert!(matches!(outcome, Outcome::Next(11)));
3641            // Verify IamIdentity was injected into Bus
3642            let identity = bus
3643                .read::<IamIdentity>()
3644                .expect("IamIdentity should be in Bus");
3645            assert_eq!(identity.subject, "alice");
3646        }
3647
3648        #[tokio::test]
3649        async fn iam_require_identity_rejects_missing_token() {
3650            let verifier = MockVerifier {
3651                identity: IamIdentity::new("ignored"),
3652                should_fail: false,
3653            };
3654
3655            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
3656                .with_iam(IamPolicy::RequireIdentity, verifier)
3657                .then(AddOne);
3658
3659            let mut bus = Bus::new();
3660            // No IamToken inserted
3661            let outcome = axon.execute(10, &(), &mut bus).await;
3662
3663            // Should emit missing_token event
3664            match &outcome {
3665                Outcome::Emit(label, _) => {
3666                    assert_eq!(label, "iam.missing_token");
3667                }
3668                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
3669            }
3670        }
3671
3672        #[tokio::test]
3673        async fn iam_rejects_failed_verification() {
3674            let verifier = MockVerifier {
3675                identity: IamIdentity::new("ignored"),
3676                should_fail: true,
3677            };
3678
3679            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
3680                .with_iam(IamPolicy::RequireIdentity, verifier)
3681                .then(AddOne);
3682
3683            let mut bus = Bus::new();
3684            bus.insert(IamToken("bad-token".to_string()));
3685            let outcome = axon.execute(10, &(), &mut bus).await;
3686
3687            match &outcome {
3688                Outcome::Emit(label, _) => {
3689                    assert_eq!(label, "iam.verification_failed");
3690                }
3691                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
3692            }
3693        }
3694
3695        #[tokio::test]
3696        async fn iam_require_role_passes_with_matching_role() {
3697            let verifier = MockVerifier {
3698                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
3699                should_fail: false,
3700            };
3701
3702            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
3703                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3704                .then(AddOne);
3705
3706            let mut bus = Bus::new();
3707            bus.insert(IamToken("token".to_string()));
3708            let outcome = axon.execute(5, &(), &mut bus).await;
3709
3710            assert!(matches!(outcome, Outcome::Next(6)));
3711        }
3712
3713        #[tokio::test]
3714        async fn iam_require_role_denies_without_role() {
3715            let verifier = MockVerifier {
3716                identity: IamIdentity::new("carol").with_role("user"),
3717                should_fail: false,
3718            };
3719
3720            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
3721                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3722                .then(AddOne);
3723
3724            let mut bus = Bus::new();
3725            bus.insert(IamToken("token".to_string()));
3726            let outcome = axon.execute(5, &(), &mut bus).await;
3727
3728            match &outcome {
3729                Outcome::Emit(label, _) => {
3730                    assert_eq!(label, "iam.policy_denied");
3731                }
3732                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
3733            }
3734        }
3735
3736        #[tokio::test]
3737        async fn iam_policy_none_skips_verification() {
3738            let verifier = MockVerifier {
3739                identity: IamIdentity::new("ignored"),
3740                should_fail: true, // would fail if actually called
3741            };
3742
3743            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
3744                .with_iam(IamPolicy::None, verifier)
3745                .then(AddOne);
3746
3747            let mut bus = Bus::new();
3748            // No token needed when policy is None
3749            let outcome = axon.execute(10, &(), &mut bus).await;
3750
3751            assert!(matches!(outcome, Outcome::Next(11)));
3752        }
3753    }
3754}