Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
//! It is a reusable, typed decision flow (`Axon<In, Out, E, Res>`).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
/// How an `Axon` is executed with respect to the rest of the cluster.
///
/// The `Axon` constructors in this file default to [`ExecutionMode::Local`];
/// use `Axon::with_execution_mode` to select another mode.
#[derive(Clone)]
pub enum ExecutionMode {
    /// Normal, unpartitioned local execution.
    Local,
    /// Singleton execution: intended to ensure only one instance runs across the
    /// entire cluster, coordinated through a distributed lock. (The enforcement
    /// itself lives outside this type.)
    Singleton {
        /// Key identifying the cluster-wide lock.
        lock_key: String,
        /// Lock time-to-live; the `_ms` suffix suggests milliseconds — confirm
        /// against the `DistributedLock` implementation.
        ttl_ms: u64,
        /// Distributed lock backend used to acquire `lock_key`.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
57
/// Fault-handling strategy for steps that run as parallel branches.
///
/// Selects how the Axon reacts when one or more branches fault.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ParallelStrategy {
    /// Every branch has to succeed; the first fault encountered is returned.
    AllMustSucceed,
    /// Failing branches are tolerated and the first successful result wins.
    /// Only when every branch faults is the first fault returned.
    AnyCanFail,
}
69
/// Heap-allocated, pinned, `Send` future that may borrow data for `'fut` and
/// resolves to `T`. This is the return type of every Axon `Executor`.
pub type BoxFuture<'fut, T> = Pin<Box<dyn Future<Output = T> + Send + 'fut>>;
72
73/// Executor type for Axon steps.
74/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
75/// Must be Send + Sync to be reusable across threads and clones.
76pub type Executor<In, Out, E, Res> =
77    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
/// Manual intervention jump command injected into the Bus.
///
/// Step executors built by `Axon::then` read this value before running earlier
/// steps. The step whose node id or node label equals `target_node` skips every
/// previous step and runs itself directly. The command is only read, not
/// removed, by those executors.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Node id or node label of the step to jump to.
    pub target_node: String,
    /// JSON deserialized into the target step's input type. When `None`, the
    /// jump is rejected with an `execution.jump.missing_payload` event.
    pub payload_override: Option<serde_json::Value>,
}
85
/// Bus marker holding the step index at which a resumed execution restarts.
///
/// The index is the position of the step's node in the schematic node list
/// (the ingress node created by the constructor sits at index 0).
#[derive(Clone, Copy, Debug)]
struct StartStep(u64);
89
/// Persisted state for resumption, injected into the Bus.
///
/// The step whose index matches `StartStep` uses `payload` only as a fallback,
/// when the fresh execution input cannot be converted to that step's input type.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// Checkpointed step state as JSON, if any was persisted.
    payload: Option<serde_json::Value>,
}
95
/// Returns a readable name for `T` with the module path stripped from every path
/// segment.
///
/// `alloc::string::String` becomes `String`, and generic or compound types keep
/// their structure: `core::option::Option<alloc::string::String>` becomes
/// `Option<String>`, and `(u32, alloc::string::String)` becomes `(u32, String)`.
///
/// The exact text of [`std::any::type_name`] is not guaranteed to be stable
/// across compiler versions, so the result is meant for display (schematic
/// labels) only.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut out = String::with_capacity(full.len());
    // Byte offset in `out` where the path currently being copied started; on
    // `::` everything after it (the module prefix) is discarded.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();
    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            chars.next();
            if out.len() > segment_start {
                out.truncate(segment_start);
            } else {
                // `::` directly after a delimiter (e.g. `<T as Trait>::Assoc`) is
                // part of the type syntax, not a module prefix — keep it.
                out.push_str("::");
                segment_start = out.len();
            }
        } else if c.is_alphanumeric() || c == '_' {
            out.push(c);
        } else {
            // Any other character (`<`, `>`, `,`, `(`, `&`, space, ...) ends the
            // current path; the next path starts right after it.
            out.push(c);
            segment_start = out.len();
        }
    }
    out
}
101
/// The Axon Builder and Runtime.
///
/// `Axon` represents an executable decision tree: a [`Schematic`] describing the
/// flow's structure, paired with a type-erased [`Executor`] that runs it.
///
/// Type parameters: `In` is the flow's input, `Out` the output of the last
/// chained step, `E` the fault type, and `Res` the resource bundle shared by
/// every step (defaults to `()`).
///
/// It is reusable and thread-safe. The executor is an `Arc`-wrapped
/// `Send + Sync` closure, and `Clone` shares it instead of rebuilding the flow.
///
/// ## Example
///
/// ```rust,ignore
/// use ranvier_core::prelude::*;
/// // ...
/// // Start with an identity Axon (In -> In)
/// let axon = Axon::<(), (), _>::new("My Axon")
///     .then(StepA)
///     .then(StepB);
///
/// // Execute multiple times
/// let res1 = axon.execute((), &mut bus1).await;
/// let res2 = axon.execute((), &mut bus2).await;
/// ```
pub struct Axon<In, Out, E, Res = ()> {
    /// The static structure (nodes and edges), used for visualization and analysis.
    pub schematic: Schematic,
    /// The composed runtime executor covering every step chained so far.
    executor: Executor<In, Out, E, Res>,
    /// How this Axon is executed across the cluster (constructors default to `Local`).
    pub execution_mode: ExecutionMode,
    /// Optional persistence store; the Inspector reads traces from it and
    /// records interventions into it.
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Optional audit sink for tamper-evident logging of interventions
    /// (e.g. Inspector force-resume requests).
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Optional dead-letter queue sink for storing failed events.
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Policy for handling event failures; superseded by `dynamic_dlq_policy` when set.
    pub dlq_policy: DlqPolicy,
    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Policy for automated saga compensation; superseded by `dynamic_saga_policy` when set.
    pub saga_policy: SagaPolicy,
    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Registry for Saga compensation handlers; the `Arc` is shared between clones.
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// Optional IAM handle for identity verification at the Schematic boundary.
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
148
/// Request to export an Axon's schematic, built from command-line arguments or
/// environment variables.
#[derive(Clone, Debug)]
pub struct SchematicExportRequest {
    /// File to write the schematic to; `None` sends it to stdout instead.
    pub output: Option<PathBuf>,
}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157    fn clone(&self) -> Self {
158        Self {
159            schematic: self.schematic.clone(),
160            executor: self.executor.clone(),
161            execution_mode: self.execution_mode.clone(),
162            persistence_store: self.persistence_store.clone(),
163            audit_sink: self.audit_sink.clone(),
164            dlq_sink: self.dlq_sink.clone(),
165            dlq_policy: self.dlq_policy.clone(),
166            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167            saga_policy: self.saga_policy.clone(),
168            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169            saga_compensation_registry: self.saga_compensation_registry.clone(),
170            iam_handle: self.iam_handle.clone(),
171        }
172    }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177    In: Send + Sync + Serialize + DeserializeOwned + 'static,
178    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179    Res: ranvier_core::transition::ResourceRequirement,
180{
181    /// Create a new Axon flow with the given label.
182    /// This is the preferred entry point per Flat API guidelines.
183    #[track_caller]
184    pub fn new(label: &str) -> Self {
185        let caller = Location::caller();
186        Self::start_with_source(label, caller)
187    }
188
189    /// Start defining a new Axon flow.
190    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
191    #[track_caller]
192    pub fn start(label: &str) -> Self {
193        let caller = Location::caller();
194        Self::start_with_source(label, caller)
195    }
196
197    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198        let node_id = uuid::Uuid::new_v4().to_string();
199        let node = Node {
200            id: node_id,
201            kind: NodeKind::Ingress,
202            label: label.to_string(),
203            description: None,
204            input_type: "void".to_string(),
205            output_type: type_name_of::<In>(),
206            resource_type: type_name_of::<Res>(),
207            metadata: Default::default(),
208            bus_capability: None,
209            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210            position: None,
211            compensation_node_id: None,
212            input_schema: None,
213            output_schema: None,
214        };
215
216        let mut schematic = Schematic::new(label);
217        schematic.nodes.push(node);
218
219        let executor: Executor<In, In, E, Res> =
220            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222        Self {
223            schematic,
224            executor,
225            execution_mode: ExecutionMode::Local,
226            persistence_store: None,
227            audit_sink: None,
228            dlq_sink: None,
229            dlq_policy: DlqPolicy::default(),
230            dynamic_dlq_policy: None,
231            saga_policy: SagaPolicy::default(),
232            dynamic_saga_policy: None,
233            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234                ranvier_core::saga::SagaCompensationRegistry::new(),
235            )),
236            iam_handle: None,
237        }
238    }
239}
240
241impl Axon<(), (), (), ()> {
242    /// Convenience constructor for simple pipelines with no input state or resources.
243    ///
244    /// Reduces the common `Axon::<(), (), E>::new("label")` turbofish to
245    /// `Axon::simple::<E>("label")`, requiring only the error type parameter.
246    ///
247    /// # Example
248    ///
249    /// ```rust,ignore
250    /// // Before: 3 type parameters, 2 of which are always ()
251    /// let axon = Axon::<(), (), String>::new("pipeline");
252    ///
253    /// // After: only the error type
254    /// let axon = Axon::simple::<String>("pipeline");
255    /// ```
256    #[track_caller]
257    pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
258    where
259        E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
260    {
261        let caller = Location::caller();
262        <Axon<(), (), E, ()>>::start_with_source(label, caller)
263    }
264
265    /// Convenience constructor for pipelines with a typed input.
266    ///
267    /// Creates an identity Axon `In → In` with no resources, useful with
268    /// `HttpIngress::post_typed::<T>()` where the Axon's input type must
269    /// match the deserialized request body.
270    ///
271    /// # Example
272    ///
273    /// ```rust,ignore
274    /// #[derive(Deserialize, Serialize)]
275    /// struct CreateOrder { product_id: u64, qty: u32 }
276    ///
277    /// let axon = Axon::typed::<CreateOrder, String>("create-order")
278    ///     .then_fn("validate", |order: CreateOrder, _bus| {
279    ///         if order.qty == 0 { Outcome::Fault("empty order".into()) }
280    ///         else { Outcome::Next(order) }
281    ///     });
282    /// ```
283    #[track_caller]
284    pub fn typed<In, E>(label: &str) -> Axon<In, In, E, ()>
285    where
286        In: Send + Sync + Serialize + DeserializeOwned + 'static,
287        E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
288    {
289        let caller = Location::caller();
290        <Axon<In, In, E, ()>>::start_with_source(label, caller)
291    }
292}
293
294impl<In, Out, E, Res> Axon<In, Out, E, Res>
295where
296    In: Send + Sync + Serialize + DeserializeOwned + 'static,
297    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
298    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
299    Res: ranvier_core::transition::ResourceRequirement,
300{
301    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
302    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
303        self.execution_mode = mode;
304        self
305    }
306
307    /// Set the schematic version for this Axon.
308    pub fn with_version(mut self, version: impl Into<String>) -> Self {
309        self.schematic.schema_version = version.into();
310        self
311    }
312
313    /// Attach a persistence store to enable state inspection via the Inspector.
314    pub fn with_persistence_store<S>(mut self, store: S) -> Self
315    where
316        S: crate::persistence::PersistenceStore + 'static,
317    {
318        self.persistence_store = Some(Arc::new(store));
319        self
320    }
321
322    /// Attach an audit sink for tamper-evident logging.
323    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
324    where
325        S: AuditSink + 'static,
326    {
327        self.audit_sink = Some(Arc::new(sink));
328        self
329    }
330
331    /// Set the Dead Letter Queue sink for this Axon.
332    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
333    where
334        S: DlqSink + 'static,
335    {
336        self.dlq_sink = Some(Arc::new(sink));
337        self
338    }
339
340    /// Set the Dead Letter Queue policy for this Axon.
341    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
342        self.dlq_policy = policy;
343        self
344    }
345
346    /// Set the Saga compensation policy for this Axon.
347    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
348        self.saga_policy = policy;
349        self
350    }
351
352    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
353    /// current value is read at each execution, overriding the static `dlq_policy`.
354    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
355        self.dynamic_dlq_policy = Some(dynamic);
356        self
357    }
358
359    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
360    /// current value is read at each execution, overriding the static `saga_policy`.
361    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
362        self.dynamic_saga_policy = Some(dynamic);
363        self
364    }
365
366    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
367    ///
368    /// When set, `execute()` will:
369    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
370    /// 2. Verify the token using the provided verifier
371    /// 3. Enforce the policy against the verified identity
372    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
373    pub fn with_iam(
374        mut self,
375        policy: ranvier_core::iam::IamPolicy,
376        verifier: impl ranvier_core::iam::IamVerifier + 'static,
377    ) -> Self {
378        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
379            policy,
380            Arc::new(verifier),
381        ));
382        self
383    }
384
385    /// Attach a JSON Schema for the **last node's input type** in the schematic.
386    ///
387    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
388    ///
389    /// ```rust,ignore
390    /// let axon = Axon::new("My Circuit")
391    ///     .then(ProcessStep)
392    ///     .with_input_schema::<CreateUserRequest>()
393    ///     .with_output_schema::<UserResponse>();
394    /// ```
395    #[cfg(feature = "schema")]
396    pub fn with_input_schema<T>(mut self) -> Self
397    where
398        T: schemars::JsonSchema,
399    {
400        if let Some(last_node) = self.schematic.nodes.last_mut() {
401            let schema = schemars::schema_for!(T);
402            last_node.input_schema =
403                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
404        }
405        self
406    }
407
408    /// Attach a JSON Schema for the **last node's output type** in the schematic.
409    ///
410    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
411    #[cfg(feature = "schema")]
412    pub fn with_output_schema<T>(mut self) -> Self
413    where
414        T: schemars::JsonSchema,
415    {
416        if let Some(last_node) = self.schematic.nodes.last_mut() {
417            let schema = schemars::schema_for!(T);
418            last_node.output_schema =
419                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
420        }
421        self
422    }
423
424    /// Attach a raw JSON Schema value for the **last node's input type** in the schematic.
425    ///
426    /// Use this for pre-built schemas without requiring the `schema` feature.
427    pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
428        if let Some(last_node) = self.schematic.nodes.last_mut() {
429            last_node.input_schema = Some(schema);
430        }
431        self
432    }
433
434    /// Attach a raw JSON Schema value for the **last node's output type** in the schematic.
435    pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
436        if let Some(last_node) = self.schematic.nodes.last_mut() {
437            last_node.output_schema = Some(schema);
438        }
439        self
440    }
441}
442
443#[async_trait]
444impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
445where
446    In: Send + Sync + Serialize + DeserializeOwned + 'static,
447    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
448    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
449    Res: ranvier_core::transition::ResourceRequirement,
450{
451    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
452        let store = self.persistence_store.as_ref()?;
453        let trace = store.load(trace_id).await.ok().flatten()?;
454        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
455    }
456
457    async fn force_resume(
458        &self,
459        trace_id: &str,
460        target_node: &str,
461        payload_override: Option<Value>,
462    ) -> Result<(), String> {
463        let store = self
464            .persistence_store
465            .as_ref()
466            .ok_or("No persistence store attached to Axon")?;
467
468        let intervention = crate::persistence::Intervention {
469            target_node: target_node.to_string(),
470            payload_override,
471            timestamp_ms: now_ms(),
472        };
473
474        store
475            .save_intervention(trace_id, intervention)
476            .await
477            .map_err(|e| format!("Failed to save intervention: {}", e))?;
478
479        if let Some(sink) = self.audit_sink.as_ref() {
480            let event = AuditEvent::new(
481                uuid::Uuid::new_v4().to_string(),
482                "Inspector".to_string(),
483                "ForceResume".to_string(),
484                trace_id.to_string(),
485            )
486            .with_metadata("target_node", target_node);
487
488            let _ = sink.append(&event).await;
489        }
490
491        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
492        Ok(())
493    }
494}
495
496impl<In, Out, E, Res> Axon<In, Out, E, Res>
497where
498    In: Send + Sync + Serialize + DeserializeOwned + 'static,
499    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
500    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
501    Res: ranvier_core::transition::ResourceRequirement,
502{
503    /// Chain a transition to this Axon.
504    ///
505    /// Requires the transition to use the SAME resource bundle as the previous steps.
506    #[track_caller]
507    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
508    where
509        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
510        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
511    {
512        let caller = Location::caller();
513        // Decompose self to avoid partial move issues
514        let Axon {
515            mut schematic,
516            executor: prev_executor,
517            execution_mode,
518            persistence_store,
519            audit_sink,
520            dlq_sink,
521            dlq_policy,
522            dynamic_dlq_policy,
523            saga_policy,
524            dynamic_saga_policy,
525            saga_compensation_registry,
526            iam_handle,
527        } = self;
528
529        // Update Schematic
530        let next_node_id = uuid::Uuid::new_v4().to_string();
531        let next_node = Node {
532            id: next_node_id.clone(),
533            kind: NodeKind::Atom,
534            label: transition.label(),
535            description: transition.description(),
536            input_type: type_name_of::<Out>(),
537            output_type: type_name_of::<Next>(),
538            resource_type: type_name_of::<Res>(),
539            metadata: Default::default(),
540            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
541            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
542            position: transition
543                .position()
544                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
545            compensation_node_id: None,
546            input_schema: transition.input_schema(),
547            output_schema: None,
548        };
549
550        let last_node_id = schematic
551            .nodes
552            .last()
553            .map(|n| n.id.clone())
554            .unwrap_or_default();
555
556        schematic.nodes.push(next_node);
557        schematic.edges.push(Edge {
558            from: last_node_id,
559            to: next_node_id.clone(),
560            kind: EdgeType::Linear,
561            label: Some("Next".to_string()),
562        });
563
564        // Compose Executor
565        let node_id_for_exec = next_node_id.clone();
566        let node_label_for_exec = transition.label();
567        let bus_policy_for_exec = transition.bus_access_policy();
568        let bus_policy_clone = bus_policy_for_exec.clone();
569        let current_step_idx = schematic.nodes.len() as u64 - 1;
570        let next_executor: Executor<In, Next, E, Res> = Arc::new(
571            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
572                let prev = prev_executor.clone();
573                let trans = transition.clone();
574                let timeline_node_id = node_id_for_exec.clone();
575                let timeline_node_label = node_label_for_exec.clone();
576                let transition_bus_policy = bus_policy_clone.clone();
577                let step_idx = current_step_idx;
578
579                Box::pin(async move {
580                    // Check for manual intervention jump
581                    if let Some(jump) = bus.read::<ManualJump>()
582                        && (jump.target_node == timeline_node_id
583                            || jump.target_node == timeline_node_label)
584                    {
585                        tracing::info!(
586                            node_id = %timeline_node_id,
587                            node_label = %timeline_node_label,
588                            "Manual jump target reached; skipping previous steps"
589                        );
590
591                        let state = if let Some(ow) = jump.payload_override.clone() {
592                            match serde_json::from_value::<Out>(ow) {
593                                Ok(s) => s,
594                                Err(e) => {
595                                    tracing::error!(
596                                        "Payload override deserialization failed: {}",
597                                        e
598                                    );
599                                    return Outcome::emit(
600                                        "execution.jump.payload_error",
601                                        Some(serde_json::json!({"error": e.to_string()})),
602                                    )
603                                    .map(|_: ()| unreachable!());
604                                }
605                            }
606                        } else {
607                            // Default back to the provided input if this is an identity jump or types match
608                            // For now, treat missing payload on a mid-flow jump as an avoidable error if Possible.
609                            // In a better implementation, we'd try to load the last persisted Out for the previous step.
610                            return Outcome::emit(
611                                "execution.jump.missing_payload",
612                                Some(serde_json::json!({"node_id": timeline_node_id})),
613                            );
614                        };
615
616                        // Skip prev() and continue with trans.run(state, ...)
617                        return run_this_step::<Out, Next, E, Res>(
618                            &trans,
619                            state,
620                            res,
621                            bus,
622                            &timeline_node_id,
623                            &timeline_node_label,
624                            &transition_bus_policy,
625                            step_idx,
626                        )
627                        .await;
628                    }
629
630                    // Handle resumption skip
631                    if let Some(start) = bus.read::<StartStep>()
632                        && step_idx == start.0
633                        && bus.read::<ResumptionState>().is_some()
634                    {
635                        // Prefer fresh input (In → Out via JSON round-trip).
636                        // The caller provides updated state (e.g., corrected data after a fault).
637                        // Falls back to persisted checkpoint state when types are incompatible.
638                        let fresh_state = serde_json::to_value(&input)
639                            .ok()
640                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
641                        let persisted_state = bus
642                            .read::<ResumptionState>()
643                            .and_then(|r| r.payload.clone())
644                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
645
646                        if let Some(s) = fresh_state.or(persisted_state) {
647                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
648                            return run_this_step::<Out, Next, E, Res>(
649                                &trans,
650                                s,
651                                res,
652                                bus,
653                                &timeline_node_id,
654                                &timeline_node_label,
655                                &transition_bus_policy,
656                                step_idx,
657                            )
658                            .await;
659                        }
660
661                        return Outcome::emit(
662                            "execution.resumption.payload_error",
663                            Some(serde_json::json!({"error": "no compatible resumption state"})),
664                        )
665                        .map(|_: ()| unreachable!());
666                    }
667
668                    // Run previous step
669                    let prev_result = prev(input, res, bus).await;
670
671                    // Unpack
672                    let state = match prev_result {
673                        Outcome::Next(t) => t,
674                        other => return other.map(|_| unreachable!()),
675                    };
676
677                    run_this_step::<Out, Next, E, Res>(
678                        &trans,
679                        state,
680                        res,
681                        bus,
682                        &timeline_node_id,
683                        &timeline_node_label,
684                        &transition_bus_policy,
685                        step_idx,
686                    )
687                    .await
688                })
689            },
690        );
691        Axon {
692            schematic,
693            executor: next_executor,
694            execution_mode,
695            persistence_store,
696            audit_sink,
697            dlq_sink,
698            dlq_policy,
699            dynamic_dlq_policy,
700            saga_policy,
701            dynamic_saga_policy,
702            saga_compensation_registry,
703            iam_handle,
704        }
705    }
706
707    /// Chain a closure as a lightweight Transition step.
708    ///
709    /// The closure receives `(input, &mut Bus)` and returns `Outcome<Next, E>`.
710    /// This is ideal for simple data transformations or validation checks that
711    /// don't need a full `#[transition]` struct.
712    ///
713    /// The closure does not receive typed resources. For resource-dependent
714    /// logic, use a full `Transition` struct with `then()`.
715    ///
716    /// # Example
717    ///
718    /// ```rust,ignore
719    /// use ranvier_runtime::Axon;
720    /// use ranvier_core::prelude::*;
721    ///
722    /// let axon = Axon::simple::<String>("pipeline")
723    ///     .then_fn("score_check", |_input: (), bus: &mut Bus| {
724    ///         let score = bus.read::<u32>().copied().unwrap_or(0);
725    ///         if score > 75 {
726    ///             Outcome::next("REJECTED".to_string())
727    ///         } else {
728    ///             Outcome::next("APPROVED".to_string())
729    ///         }
730    ///     });
731    /// ```
732    #[track_caller]
733    pub fn then_fn<Next, F>(self, label: &str, f: F) -> Axon<In, Next, E, Res>
734    where
735        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
736        F: Fn(Out, &mut Bus) -> Outcome<Next, E> + Clone + Send + Sync + 'static,
737    {
738        self.then(crate::closure_transition::ClosureTransition::new(label, f))
739    }
740
741    /// Chain a transition with a retry policy.
742    ///
743    /// If the transition returns `Outcome::Fault`, it will be retried up to
744    /// `policy.max_retries` times with the configured backoff strategy.
745    /// Timeline events are recorded for each retry attempt.
746    ///
747    /// # Example
748    ///
749    /// ```rust,ignore
750    /// use ranvier_runtime::{Axon, RetryPolicy};
751    /// use std::time::Duration;
752    ///
753    /// let axon = Axon::new("pipeline")
754    ///     .then_with_retry(my_transition, RetryPolicy::fixed(3, Duration::from_millis(100)));
755    /// ```
756    #[track_caller]
757    pub fn then_with_retry<Next, Trans>(
758        self,
759        transition: Trans,
760        policy: crate::retry::RetryPolicy,
761    ) -> Axon<In, Next, E, Res>
762    where
763        Out: Clone,
764        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
765        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
766    {
767        let caller = Location::caller();
768        let Axon {
769            mut schematic,
770            executor: prev_executor,
771            execution_mode,
772            persistence_store,
773            audit_sink,
774            dlq_sink,
775            dlq_policy,
776            dynamic_dlq_policy,
777            saga_policy,
778            dynamic_saga_policy,
779            saga_compensation_registry,
780            iam_handle,
781        } = self;
782
783        let next_node_id = uuid::Uuid::new_v4().to_string();
784        let next_node = Node {
785            id: next_node_id.clone(),
786            kind: NodeKind::Atom,
787            label: transition.label(),
788            description: transition.description(),
789            input_type: type_name_of::<Out>(),
790            output_type: type_name_of::<Next>(),
791            resource_type: type_name_of::<Res>(),
792            metadata: Default::default(),
793            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
794            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
795            position: transition
796                .position()
797                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
798            compensation_node_id: None,
799            input_schema: transition.input_schema(),
800            output_schema: None,
801        };
802
803        let last_node_id = schematic
804            .nodes
805            .last()
806            .map(|n| n.id.clone())
807            .unwrap_or_default();
808
809        schematic.nodes.push(next_node);
810        schematic.edges.push(Edge {
811            from: last_node_id,
812            to: next_node_id.clone(),
813            kind: EdgeType::Linear,
814            label: Some("Next (retryable)".to_string()),
815        });
816
817        let node_id_for_exec = next_node_id.clone();
818        let node_label_for_exec = transition.label();
819        let bus_policy_for_exec = transition.bus_access_policy();
820        let bus_policy_clone = bus_policy_for_exec.clone();
821        let current_step_idx = schematic.nodes.len() as u64 - 1;
822        let next_executor: Executor<In, Next, E, Res> = Arc::new(
823            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
824                let prev = prev_executor.clone();
825                let trans = transition.clone();
826                let timeline_node_id = node_id_for_exec.clone();
827                let timeline_node_label = node_label_for_exec.clone();
828                let transition_bus_policy = bus_policy_clone.clone();
829                let step_idx = current_step_idx;
830                let retry_policy = policy.clone();
831
832                Box::pin(async move {
833                    // Run previous step
834                    let prev_result = prev(input, res, bus).await;
835                    let state = match prev_result {
836                        Outcome::Next(t) => t,
837                        other => return other.map(|_| unreachable!()),
838                    };
839
840                    // Attempt with retries
841                    let mut last_result = None;
842                    for attempt in 0..=retry_policy.max_retries {
843                        let attempt_state = state.clone();
844
845                        let result = run_this_step::<Out, Next, E, Res>(
846                            &trans,
847                            attempt_state,
848                            res,
849                            bus,
850                            &timeline_node_id,
851                            &timeline_node_label,
852                            &transition_bus_policy,
853                            step_idx,
854                        )
855                        .await;
856
857                        match &result {
858                            Outcome::Next(_) => return result,
859                            Outcome::Fault(_) if attempt < retry_policy.max_retries => {
860                                let delay = retry_policy.delay_for_attempt(attempt);
861                                tracing::warn!(
862                                    node_id = %timeline_node_id,
863                                    attempt = attempt + 1,
864                                    max = retry_policy.max_retries,
865                                    delay_ms = delay.as_millis() as u64,
866                                    "Transition failed, retrying"
867                                );
868                                if let Some(timeline) = bus.read_mut::<Timeline>() {
869                                    timeline.push(TimelineEvent::NodeRetry {
870                                        node_id: timeline_node_id.clone(),
871                                        attempt: attempt + 1,
872                                        max_attempts: retry_policy.max_retries,
873                                        backoff_ms: delay.as_millis() as u64,
874                                        timestamp: now_ms(),
875                                    });
876                                }
877                                tokio::time::sleep(delay).await;
878                            }
879                            _ => {
880                                last_result = Some(result);
881                                break;
882                            }
883                        }
884                    }
885
886                    last_result.unwrap_or_else(|| {
887                        Outcome::emit("execution.retry.exhausted", None)
888                    })
889                })
890            },
891        );
892        Axon {
893            schematic,
894            executor: next_executor,
895            execution_mode,
896            persistence_store,
897            audit_sink,
898            dlq_sink,
899            dlq_policy,
900            dynamic_dlq_policy,
901            saga_policy,
902            dynamic_saga_policy,
903            saga_compensation_registry,
904            iam_handle,
905        }
906    }
907
908    /// Chain a transition to this Axon with a timeout guard.
909    ///
910    /// If the transition does not complete within the specified duration,
911    /// the execution is cancelled and a `Fault` is returned using the
912    /// provided error factory.
913    ///
914    /// # Example
915    ///
916    /// ```rust,ignore
917    /// use std::time::Duration;
918    ///
919    /// let pipeline = Axon::simple::<String>("API")
920    ///     .then_with_timeout(
921    ///         slow_handler,
922    ///         Duration::from_secs(5),
923    ///         || "Request timed out after 5 seconds".to_string(),
924    ///     );
925    /// ```
926    #[track_caller]
927    pub fn then_with_timeout<Next, Trans, F>(
928        self,
929        transition: Trans,
930        duration: std::time::Duration,
931        make_timeout_error: F,
932    ) -> Axon<In, Next, E, Res>
933    where
934        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
935        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
936        F: Fn() -> E + Clone + Send + Sync + 'static,
937    {
938        let caller = Location::caller();
939        let Axon {
940            mut schematic,
941            executor: prev_executor,
942            execution_mode,
943            persistence_store,
944            audit_sink,
945            dlq_sink,
946            dlq_policy,
947            dynamic_dlq_policy,
948            saga_policy,
949            dynamic_saga_policy,
950            saga_compensation_registry,
951            iam_handle,
952        } = self;
953
954        let next_node_id = uuid::Uuid::new_v4().to_string();
955        let next_node = Node {
956            id: next_node_id.clone(),
957            kind: NodeKind::Atom,
958            label: transition.label(),
959            description: transition.description(),
960            input_type: type_name_of::<Out>(),
961            output_type: type_name_of::<Next>(),
962            resource_type: type_name_of::<Res>(),
963            metadata: Default::default(),
964            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
965            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
966            position: transition
967                .position()
968                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
969            compensation_node_id: None,
970            input_schema: transition.input_schema(),
971            output_schema: None,
972        };
973
974        let last_node_id = schematic
975            .nodes
976            .last()
977            .map(|n| n.id.clone())
978            .unwrap_or_default();
979
980        schematic.nodes.push(next_node);
981        schematic.edges.push(Edge {
982            from: last_node_id,
983            to: next_node_id.clone(),
984            kind: EdgeType::Linear,
985            label: Some("Next (timeout-guarded)".to_string()),
986        });
987
988        let node_id_for_exec = next_node_id.clone();
989        let node_label_for_exec = transition.label();
990        let bus_policy_for_exec = transition.bus_access_policy();
991        let bus_policy_clone = bus_policy_for_exec.clone();
992        let current_step_idx = schematic.nodes.len() as u64 - 1;
993        let next_executor: Executor<In, Next, E, Res> = Arc::new(
994            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
995                let prev = prev_executor.clone();
996                let trans = transition.clone();
997                let timeline_node_id = node_id_for_exec.clone();
998                let timeline_node_label = node_label_for_exec.clone();
999                let transition_bus_policy = bus_policy_clone.clone();
1000                let step_idx = current_step_idx;
1001                let timeout_duration = duration;
1002                let error_factory = make_timeout_error.clone();
1003
1004                Box::pin(async move {
1005                    // Run previous step
1006                    let prev_result = prev(input, res, bus).await;
1007                    let state = match prev_result {
1008                        Outcome::Next(t) => t,
1009                        other => return other.map(|_| unreachable!()),
1010                    };
1011
1012                    // Execute with timeout
1013                    match tokio::time::timeout(
1014                        timeout_duration,
1015                        run_this_step::<Out, Next, E, Res>(
1016                            &trans,
1017                            state,
1018                            res,
1019                            bus,
1020                            &timeline_node_id,
1021                            &timeline_node_label,
1022                            &transition_bus_policy,
1023                            step_idx,
1024                        ),
1025                    )
1026                    .await
1027                    {
1028                        Ok(result) => result,
1029                        Err(_elapsed) => {
1030                            tracing::warn!(
1031                                node_id = %timeline_node_id,
1032                                timeout_ms = timeout_duration.as_millis() as u64,
1033                                "Transition timed out"
1034                            );
1035                            if let Some(timeline) = bus.read_mut::<Timeline>() {
1036                                timeline.push(TimelineEvent::NodeTimeout {
1037                                    node_id: timeline_node_id.clone(),
1038                                    timeout_ms: timeout_duration.as_millis() as u64,
1039                                    timestamp: now_ms(),
1040                                });
1041                            }
1042                            Outcome::Fault(error_factory())
1043                        }
1044                    }
1045                })
1046            },
1047        );
1048        Axon {
1049            schematic,
1050            executor: next_executor,
1051            execution_mode,
1052            persistence_store,
1053            audit_sink,
1054            dlq_sink,
1055            dlq_policy,
1056            dynamic_dlq_policy,
1057            saga_policy,
1058            dynamic_saga_policy,
1059            saga_compensation_registry,
1060            iam_handle,
1061        }
1062    }
1063
1064    /// Chain a transition to this Axon with a Saga compensation step.
1065    ///
1066    /// If the transition fails, the compensation transition will be executed
1067    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
1068    #[track_caller]
1069    pub fn then_compensated<Next, Trans, Comp>(
1070        self,
1071        transition: Trans,
1072        compensation: Comp,
1073    ) -> Axon<In, Next, E, Res>
1074    where
1075        Out: Clone,
1076        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
1077        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1078        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1079    {
1080        let caller = Location::caller();
1081        let Axon {
1082            mut schematic,
1083            executor: prev_executor,
1084            execution_mode,
1085            persistence_store,
1086            audit_sink,
1087            dlq_sink,
1088            dlq_policy,
1089            dynamic_dlq_policy,
1090            saga_policy,
1091            dynamic_saga_policy,
1092            saga_compensation_registry,
1093            iam_handle,
1094        } = self;
1095
1096        // 1. Add Primary Node
1097        let next_node_id = uuid::Uuid::new_v4().to_string();
1098        let comp_node_id = uuid::Uuid::new_v4().to_string();
1099
1100        let next_node = Node {
1101            id: next_node_id.clone(),
1102            kind: NodeKind::Atom,
1103            label: transition.label(),
1104            description: transition.description(),
1105            input_type: type_name_of::<Out>(),
1106            output_type: type_name_of::<Next>(),
1107            resource_type: type_name_of::<Res>(),
1108            metadata: Default::default(),
1109            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
1110            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1111            position: transition
1112                .position()
1113                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1114            compensation_node_id: Some(comp_node_id.clone()),
1115            input_schema: None,
1116            output_schema: None,
1117        };
1118
1119        // 2. Add Compensation Node
1120        let comp_node = Node {
1121            id: comp_node_id.clone(),
1122            kind: NodeKind::Atom,
1123            label: format!("Compensate: {}", compensation.label()),
1124            description: compensation.description(),
1125            input_type: type_name_of::<Out>(),
1126            output_type: "void".to_string(),
1127            resource_type: type_name_of::<Res>(),
1128            metadata: Default::default(),
1129            bus_capability: None,
1130            source_location: None,
1131            position: compensation
1132                .position()
1133                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1134            compensation_node_id: None,
1135            input_schema: None,
1136            output_schema: None,
1137        };
1138
1139        let last_node_id = schematic
1140            .nodes
1141            .last()
1142            .map(|n| n.id.clone())
1143            .unwrap_or_default();
1144
1145        schematic.nodes.push(next_node);
1146        schematic.nodes.push(comp_node);
1147        schematic.edges.push(Edge {
1148            from: last_node_id,
1149            to: next_node_id.clone(),
1150            kind: EdgeType::Linear,
1151            label: Some("Next".to_string()),
1152        });
1153
1154        // 3. Compose Executor with Compensation Logic
1155        let node_id_for_exec = next_node_id.clone();
1156        let comp_id_for_exec = comp_node_id.clone();
1157        let node_label_for_exec = transition.label();
1158        let bus_policy_for_exec = transition.bus_access_policy();
1159        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
1160        let comp_for_exec = compensation.clone();
1161        let bus_policy_for_executor = bus_policy_for_exec.clone();
1162        let bus_policy_for_registry = bus_policy_for_exec.clone();
1163        let next_executor: Executor<In, Next, E, Res> = Arc::new(
1164            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
1165                let prev = prev_executor.clone();
1166                let trans = transition.clone();
1167                let comp = comp_for_exec.clone();
1168                let timeline_node_id = node_id_for_exec.clone();
1169                let timeline_comp_id = comp_id_for_exec.clone();
1170                let timeline_node_label = node_label_for_exec.clone();
1171                let transition_bus_policy = bus_policy_for_executor.clone();
1172                let step_idx = step_idx_for_exec;
1173
1174                Box::pin(async move {
1175                    // Check for manual intervention jump
1176                    if let Some(jump) = bus.read::<ManualJump>()
1177                        && (jump.target_node == timeline_node_id
1178                            || jump.target_node == timeline_node_label)
1179                    {
1180                        tracing::info!(
1181                            node_id = %timeline_node_id,
1182                            node_label = %timeline_node_label,
1183                            "Manual jump target reached (compensated); skipping previous steps"
1184                        );
1185
1186                        let state = if let Some(ow) = jump.payload_override.clone() {
1187                            match serde_json::from_value::<Out>(ow) {
1188                                Ok(s) => s,
1189                                Err(e) => {
1190                                    tracing::error!(
1191                                        "Payload override deserialization failed: {}",
1192                                        e
1193                                    );
1194                                    return Outcome::emit(
1195                                        "execution.jump.payload_error",
1196                                        Some(serde_json::json!({"error": e.to_string()})),
1197                                    )
1198                                    .map(|_: ()| unreachable!());
1199                                }
1200                            }
1201                        } else {
1202                            return Outcome::emit(
1203                                "execution.jump.missing_payload",
1204                                Some(serde_json::json!({"node_id": timeline_node_id})),
1205                            )
1206                            .map(|_: ()| unreachable!());
1207                        };
1208
1209                        // Skip prev() and continue with trans.run(state, ...)
1210                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1211                            &trans,
1212                            &comp,
1213                            state,
1214                            res,
1215                            bus,
1216                            &timeline_node_id,
1217                            &timeline_comp_id,
1218                            &timeline_node_label,
1219                            &transition_bus_policy,
1220                            step_idx,
1221                        )
1222                        .await;
1223                    }
1224
1225                    // Handle resumption skip
1226                    if let Some(start) = bus.read::<StartStep>()
1227                        && step_idx == start.0
1228                        && bus.read::<ResumptionState>().is_some()
1229                    {
1230                        let fresh_state = serde_json::to_value(&input)
1231                            .ok()
1232                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
1233                        let persisted_state = bus
1234                            .read::<ResumptionState>()
1235                            .and_then(|r| r.payload.clone())
1236                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
1237
1238                        if let Some(s) = fresh_state.or(persisted_state) {
1239                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
1240                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1241                                &trans,
1242                                &comp,
1243                                s,
1244                                res,
1245                                bus,
1246                                &timeline_node_id,
1247                                &timeline_comp_id,
1248                                &timeline_node_label,
1249                                &transition_bus_policy,
1250                                step_idx,
1251                            )
1252                            .await;
1253                        }
1254
1255                        return Outcome::emit(
1256                            "execution.resumption.payload_error",
1257                            Some(serde_json::json!({"error": "no compatible resumption state"})),
1258                        )
1259                        .map(|_: ()| unreachable!());
1260                    }
1261
1262                    // Run previous step
1263                    let prev_result = prev(input, res, bus).await;
1264
1265                    // Unpack
1266                    let state = match prev_result {
1267                        Outcome::Next(t) => t,
1268                        other => return other.map(|_| unreachable!()),
1269                    };
1270
1271                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
1272                        &trans,
1273                        &comp,
1274                        state,
1275                        res,
1276                        bus,
1277                        &timeline_node_id,
1278                        &timeline_comp_id,
1279                        &timeline_node_label,
1280                        &transition_bus_policy,
1281                        step_idx,
1282                    )
1283                    .await
1284                })
1285            },
1286        );
1287        // 4. Register Saga Compensation if enabled
1288        {
1289            let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1290            let comp_fn = compensation.clone();
1291            let transition_bus_policy = bus_policy_for_registry.clone();
1292
1293            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1294                Arc::new(move |input_data, res, bus| {
1295                    let comp = comp_fn.clone();
1296                    let bus_policy = transition_bus_policy.clone();
1297                    Box::pin(async move {
1298                        let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1299                        bus.set_access_policy(comp.label(), bus_policy);
1300                        let res = comp.run(input, res, bus).await;
1301                        bus.clear_access_policy();
1302                        res
1303                    })
1304                });
1305            registry.register(next_node_id.clone(), handler);
1306        }
1307
1308        Axon {
1309            schematic,
1310            executor: next_executor,
1311            execution_mode,
1312            persistence_store,
1313            audit_sink,
1314            dlq_sink,
1315            dlq_policy,
1316            dynamic_dlq_policy,
1317            saga_policy,
1318            dynamic_saga_policy,
1319            saga_compensation_registry,
1320            iam_handle,
1321        }
1322    }
1323
1324    /// Attach a compensation transition to the previously added node.
1325    /// This establishes a Schematic-level Saga compensation mapping.
1326    #[track_caller]
1327    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1328    where
1329        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1330    {
1331        // NOTE: This currently only updates the Schematic.
1332        // For runtime compensation behavior, use `then_compensated`.
1333        let caller = Location::caller();
1334        let comp_node_id = uuid::Uuid::new_v4().to_string();
1335
1336        let comp_node = Node {
1337            id: comp_node_id.clone(),
1338            kind: NodeKind::Atom,
1339            label: transition.label(),
1340            description: transition.description(),
1341            input_type: type_name_of::<Out>(),
1342            output_type: "void".to_string(),
1343            resource_type: type_name_of::<Res>(),
1344            metadata: Default::default(),
1345            bus_capability: None,
1346            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1347            position: transition
1348                .position()
1349                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1350            compensation_node_id: None,
1351            input_schema: None,
1352            output_schema: None,
1353        };
1354
1355        if let Some(last_node) = self.schematic.nodes.last_mut() {
1356            last_node.compensation_node_id = Some(comp_node_id.clone());
1357        }
1358
1359        self.schematic.nodes.push(comp_node);
1360        self
1361    }
1362
1363    /// Add a branch point
1364    #[track_caller]
1365    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1366        let caller = Location::caller();
1367        let branch_id_str = branch_id.into();
1368        let last_node_id = self
1369            .schematic
1370            .nodes
1371            .last()
1372            .map(|n| n.id.clone())
1373            .unwrap_or_default();
1374
1375        let branch_node = Node {
1376            id: uuid::Uuid::new_v4().to_string(),
1377            kind: NodeKind::Synapse,
1378            label: label.to_string(),
1379            description: None,
1380            input_type: type_name_of::<Out>(),
1381            output_type: type_name_of::<Out>(),
1382            resource_type: type_name_of::<Res>(),
1383            metadata: Default::default(),
1384            bus_capability: None,
1385            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1386            position: None,
1387            compensation_node_id: None,
1388            input_schema: None,
1389            output_schema: None,
1390        };
1391
1392        self.schematic.nodes.push(branch_node);
1393        self.schematic.edges.push(Edge {
1394            from: last_node_id,
1395            to: branch_id_str.clone(),
1396            kind: EdgeType::Branch(branch_id_str),
1397            label: Some("Branch".to_string()),
1398        });
1399
1400        self
1401    }
1402
1403    /// Run multiple transitions in parallel (fan-out / fan-in).
1404    ///
1405    /// Each transition receives a clone of the current step's output and runs
1406    /// concurrently via [`futures_util::future::join_all`]. The strategy controls
1407    /// how faults are handled:
1408    ///
1409    /// - [`ParallelStrategy::AllMustSucceed`]: All branches must produce `Next`.
1410    ///   If any branch returns `Fault`, the first fault is propagated.
1411    /// - [`ParallelStrategy::AnyCanFail`]: Branches that fault are ignored as
1412    ///   long as at least one succeeds. If all branches fault, the first fault
1413    ///   is returned.
1414    ///
1415    /// The **first successful `Next` value** is forwarded to the next step in
1416    /// the pipeline. A custom merge can be layered on top via a subsequent
1417    /// `.then()` step.
1418    ///
1419    /// Each parallel branch receives its own fresh [`Bus`] instance. Resources
1420    /// should be injected via the shared `Res` bundle rather than the Bus for
1421    /// parallel steps.
1422    ///
1423    /// ## Schematic
1424    ///
1425    /// The method emits a `FanOut` node, one `Atom` node per branch (connected
1426    /// via `Parallel` edges), and a `FanIn` join node.
1427    ///
1428    /// ## Example
1429    ///
1430    /// ```rust,ignore
1431    /// let axon = Axon::new("Pipeline")
1432    ///     .then(ParseInput)
1433    ///     .parallel(
1434    ///         vec![Arc::new(EnrichA), Arc::new(EnrichB)],
1435    ///         ParallelStrategy::AllMustSucceed,
1436    ///     )
1437    ///     .then(MergeResults);
1438    /// ```
1439    #[track_caller]
1440    pub fn parallel(
1441        self,
1442        transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1443        strategy: ParallelStrategy,
1444    ) -> Axon<In, Out, E, Res>
1445    where
1446        Out: Clone,
1447    {
1448        let caller = Location::caller();
1449        let Axon {
1450            mut schematic,
1451            executor: prev_executor,
1452            execution_mode,
1453            persistence_store,
1454            audit_sink,
1455            dlq_sink,
1456            dlq_policy,
1457            dynamic_dlq_policy,
1458            saga_policy,
1459            dynamic_saga_policy,
1460            saga_compensation_registry,
1461            iam_handle,
1462        } = self;
1463
1464        // ── Schematic: FanOut node ─────────────────────────────────
1465        let fanout_id = uuid::Uuid::new_v4().to_string();
1466        let fanin_id = uuid::Uuid::new_v4().to_string();
1467
1468        let last_node_id = schematic
1469            .nodes
1470            .last()
1471            .map(|n| n.id.clone())
1472            .unwrap_or_default();
1473
1474        let fanout_node = Node {
1475            id: fanout_id.clone(),
1476            kind: NodeKind::FanOut,
1477            label: "FanOut".to_string(),
1478            description: Some(format!(
1479                "Parallel split ({} branches, {:?})",
1480                transitions.len(),
1481                strategy
1482            )),
1483            input_type: type_name_of::<Out>(),
1484            output_type: type_name_of::<Out>(),
1485            resource_type: type_name_of::<Res>(),
1486            metadata: Default::default(),
1487            bus_capability: None,
1488            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1489            position: None,
1490            compensation_node_id: None,
1491            input_schema: None,
1492            output_schema: None,
1493        };
1494
1495        schematic.nodes.push(fanout_node);
1496        schematic.edges.push(Edge {
1497            from: last_node_id,
1498            to: fanout_id.clone(),
1499            kind: EdgeType::Linear,
1500            label: Some("Next".to_string()),
1501        });
1502
1503        // ── Schematic: one Atom node per parallel branch ───────────
1504        let mut branch_node_ids = Vec::with_capacity(transitions.len());
1505        for (i, trans) in transitions.iter().enumerate() {
1506            let branch_id = uuid::Uuid::new_v4().to_string();
1507            let branch_node = Node {
1508                id: branch_id.clone(),
1509                kind: NodeKind::Atom,
1510                label: trans.label(),
1511                description: trans.description(),
1512                input_type: type_name_of::<Out>(),
1513                output_type: type_name_of::<Out>(),
1514                resource_type: type_name_of::<Res>(),
1515                metadata: Default::default(),
1516                bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1517                source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1518                position: None,
1519                compensation_node_id: None,
1520                input_schema: trans.input_schema(),
1521                output_schema: None,
1522            };
1523            schematic.nodes.push(branch_node);
1524            schematic.edges.push(Edge {
1525                from: fanout_id.clone(),
1526                to: branch_id.clone(),
1527                kind: EdgeType::Parallel,
1528                label: Some(format!("Branch {}", i)),
1529            });
1530            branch_node_ids.push(branch_id);
1531        }
1532
1533        // ── Schematic: FanIn node ──────────────────────────────────
1534        let fanin_node = Node {
1535            id: fanin_id.clone(),
1536            kind: NodeKind::FanIn,
1537            label: "FanIn".to_string(),
1538            description: Some(format!("Parallel join ({:?})", strategy)),
1539            input_type: type_name_of::<Out>(),
1540            output_type: type_name_of::<Out>(),
1541            resource_type: type_name_of::<Res>(),
1542            metadata: Default::default(),
1543            bus_capability: None,
1544            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1545            position: None,
1546            compensation_node_id: None,
1547            input_schema: None,
1548            output_schema: None,
1549        };
1550
1551        schematic.nodes.push(fanin_node);
1552        for branch_id in &branch_node_ids {
1553            schematic.edges.push(Edge {
1554                from: branch_id.clone(),
1555                to: fanin_id.clone(),
1556                kind: EdgeType::Parallel,
1557                label: Some("Join".to_string()),
1558            });
1559        }
1560
1561        // ── Executor: parallel composition ─────────────────────────
1562        let fanout_node_id = fanout_id.clone();
1563        let fanin_node_id = fanin_id.clone();
1564        let branch_ids_for_exec = branch_node_ids.clone();
1565        let step_idx = schematic.nodes.len() as u64 - 1;
1566
1567        let next_executor: Executor<In, Out, E, Res> = Arc::new(
1568            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1569                let prev = prev_executor.clone();
1570                let branches = transitions.clone();
1571                let fanout_id = fanout_node_id.clone();
1572                let fanin_id = fanin_node_id.clone();
1573                let branch_ids = branch_ids_for_exec.clone();
1574
1575                Box::pin(async move {
1576                    // Run previous steps
1577                    let prev_result = prev(input, res, bus).await;
1578                    let state = match prev_result {
1579                        Outcome::Next(t) => t,
1580                        other => return other.map(|_| unreachable!()),
1581                    };
1582
1583                    // Timeline: FanOut enter
1584                    let fanout_enter_ts = now_ms();
1585                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1586                        timeline.push(TimelineEvent::NodeEnter {
1587                            node_id: fanout_id.clone(),
1588                            node_label: "FanOut".to_string(),
1589                            timestamp: fanout_enter_ts,
1590                        });
1591                    }
1592
1593                    // Build futures for all branches.
1594                    // Each branch gets its own Bus so they can run concurrently
1595                    // without &mut Bus aliasing. Resources are shared via &Res.
1596                    let futs: Vec<_> = branches
1597                        .iter()
1598                        .enumerate()
1599                        .map(|(i, trans)| {
1600                            let branch_state = state.clone();
1601                            let branch_node_id = branch_ids[i].clone();
1602                            let trans = trans.clone();
1603
1604                            async move {
1605                                let mut branch_bus = Bus::new();
1606                                let label = trans.label();
1607                                let bus_policy = trans.bus_access_policy();
1608
1609                                branch_bus.set_access_policy(label.clone(), bus_policy);
1610                                let result = trans.run(branch_state, res, &mut branch_bus).await;
1611                                branch_bus.clear_access_policy();
1612
1613                                (i, branch_node_id, label, result)
1614                            }
1615                        })
1616                        .collect();
1617
1618                    // Run all branches concurrently within the current task
1619                    let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1620                        futures_util::future::join_all(futs).await;
1621
1622                    // Timeline: record each branch's enter/exit
1623                    for (_, branch_node_id, branch_label, outcome) in &results {
1624                        if let Some(timeline) = bus.read_mut::<Timeline>() {
1625                            let ts = now_ms();
1626                            timeline.push(TimelineEvent::NodeEnter {
1627                                node_id: branch_node_id.clone(),
1628                                node_label: branch_label.clone(),
1629                                timestamp: ts,
1630                            });
1631                            timeline.push(TimelineEvent::NodeExit {
1632                                node_id: branch_node_id.clone(),
1633                                outcome_type: outcome_type_name(outcome),
1634                                duration_ms: 0,
1635                                timestamp: ts,
1636                            });
1637                        }
1638                    }
1639
1640                    // Timeline: FanOut exit
1641                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1642                        timeline.push(TimelineEvent::NodeExit {
1643                            node_id: fanout_id.clone(),
1644                            outcome_type: "Next".to_string(),
1645                            duration_ms: 0,
1646                            timestamp: now_ms(),
1647                        });
1648                    }
1649
1650                    // ── Apply strategy ─────────────────────────────
1651                    let combined = match strategy {
1652                        ParallelStrategy::AllMustSucceed => {
1653                            let mut first_fault = None;
1654                            let mut first_success = None;
1655
1656                            for (_, _, _, outcome) in results {
1657                                match outcome {
1658                                    Outcome::Fault(e) => {
1659                                        if first_fault.is_none() {
1660                                            first_fault = Some(Outcome::Fault(e));
1661                                        }
1662                                    }
1663                                    Outcome::Next(val) => {
1664                                        if first_success.is_none() {
1665                                            first_success = Some(Outcome::Next(val));
1666                                        }
1667                                    }
1668                                    other => {
1669                                        // Non-Next/non-Fault outcomes (Branch, Emit, Jump)
1670                                        // treated as non-success in AllMustSucceed
1671                                        if first_fault.is_none() {
1672                                            first_fault = Some(other);
1673                                        }
1674                                    }
1675                                }
1676                            }
1677
1678                            if let Some(fault) = first_fault {
1679                                fault
1680                            } else {
1681                                first_success.unwrap_or_else(|| {
1682                                    Outcome::emit("execution.parallel.no_results", None)
1683                                })
1684                            }
1685                        }
1686                        ParallelStrategy::AnyCanFail => {
1687                            let mut first_success = None;
1688                            let mut first_fault = None;
1689
1690                            for (_, _, _, outcome) in results {
1691                                match outcome {
1692                                    Outcome::Next(val) => {
1693                                        if first_success.is_none() {
1694                                            first_success = Some(Outcome::Next(val));
1695                                        }
1696                                    }
1697                                    Outcome::Fault(e) => {
1698                                        if first_fault.is_none() {
1699                                            first_fault = Some(Outcome::Fault(e));
1700                                        }
1701                                    }
1702                                    _ => {}
1703                                }
1704                            }
1705
1706                            first_success.unwrap_or_else(|| {
1707                                first_fault.unwrap_or_else(|| {
1708                                    Outcome::emit("execution.parallel.no_results", None)
1709                                })
1710                            })
1711                        }
1712                    };
1713
1714                    // Timeline: FanIn
1715                    let fanin_enter_ts = now_ms();
1716                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1717                        timeline.push(TimelineEvent::NodeEnter {
1718                            node_id: fanin_id.clone(),
1719                            node_label: "FanIn".to_string(),
1720                            timestamp: fanin_enter_ts,
1721                        });
1722                        timeline.push(TimelineEvent::NodeExit {
1723                            node_id: fanin_id.clone(),
1724                            outcome_type: outcome_type_name(&combined),
1725                            duration_ms: 0,
1726                            timestamp: fanin_enter_ts,
1727                        });
1728                    }
1729
1730                    // Persistence
1731                    if let Some(handle) = bus.read::<PersistenceHandle>() {
1732                        let trace_id = persistence_trace_id(bus);
1733                        let circuit = bus
1734                            .read::<ranvier_core::schematic::Schematic>()
1735                            .map(|s| s.name.clone())
1736                            .unwrap_or_default();
1737                        let version = bus
1738                            .read::<ranvier_core::schematic::Schematic>()
1739                            .map(|s| s.schema_version.clone())
1740                            .unwrap_or_default();
1741
1742                        persist_execution_event(
1743                            handle,
1744                            &trace_id,
1745                            &circuit,
1746                            &version,
1747                            step_idx,
1748                            Some(fanin_id.clone()),
1749                            outcome_kind_name(&combined),
1750                            Some(combined.to_json_value()),
1751                        )
1752                        .await;
1753                    }
1754
1755                    combined
1756                })
1757            },
1758        );
1759
1760        Axon {
1761            schematic,
1762            executor: next_executor,
1763            execution_mode,
1764            persistence_store,
1765            audit_sink,
1766            dlq_sink,
1767            dlq_policy,
1768            dynamic_dlq_policy,
1769            saga_policy,
1770            dynamic_saga_policy,
1771            saga_compensation_registry,
1772            iam_handle,
1773        }
1774    }
1775
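    /// Execute the Axon with the given input and resources.
    ///
    /// Before the compiled executor chain runs, this method may return early
    /// with an `Outcome::emit(..)` event instead of executing any steps:
    ///
    /// * `execution.skipped.lock_held` / `execution.skipped.lock_error` when
    ///   [`ExecutionMode::Singleton`] is configured and the distributed lock is
    ///   already held or could not be checked;
    /// * `iam.missing_token`, `iam.verification_failed` or `iam.policy_denied`
    ///   when an IAM handle is attached and its policy is not `IamPolicy::None`;
    ///   on success the verified identity is inserted into the `bus`;
    /// * `execution.resumption.*` events when a persisted trace is resumed under
    ///   a different schema version and payload migration or migration-strategy
    ///   resolution fails.
    ///
    /// The `bus` is also populated with the DLQ sink (if configured), the
    /// effective (dynamic-if-present) DLQ and saga policies, and a clone of the
    /// schematic. When the executor returns `Outcome::Fault` and the saga policy
    /// is enabled, registered compensation handlers are run in LIFO order.
    ///
    /// NOTE(review): the rollback check reads the static `saga_policy`, while the
    /// saga stack is initialized from the effective (possibly dynamic) policy;
    /// confirm this asymmetry is intended.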
1777    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1778        if let ExecutionMode::Singleton {
1779            lock_key,
1780            ttl_ms,
1781            lock_provider,
1782        } = &self.execution_mode
1783        {
1784            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1785            let _enter = trace_span.enter();
1786            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1787                Ok(true) => {
1788                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1789                }
1790                Ok(false) => {
1791                    tracing::debug!(
1792                        "Singleton lock {} already held, aborting execution.",
1793                        lock_key
1794                    );
1795                    // Emit a specific event indicating skip
1796                    return Outcome::emit(
1797                        "execution.skipped.lock_held",
1798                        Some(serde_json::json!({
1799                            "lock_key": lock_key
1800                        })),
1801                    );
1802                }
1803                Err(e) => {
1804                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1805                    return Outcome::emit(
1806                        "execution.skipped.lock_error",
1807                        Some(serde_json::json!({
1808                            "error": e.to_string()
1809                        })),
1810                    );
1811                }
1812            }
1813        }
1814
1815        // ── IAM Boundary Check ────────────────────────────────────
1816        if let Some(iam) = &self.iam_handle {
1817            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1818
1819            if matches!(iam.policy, IamPolicy::None) {
1820                // No verification required — skip
1821            } else {
1822                let token = bus.read::<IamToken>().map(|t| t.0.clone());
1823
1824                match token {
1825                    Some(raw_token) => {
1826                        match iam.verifier.verify(&raw_token).await {
1827                            Ok(identity) => {
1828                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
1829                                    tracing::warn!(
1830                                        policy = ?iam.policy,
1831                                        subject = %identity.subject,
1832                                        "IAM policy enforcement failed: {}",
1833                                        e
1834                                    );
1835                                    return Outcome::emit(
1836                                        "iam.policy_denied",
1837                                        Some(serde_json::json!({
1838                                            "error": e.to_string(),
1839                                            "subject": identity.subject,
1840                                        })),
1841                                    );
1842                                }
1843                                // Insert verified identity into Bus for downstream access
1844                                bus.insert(identity);
1845                            }
1846                            Err(e) => {
1847                                tracing::warn!("IAM token verification failed: {}", e);
1848                                return Outcome::emit(
1849                                    "iam.verification_failed",
1850                                    Some(serde_json::json!({
1851                                        "error": e.to_string()
1852                                    })),
1853                                );
1854                            }
1855                        }
1856                    }
1857                    None => {
1858                        tracing::warn!("IAM policy requires token but none found in Bus");
1859                        return Outcome::emit("iam.missing_token", None);
1860                    }
1861                }
1862            }
1863        }
1864
1865        let trace_id = persistence_trace_id(bus);
1866        let label = self.schematic.name.clone();
1867
1868        // Inject DLQ config into Bus for step-level reporting
1869        if let Some(sink) = &self.dlq_sink {
1870            bus.insert(sink.clone());
1871        }
1872        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1873        let effective_dlq_policy = self
1874            .dynamic_dlq_policy
1875            .as_ref()
1876            .map(|d| d.current())
1877            .unwrap_or_else(|| self.dlq_policy.clone());
1878        bus.insert(effective_dlq_policy);
1879        bus.insert(self.schematic.clone());
1880        let effective_saga_policy = self
1881            .dynamic_saga_policy
1882            .as_ref()
1883            .map(|d| d.current())
1884            .unwrap_or_else(|| self.saga_policy.clone());
1885        bus.insert(effective_saga_policy.clone());
1886
1887        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1888        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1889            bus.insert(SagaStack::new());
1890        }
1891
1892        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1893        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1894        let compensation_retry_policy = compensation_retry_policy(bus);
1895        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1896        let version = self.schematic.schema_version.clone();
1897        let migration_registry = bus
1898            .read::<ranvier_core::schematic::MigrationRegistry>()
1899            .cloned();
1900
1901        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1902            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1903                load_persistence_version(handle, &trace_id).await;
1904
1905            if let Some(interv) = intervention {
1906                tracing::info!(
1907                    trace_id = %trace_id,
1908                    target_node = %interv.target_node,
1909                    "Applying manual intervention command"
1910                );
1911
1912                // Find the step index for the target node
1913                if let Some(target_idx) = self
1914                    .schematic
1915                    .nodes
1916                    .iter()
1917                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1918                {
1919                    tracing::info!(
1920                        trace_id = %trace_id,
1921                        target_node = %interv.target_node,
1922                        target_step = target_idx,
1923                        "Intervention: Jumping to target node"
1924                    );
1925                    start_step = target_idx as u64;
1926
1927                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1928                    bus.insert(ManualJump {
1929                        target_node: interv.target_node.clone(),
1930                        payload_override: interv.payload_override.clone(),
1931                    });
1932
1933                    // Log audit event for intervention application
1934                    if let Some(sink) = self.audit_sink.as_ref() {
1935                        let event = AuditEvent::new(
1936                            uuid::Uuid::new_v4().to_string(),
1937                            "System".to_string(),
1938                            "ApplyIntervention".to_string(),
1939                            trace_id.to_string(),
1940                        )
1941                        .with_metadata("target_node", interv.target_node.clone())
1942                        .with_metadata("target_step", target_idx);
1943
1944                        let _ = sink.append(&event).await;
1945                    }
1946                } else {
1947                    tracing::warn!(
1948                        trace_id = %trace_id,
1949                        target_node = %interv.target_node,
1950                        "Intervention target node not found in schematic; ignoring jump"
1951                    );
1952                }
1953            }
1954
1955            if let Some(old_version) = trace_version
1956                && old_version != version
1957            {
1958                tracing::info!(
1959                    trace_id = %trace_id,
1960                    old_version = %old_version,
1961                    current_version = %version,
1962                    "Version mismatch detected during resumption"
1963                );
1964
1965                // Try multi-hop migration path first, fall back to direct lookup
1966                let migration_path = migration_registry
1967                    .as_ref()
1968                    .and_then(|r| r.find_migration_path(&old_version, &version));
1969
1970                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1971                    if path.is_empty() {
1972                        (None, last_payload.clone())
1973                    } else {
1974                        // Apply payload mappers along the migration chain
1975                        let mut payload = last_payload.clone();
1976                        for hop in &path {
1977                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1978                            {
1979                                match mapper.map_state(p) {
1980                                    Ok(mapped) => payload = Some(mapped),
1981                                    Err(e) => {
1982                                        tracing::error!(
1983                                            trace_id = %trace_id,
1984                                            from = %hop.from_version,
1985                                            to = %hop.to_version,
1986                                            error = %e,
1987                                            "Payload migration mapper failed"
1988                                        );
1989                                        return Outcome::emit(
1990                                            "execution.resumption.payload_migration_failed",
1991                                            Some(serde_json::json!({
1992                                                "trace_id": trace_id,
1993                                                "from": hop.from_version,
1994                                                "to": hop.to_version,
1995                                                "error": e.to_string()
1996                                            })),
1997                                        );
1998                                    }
1999                                }
2000                            }
2001                        }
2002                        let hops: Vec<String> = path
2003                            .iter()
2004                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
2005                            .collect();
2006                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
2007                        (path.last().copied(), payload)
2008                    }
2009                } else {
2010                    (None, last_payload.clone())
2011                };
2012
2013                // Use the final migration in the path to determine strategy
2014                let migration = final_migration.or_else(|| {
2015                    migration_registry
2016                        .as_ref()
2017                        .and_then(|r| r.find_migration(&old_version, &version))
2018                });
2019
2020                // Update last_payload with mapped version
2021                if mapped_payload.is_some() {
2022                    last_payload = mapped_payload;
2023                }
2024
2025                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
2026                {
2027                    m.node_mapping
2028                        .get(node_id)
2029                        .cloned()
2030                        .unwrap_or(m.default_strategy.clone())
2031                } else {
2032                    migration
2033                        .map(|m| m.default_strategy.clone())
2034                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
2035                };
2036
2037                match strategy {
2038                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
2039                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
2040                        start_step = 0;
2041                    }
2042                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
2043                        new_node_id,
2044                        ..
2045                    } => {
2046                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
2047                        if let Some(target_idx) = self
2048                            .schematic
2049                            .nodes
2050                            .iter()
2051                            .position(|n| n.id == new_node_id || n.label == new_node_id)
2052                        {
2053                            start_step = target_idx as u64;
2054                        } else {
2055                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
2056                            return Outcome::emit(
2057                                "execution.resumption.migration_target_not_found",
2058                                Some(serde_json::json!({ "node_id": new_node_id })),
2059                            );
2060                        }
2061                    }
2062                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
2063                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
2064                        if let Some(target_idx) = self
2065                            .schematic
2066                            .nodes
2067                            .iter()
2068                            .position(|n| n.id == node_id || n.label == node_id)
2069                        {
2070                            start_step = target_idx as u64;
2071                        } else {
2072                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
2073                            return Outcome::emit(
2074                                "execution.resumption.migration_target_not_found",
2075                                Some(serde_json::json!({ "node_id": node_id })),
2076                            );
2077                        }
2078                    }
2079                    ranvier_core::schematic::MigrationStrategy::Fail => {
2080                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
2081                        return Outcome::emit(
2082                            "execution.resumption.version_mismatch_failed",
2083                            Some(serde_json::json!({
2084                                "trace_id": trace_id,
2085                                "old_version": old_version,
2086                                "current_version": version
2087                            })),
2088                        );
2089                    }
2090                    _ => {
2091                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
2092                        return Outcome::emit(
2093                            "execution.resumption.unsupported_migration",
2094                            Some(serde_json::json!({
2095                                "trace_id": trace_id,
2096                                "strategy": format!("{:?}", strategy)
2097                            })),
2098                        );
2099                    }
2100                }
2101            }
2102
2103            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
2104            persist_execution_event(
2105                handle,
2106                &trace_id,
2107                &label,
2108                &version,
2109                start_step,
2110                ingress_node_id,
2111                "Enter",
2112                None,
2113            )
2114            .await;
2115
2116            bus.insert(StartStep(start_step));
2117            if start_step > 0 {
2118                bus.insert(ResumptionState {
2119                    payload: last_payload,
2120                });
2121            }
2122
2123            Some(start_step)
2124        } else {
2125            None
2126        };
2127
2128        let should_capture = should_attach_timeline(bus);
2129        let inserted_timeline = if should_capture {
2130            ensure_timeline(bus)
2131        } else {
2132            false
2133        };
2134        let ingress_started = std::time::Instant::now();
2135        let ingress_enter_ts = now_ms();
2136        if should_capture
2137            && let (Some(timeline), Some(ingress)) =
2138                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2139        {
2140            timeline.push(TimelineEvent::NodeEnter {
2141                node_id: ingress.id.clone(),
2142                node_label: ingress.label.clone(),
2143                timestamp: ingress_enter_ts,
2144            });
2145        }
2146
2147        let circuit_span = tracing::info_span!(
2148            "Circuit",
2149            ranvier.circuit = %label,
2150            ranvier.outcome_kind = tracing::field::Empty,
2151            ranvier.outcome_target = tracing::field::Empty
2152        );
2153        let outcome = (self.executor)(input, resources, bus)
2154            .instrument(circuit_span.clone())
2155            .await;
2156        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
2157        if let Some(target) = outcome_target(&outcome) {
2158            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
2159        }
2160
2161        // Automated Saga Rollback (LIFO)
2162        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
2163            while let Some(task) = {
2164                let mut stack = bus.read_mut::<SagaStack>();
2165                stack.as_mut().and_then(|s| s.pop())
2166            } {
2167                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
2168
2169                let handler = {
2170                    let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
2171                    registry.get(&task.node_id)
2172                };
2173                if let Some(handler) = handler {
2174                    let res = handler(task.input_snapshot, resources, bus).await;
2175                    if let Outcome::Fault(e) = res {
2176                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
2177                    }
2178                } else {
2179                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
2180                }
2181            }
2182            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
2183        }
2184
2185        let ingress_exit_ts = now_ms();
2186        if should_capture
2187            && let (Some(timeline), Some(ingress)) =
2188                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2189        {
2190            timeline.push(TimelineEvent::NodeExit {
2191                node_id: ingress.id.clone(),
2192                outcome_type: outcome_type_name(&outcome),
2193                duration_ms: ingress_started.elapsed().as_millis() as u64,
2194                timestamp: ingress_exit_ts,
2195            });
2196        }
2197
2198        if let Some(handle) = persistence_handle.as_ref() {
2199            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
2200            persist_execution_event(
2201                handle,
2202                &trace_id,
2203                &label,
2204                &version,
2205                fault_step,
2206                None, // Outcome-level events might not have a single node_id context here
2207                outcome_kind_name(&outcome),
2208                Some(outcome.to_json_value()),
2209            )
2210            .await;
2211
2212            let mut completion = completion_from_outcome(&outcome);
2213            if matches!(outcome, Outcome::Fault(_))
2214                && let Some(compensation) = compensation_handle.as_ref()
2215                && compensation_auto_trigger(bus)
2216            {
2217                let context = CompensationContext {
2218                    trace_id: trace_id.clone(),
2219                    circuit: label.clone(),
2220                    fault_kind: outcome_kind_name(&outcome).to_string(),
2221                    fault_step,
2222                    timestamp_ms: now_ms(),
2223                };
2224
2225                if run_compensation(
2226                    compensation,
2227                    context,
2228                    compensation_retry_policy,
2229                    compensation_idempotency.clone(),
2230                )
2231                .await
2232                {
2233                    persist_execution_event(
2234                        handle,
2235                        &trace_id,
2236                        &label,
2237                        &version,
2238                        fault_step.saturating_add(1),
2239                        None,
2240                        "Compensated",
2241                        None,
2242                    )
2243                    .await;
2244                    completion = CompletionState::Compensated;
2245                }
2246            }
2247
2248            if persistence_auto_complete(bus) {
2249                persist_completion(handle, &trace_id, completion).await;
2250            }
2251        }
2252
2253        if should_capture {
2254            maybe_export_timeline(bus, &outcome);
2255        }
2256        if inserted_timeline {
2257            let _ = bus.remove::<Timeline>();
2258        }
2259
2260        outcome
2261    }
2262
2263    /// Starts the Ranvier Inspector for this Axon on the specified port.
2264    /// This spawns a background task to serve the Schematic.
2265    pub fn serve_inspector(self, port: u16) -> Self {
2266        if !inspector_dev_mode_from_env() {
2267            tracing::info!("Inspector disabled because RANVIER_MODE is production");
2268            return self;
2269        }
2270        if !inspector_enabled_from_env() {
2271            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2272            return self;
2273        }
2274
2275        let schematic = self.schematic.clone();
2276        let axon_inspector = Arc::new(self.clone());
2277        tokio::spawn(async move {
2278            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2279                .with_projection_files_from_env()
2280                .with_mode_from_env()
2281                .with_auth_policy_from_env()
2282                .with_state_inspector(axon_inspector)
2283                .serve()
2284                .await
2285            {
2286                tracing::error!("Inspector server failed: {}", e);
2287            }
2288        });
2289        self
2290    }
2291
2292    /// Get a reference to the Schematic (structural view).
2293    pub fn schematic(&self) -> &Schematic {
2294        &self.schematic
2295    }
2296
2297    /// Consume and return the Schematic.
2298    pub fn into_schematic(self) -> Schematic {
2299        self.schematic
2300    }
2301
2302    /// Detect schematic export mode from runtime flags.
2303    ///
2304    /// Supported triggers:
2305    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
2306    /// - `--schematic`
2307    ///
2308    /// Optional output path:
2309    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
2310    /// - `--schematic-output <path>` / `--schematic-output=<path>`
2311    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
2312    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2313        schematic_export_request_from_process()
2314    }
2315
2316    /// Export schematic and return `true` when schematic mode is active.
2317    ///
2318    /// Use this once after circuit construction and before server/custom loops:
2319    ///
2320    /// ```rust,ignore
2321    /// let axon = build_axon();
2322    /// if axon.maybe_export_and_exit()? {
2323    ///     return Ok(());
2324    /// }
2325    /// // Normal runtime path...
2326    /// ```
2327    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2328        self.maybe_export_and_exit_with(|_| ())
2329    }
2330
2331    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
2332    ///
2333    /// This is useful when your app has custom loop/bootstrap behavior and you want
2334    /// to skip or cleanup that logic in schematic mode.
2335    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2336    where
2337        F: FnOnce(&SchematicExportRequest),
2338    {
2339        let Some(request) = self.schematic_export_request() else {
2340            return Ok(false);
2341        };
2342        on_before_exit(&request);
2343        self.export_schematic(&request)?;
2344        Ok(true)
2345    }
2346
2347    /// Export schematic according to the provided request.
2348    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2349        let json = serde_json::to_string_pretty(self.schematic())?;
2350        if let Some(path) = &request.output {
2351            if let Some(parent) = path.parent()
2352                && !parent.as_os_str().is_empty()
2353            {
2354                fs::create_dir_all(parent)?;
2355            }
2356            fs::write(path, json.as_bytes())?;
2357            return Ok(());
2358        }
2359        println!("{}", json);
2360        Ok(())
2361    }
2362}
2363
2364fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2365    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2366    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2367    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2368
2369    let mut i = 0;
2370    while i < args.len() {
2371        let arg = args[i].to_string_lossy();
2372
2373        if arg == "--schematic" {
2374            enabled = true;
2375            i += 1;
2376            continue;
2377        }
2378
2379        if arg == "--schematic-output" || arg == "--output" {
2380            if let Some(next) = args.get(i + 1) {
2381                output = Some(PathBuf::from(next));
2382                i += 2;
2383                continue;
2384            }
2385        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2386            output = Some(PathBuf::from(value));
2387        } else if let Some(value) = arg.strip_prefix("--output=") {
2388            output = Some(PathBuf::from(value));
2389        }
2390
2391        i += 1;
2392    }
2393
2394    if enabled {
2395        Some(SchematicExportRequest { output })
2396    } else {
2397        None
2398    }
2399}
2400
/// Report whether the environment variable `key` holds a truthy value.
///
/// Truthy means `1`, `true`, `on` or `yes`, compared case-insensitively with no trimming.
/// A missing variable, a non-Unicode value, or any other text yields `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key)
        .map(|raw| {
            let lowered = raw.to_ascii_lowercase();
            matches!(lowered.as_str(), "1" | "true" | "on" | "yes")
        })
        .unwrap_or(false)
}
2407
2408fn inspector_enabled_from_env() -> bool {
2409    let raw = std::env::var("RANVIER_INSPECTOR").ok();
2410    inspector_enabled_from_value(raw.as_deref())
2411}
2412
/// Interpret a raw `RANVIER_INSPECTOR` value.
///
/// An absent value enables the inspector. A present value enables it only when it is
/// `1`, `true`, `on` or `yes` (case-insensitive, no trimming); anything else, including
/// an empty string, disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes")
}
2419
2420fn inspector_dev_mode_from_env() -> bool {
2421    let raw = std::env::var("RANVIER_MODE").ok();
2422    inspector_dev_mode_from_value(raw.as_deref())
2423}
2424
/// Interpret a raw `RANVIER_MODE` value: everything except `prod`/`production`
/// (case-insensitive, no trimming) counts as dev mode, including an absent value.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    let is_production = value.is_some_and(|mode| {
        mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production")
    });
    !is_production
}
2431
2432fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2433    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2434        Ok(v) if !v.trim().is_empty() => v,
2435        _ => return,
2436    };
2437
2438    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2439    let policy = timeline_adaptive_policy();
2440    let forced = should_force_export(outcome, &policy);
2441    let should_export = sampled || forced;
2442    if !should_export {
2443        record_sampling_stats(false, sampled, forced, "none", &policy);
2444        return;
2445    }
2446
2447    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2448    timeline.sort();
2449
2450    let mode = std::env::var("RANVIER_TIMELINE_MODE")
2451        .unwrap_or_else(|_| "overwrite".to_string())
2452        .to_ascii_lowercase();
2453
2454    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2455        tracing::warn!(
2456            "Failed to persist timeline file {} (mode={}): {}",
2457            path,
2458            mode,
2459            err
2460        );
2461        record_sampling_stats(false, sampled, forced, &mode, &policy);
2462    } else {
2463        record_sampling_stats(true, sampled, forced, &mode, &policy);
2464    }
2465}
2466
2467fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2468    match outcome {
2469        Outcome::Next(_) => "Next".to_string(),
2470        Outcome::Branch(id, _) => format!("Branch:{}", id),
2471        Outcome::Jump(id, _) => format!("Jump:{}", id),
2472        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2473        Outcome::Fault(_) => "Fault".to_string(),
2474    }
2475}
2476
2477fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2478    match outcome {
2479        Outcome::Next(_) => "Next",
2480        Outcome::Branch(_, _) => "Branch",
2481        Outcome::Jump(_, _) => "Jump",
2482        Outcome::Emit(_, _) => "Emit",
2483        Outcome::Fault(_) => "Fault",
2484    }
2485}
2486
2487fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2488    match outcome {
2489        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2490        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2491        Outcome::Emit(event_type, _) => Some(event_type.clone()),
2492        Outcome::Next(_) | Outcome::Fault(_) => None,
2493    }
2494}
2495
2496fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2497    match outcome {
2498        Outcome::Fault(_) => CompletionState::Fault,
2499        _ => CompletionState::Success,
2500    }
2501}
2502
2503fn persistence_trace_id(bus: &Bus) -> String {
2504    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2505        explicit.0.clone()
2506    } else {
2507        format!("{}:{}", bus.id, now_ms())
2508    }
2509}
2510
2511fn persistence_auto_complete(bus: &Bus) -> bool {
2512    bus.read::<PersistenceAutoComplete>()
2513        .map(|v| v.0)
2514        .unwrap_or(true)
2515}
2516
2517fn compensation_auto_trigger(bus: &Bus) -> bool {
2518    bus.read::<CompensationAutoTrigger>()
2519        .map(|v| v.0)
2520        .unwrap_or(true)
2521}
2522
2523fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2524    bus.read::<CompensationRetryPolicy>()
2525        .copied()
2526        .unwrap_or_default()
2527}
2528
2529/// Unwrap the Outcome enum layer from a persisted event payload.
2530///
2531/// Events are stored with `outcome.to_json_value()` which serializes the full
2532/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
2533/// handler expects the raw inner value, so we extract it here.
2534fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2535    payload.map(|p| {
2536        p.get("Next")
2537            .or_else(|| p.get("Branch"))
2538            .or_else(|| p.get("Jump"))
2539            .cloned()
2540            .unwrap_or_else(|| p.clone())
2541    })
2542}
2543
2544async fn load_persistence_version(
2545    handle: &PersistenceHandle,
2546    trace_id: &str,
2547) -> (
2548    u64,
2549    Option<String>,
2550    Option<crate::persistence::Intervention>,
2551    Option<String>,
2552    Option<serde_json::Value>,
2553) {
2554    let store = handle.store();
2555    match store.load(trace_id).await {
2556        Ok(Some(trace)) => {
2557            let (next_step, last_node_id, last_payload) =
2558                if let Some(resume_from_step) = trace.resumed_from_step {
2559                    let anchor_event = trace
2560                        .events
2561                        .iter()
2562                        .rev()
2563                        .find(|event| {
2564                            event.step <= resume_from_step
2565                                && event.outcome_kind == "Next"
2566                                && event.payload.is_some()
2567                        })
2568                        .or_else(|| {
2569                            trace.events.iter().rev().find(|event| {
2570                                event.step <= resume_from_step
2571                                    && event.outcome_kind != "Fault"
2572                                    && event.payload.is_some()
2573                            })
2574                        })
2575                        .or_else(|| {
2576                            trace.events.iter().rev().find(|event| {
2577                                event.step <= resume_from_step && event.payload.is_some()
2578                            })
2579                        })
2580                        .or_else(|| trace.events.last());
2581
2582                    (
2583                        resume_from_step.saturating_add(1),
2584                        anchor_event.and_then(|event| event.node_id.clone()),
2585                        anchor_event.and_then(|event| {
2586                            unwrap_outcome_payload(event.payload.as_ref())
2587                        }),
2588                    )
2589                } else {
2590                    let last_event = trace.events.last();
2591                    (
2592                        last_event
2593                            .map(|event| event.step.saturating_add(1))
2594                            .unwrap_or(0),
2595                        last_event.and_then(|event| event.node_id.clone()),
2596                        last_event.and_then(|event| {
2597                            unwrap_outcome_payload(event.payload.as_ref())
2598                        }),
2599                    )
2600                };
2601
2602            (
2603                next_step,
2604                Some(trace.schematic_version),
2605                trace.interventions.last().cloned(),
2606                last_node_id,
2607                last_payload,
2608            )
2609        }
2610        Ok(None) => (0, None, None, None, None),
2611        Err(err) => {
2612            tracing::warn!(
2613                trace_id = %trace_id,
2614                error = %err,
2615                "Failed to load persistence trace; falling back to step=0"
2616            );
2617            (0, None, None, None, None)
2618        }
2619    }
2620}
2621
/// Execute one (non-compensated) transition as a single step of a circuit.
///
/// Besides running `trans`, this wraps the call with the cross-cutting behaviour driven
/// by resources found on `bus`:
/// - **Debugging**: if a `DebugControl` says to pause at `node_id`, a `NodePaused` timeline
///   event is pushed and the step awaits `DebugControl::wait()` before running.
/// - **Timeline**: when a `Timeline` is present, `NodeEnter`/`NodeExit` are pushed. So are
///   `Branchtaken` for branch outcomes, `NodeRetry` per retry, and `DlqExhausted`.
/// - **Retries**: under `DlqPolicy::RetryThenDlq { max_attempts, backoff_ms }`, a faulted
///   first attempt is retried, up to `max_attempts` attempts in total.
///   - Retry `n` (n >= 2) first sleeps `backoff_ms * 2^(n-2)` ms (saturating).
///   - It then re-runs the transition on a state deserialized from a JSON snapshot of the
///     original input.
///   - The loop stops at the first non-`Fault` outcome.
/// - **Saga**: when `SagaPolicy::Enabled` is on the bus, the input is serialized before the
///   run and pushed onto the `SagaStack` if the final outcome is `Next`.
/// - **Fault context**: on a final fault, a `TransitionErrorContext` is inserted into the
///   bus and an `error` event is logged.
/// - **Persistence**: with a `PersistenceHandle`, the final outcome is persisted as the
///   event for `step_idx`.
/// - **DLQ**: on a final fault, with both a `DlqSink` and a non-`Drop` `DlqPolicy` present,
///   the fault is stored as a dead letter. Errors returned by the sink are ignored.
///
/// The bus access policy for this transition's label is installed around every
/// transition run and cleared afterwards.
///
/// Returns the outcome of the last attempt made.
#[allow(clippy::too_many_arguments)]
async fn run_this_step<In, Out, E, Res>(
    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
    state: In,
    res: &Res,
    bus: &mut Bus,
    node_id: &str,
    node_label: &str,
    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
    step_idx: u64,
) -> Outcome<Out, E>
where
    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
    Res: ranvier_core::transition::ResourceRequirement,
{
    let label = trans.label();
    // Short resource type name for the span: the last `::` segment of `type_name`.
    // NOTE(review): for generic resource types this segment can be a fragment such as
    // `Foo>`, since the split does not understand generic brackets.
    let res_type = std::any::type_name::<Res>()
        .split("::")
        .last()
        .unwrap_or("unknown");

    // Debug pausing
    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
        debug.should_pause(node_id)
    } else {
        false
    };

    if should_pause {
        let trace_id = persistence_trace_id(bus);
        tracing::event!(
            target: "ranvier.debugger",
            tracing::Level::INFO,
            trace_id = %trace_id,
            node_id = %node_id,
            "Node paused"
        );

        if let Some(timeline) = bus.read_mut::<Timeline>() {
            timeline.push(TimelineEvent::NodePaused {
                node_id: node_id.to_string(),
                timestamp: now_ms(),
            });
        }
        // Re-read DebugControl here: the mutable Timeline borrow above ended the earlier
        // shared borrow of the bus.
        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
            debug.wait().await;
        }
    }

    let enter_ts = now_ms();
    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeEnter {
            node_id: node_id.to_string(),
            node_label: node_label.to_string(),
            timestamp: enter_ts,
        });
    }

    // Check DLQ retry policy and pre-serialize state for potential retries
    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
        if let DlqPolicy::RetryThenDlq {
            max_attempts,
            backoff_ms,
        } = p
        {
            Some((*max_attempts, *backoff_ms))
        } else {
            None
        }
    });
    // `state` is consumed by the first run, so retries need a serialized copy. If
    // serialization fails the snapshot is `None` and no retries happen at all.
    let retry_state_snapshot = if dlq_retry_config.is_some() {
        serde_json::to_value(&state).ok()
    } else {
        None
    };

    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
    // NOTE(review): a serialization failure yields an empty byte vector
    // (`unwrap_or_default`), which is still pushed to the SagaStack on `Next`.
    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
        Some(serde_json::to_vec(&state).unwrap_or_default())
    } else {
        None
    };

    let node_span = tracing::info_span!(
        "Node",
        ranvier.node = %label,
        ranvier.resource_type = %res_type,
        ranvier.outcome_kind = tracing::field::Empty,
        ranvier.outcome_target = tracing::field::Empty
    );
    // Measured across the first attempt and any retries, including backoff sleeps.
    let started = std::time::Instant::now();
    bus.set_access_policy(label.clone(), bus_policy.clone());
    let result = trans
        .run(state, res, bus)
        .instrument(node_span.clone())
        .await;
    bus.clear_access_policy();

    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
    // retry with exponential backoff before giving up.
    let result = if let Outcome::Fault(_) = &result {
        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
            (dlq_retry_config, &retry_state_snapshot)
        {
            let mut final_result = result;
            // attempt 1 already done; retry from 2..=max_attempts
            for attempt in 2..=max_attempts {
                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));

                tracing::info!(
                    ranvier.node = %label,
                    attempt = attempt,
                    max_attempts = max_attempts,
                    backoff_ms = delay,
                    "Retrying faulted node"
                );

                if let Some(timeline) = bus.read_mut::<Timeline>() {
                    timeline.push(TimelineEvent::NodeRetry {
                        node_id: node_id.to_string(),
                        attempt,
                        max_attempts,
                        backoff_ms: delay,
                        timestamp: now_ms(),
                    });
                }

                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                // NOTE(review): if the snapshot cannot be deserialized back into `In`, this
                // attempt is silently skipped (after its log, timeline event and sleep), and
                // `final_result` keeps the previous fault.
                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
                    bus.set_access_policy(label.clone(), bus_policy.clone());
                    let retry_result = trans
                        .run(retry_state, res, bus)
                        .instrument(tracing::info_span!(
                            "NodeRetry",
                            ranvier.node = %label,
                            attempt = attempt
                        ))
                        .await;
                    bus.clear_access_policy();

                    // Keep the latest outcome either way; stop retrying once it is not a fault.
                    match &retry_result {
                        Outcome::Fault(_) => {
                            final_result = retry_result;
                        }
                        _ => {
                            final_result = retry_result;
                            break;
                        }
                    }
                }
            }
            final_result
        } else {
            result
        }
    } else {
        result
    };

    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
    if let Some(target) = outcome_target(&result) {
        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
    }

    // Inject TransitionErrorContext on fault
    // (the pipeline name comes from a Schematic on the bus, or is empty if none is present).
    if let Outcome::Fault(ref err) = result {
        let pipeline_name = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let ctx = ranvier_core::error::TransitionErrorContext {
            pipeline_name: pipeline_name.clone(),
            transition_name: label.clone(),
            step_index: step_idx as usize,
        };
        tracing::error!(
            pipeline = %pipeline_name,
            transition = %label,
            step = step_idx,
            error = ?err,
            "Transition fault"
        );
        bus.insert(ctx);
    }

    let duration_ms = started.elapsed().as_millis() as u64;
    let exit_ts = now_ms();

    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeExit {
            node_id: node_id.to_string(),
            outcome_type: outcome_type_name(&result),
            duration_ms,
            timestamp: exit_ts,
        });

        if let Outcome::Branch(branch_id, _) = &result {
            timeline.push(TimelineEvent::Branchtaken {
                branch_id: branch_id.clone(),
                timestamp: exit_ts,
            });
        }
    }

    // Push to Saga Stack if Next outcome and snapshot taken
    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
        && let Some(stack) = bus.read_mut::<SagaStack>()
    {
        stack.push(node_id.to_string(), label.clone(), snapshot);
    }

    // Persist the final outcome as this step's event. Circuit name and schema version come
    // from a Schematic on the bus; both fall back to empty strings when it is absent.
    if let Some(handle) = bus.read::<PersistenceHandle>() {
        let trace_id = persistence_trace_id(bus);
        let circuit = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let version = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.schema_version.clone())
            .unwrap_or_default();

        persist_execution_event(
            handle,
            &trace_id,
            &circuit,
            &version,
            step_idx,
            Some(node_id.to_string()),
            outcome_kind_name(&result),
            Some(result.to_json_value()),
        )
        .await;
    }

    // DLQ reporting: runs only on the final outcome, i.e. after any retries have finished.
    // It requires both a DlqSink and a DlqPolicy on the bus, and is skipped for
    // `DlqPolicy::Drop`.
    if let Outcome::Fault(f) = &result {
        // Read policy and sink, then drop the borrows before mutable timeline access
        let dlq_action = {
            let policy = bus.read::<DlqPolicy>();
            let sink = bus.read::<Arc<dyn DlqSink>>();
            match (sink, policy) {
                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
                _ => None,
            }
        };

        if let Some(sink) = dlq_action {
            // NOTE(review): `total_attempts` reports the configured `max_attempts`, which
            // over-counts when retries were skipped because the retry snapshot was missing
            // or failed to deserialize.
            if let Some((max_attempts, _)) = dlq_retry_config
                && let Some(timeline) = bus.read_mut::<Timeline>()
            {
                timeline.push(TimelineEvent::DlqExhausted {
                    node_id: node_id.to_string(),
                    total_attempts: max_attempts,
                    timestamp: now_ms(),
                });
            }

            let trace_id = persistence_trace_id(bus);
            let circuit = bus
                .read::<ranvier_core::schematic::Schematic>()
                .map(|s| s.name.clone())
                .unwrap_or_default();
            // The sink's result is deliberately discarded (best-effort dead-lettering).
            let _ = sink
                .store_dead_letter(
                    &trace_id,
                    &circuit,
                    node_id,
                    &format!("{:?}", f),
                    &serde_json::to_vec(&f).unwrap_or_default(),
                )
                .await;
        }
    }

    result
}
2903
2904#[allow(clippy::too_many_arguments)]
2905async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2906    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2907    comp: &Comp,
2908    state: Out,
2909    res: &Res,
2910    bus: &mut Bus,
2911    node_id: &str,
2912    comp_node_id: &str,
2913    node_label: &str,
2914    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2915    step_idx: u64,
2916) -> Outcome<Next, E>
2917where
2918    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2919    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2920    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2921    Res: ranvier_core::transition::ResourceRequirement,
2922    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2923{
2924    let label = trans.label();
2925
2926    // Debug pausing
2927    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2928        debug.should_pause(node_id)
2929    } else {
2930        false
2931    };
2932
2933    if should_pause {
2934        let trace_id = persistence_trace_id(bus);
2935        tracing::event!(
2936            target: "ranvier.debugger",
2937            tracing::Level::INFO,
2938            trace_id = %trace_id,
2939            node_id = %node_id,
2940            "Node paused (compensated)"
2941        );
2942
2943        if let Some(timeline) = bus.read_mut::<Timeline>() {
2944            timeline.push(TimelineEvent::NodePaused {
2945                node_id: node_id.to_string(),
2946                timestamp: now_ms(),
2947            });
2948        }
2949        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2950            debug.wait().await;
2951        }
2952    }
2953
2954    let enter_ts = now_ms();
2955    if let Some(timeline) = bus.read_mut::<Timeline>() {
2956        timeline.push(TimelineEvent::NodeEnter {
2957            node_id: node_id.to_string(),
2958            node_label: node_label.to_string(),
2959            timestamp: enter_ts,
2960        });
2961    }
2962
2963    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2964    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2965        Some(serde_json::to_vec(&state).unwrap_or_default())
2966    } else {
2967        None
2968    };
2969
2970    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2971    bus.set_access_policy(label.clone(), bus_policy.clone());
2972    let result = trans
2973        .run(state.clone(), res, bus)
2974        .instrument(node_span)
2975        .await;
2976    bus.clear_access_policy();
2977
2978    let duration_ms = 0; // Simplified
2979    let exit_ts = now_ms();
2980
2981    if let Some(timeline) = bus.read_mut::<Timeline>() {
2982        timeline.push(TimelineEvent::NodeExit {
2983            node_id: node_id.to_string(),
2984            outcome_type: outcome_type_name(&result),
2985            duration_ms,
2986            timestamp: exit_ts,
2987        });
2988    }
2989
2990    // Automated Compensation Trigger
2991    if let Outcome::Fault(ref err) = result {
2992        if compensation_auto_trigger(bus) {
2993            tracing::info!(
2994                ranvier.node = %label,
2995                ranvier.compensation.trigger = "saga",
2996                error = ?err,
2997                "Saga compensation triggered"
2998            );
2999
3000            if let Some(timeline) = bus.read_mut::<Timeline>() {
3001                timeline.push(TimelineEvent::NodeEnter {
3002                    node_id: comp_node_id.to_string(),
3003                    node_label: format!("Compensate: {}", comp.label()),
3004                    timestamp: exit_ts,
3005                });
3006            }
3007
3008            // Run compensation
3009            let _ = comp.run(state, res, bus).await;
3010
3011            if let Some(timeline) = bus.read_mut::<Timeline>() {
3012                timeline.push(TimelineEvent::NodeExit {
3013                    node_id: comp_node_id.to_string(),
3014                    outcome_type: "Compensated".to_string(),
3015                    duration_ms: 0,
3016                    timestamp: now_ms(),
3017                });
3018            }
3019
3020            if let Some(handle) = bus.read::<PersistenceHandle>() {
3021                let trace_id = persistence_trace_id(bus);
3022                let circuit = bus
3023                    .read::<ranvier_core::schematic::Schematic>()
3024                    .map(|s| s.name.clone())
3025                    .unwrap_or_default();
3026                let version = bus
3027                    .read::<ranvier_core::schematic::Schematic>()
3028                    .map(|s| s.schema_version.clone())
3029                    .unwrap_or_default();
3030
3031                persist_execution_event(
3032                    handle,
3033                    &trace_id,
3034                    &circuit,
3035                    &version,
3036                    step_idx + 1, // Compensation node index
3037                    Some(comp_node_id.to_string()),
3038                    "Compensated",
3039                    None,
3040                )
3041                .await;
3042            }
3043        }
3044    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
3045        // Push to Saga Stack if Next outcome and snapshot taken
3046        if let Some(stack) = bus.read_mut::<SagaStack>() {
3047            stack.push(node_id.to_string(), label.clone(), snapshot);
3048        }
3049
3050        if let Some(handle) = bus.read::<PersistenceHandle>() {
3051            let trace_id = persistence_trace_id(bus);
3052            let circuit = bus
3053                .read::<ranvier_core::schematic::Schematic>()
3054                .map(|s| s.name.clone())
3055                .unwrap_or_default();
3056            let version = bus
3057                .read::<ranvier_core::schematic::Schematic>()
3058                .map(|s| s.schema_version.clone())
3059                .unwrap_or_default();
3060
3061            persist_execution_event(
3062                handle,
3063                &trace_id,
3064                &circuit,
3065                &version,
3066                step_idx,
3067                Some(node_id.to_string()),
3068                outcome_kind_name(&result),
3069                Some(result.to_json_value()),
3070            )
3071            .await;
3072        }
3073    }
3074
3075    // DLQ reporting for compensated steps
3076    if let Outcome::Fault(f) = &result
3077        && let (Some(sink), Some(policy)) =
3078            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
3079    {
3080        let should_dlq = !matches!(policy, DlqPolicy::Drop);
3081        if should_dlq {
3082            let trace_id = persistence_trace_id(bus);
3083            let circuit = bus
3084                .read::<ranvier_core::schematic::Schematic>()
3085                .map(|s| s.name.clone())
3086                .unwrap_or_default();
3087            let _ = sink
3088                .store_dead_letter(
3089                    &trace_id,
3090                    &circuit,
3091                    node_id,
3092                    &format!("{:?}", f),
3093                    &serde_json::to_vec(&f).unwrap_or_default(),
3094                )
3095                .await;
3096        }
3097    }
3098
3099    result
3100}
3101
3102#[allow(clippy::too_many_arguments)]
3103pub async fn persist_execution_event(
3104    handle: &PersistenceHandle,
3105    trace_id: &str,
3106    circuit: &str,
3107    schematic_version: &str,
3108    step: u64,
3109    node_id: Option<String>,
3110    outcome_kind: &str,
3111    payload: Option<serde_json::Value>,
3112) {
3113    let store = handle.store();
3114    let envelope = PersistenceEnvelope {
3115        trace_id: trace_id.to_string(),
3116        circuit: circuit.to_string(),
3117        schematic_version: schematic_version.to_string(),
3118        step,
3119        node_id,
3120        outcome_kind: outcome_kind.to_string(),
3121        timestamp_ms: now_ms(),
3122        payload_hash: None,
3123        payload,
3124    };
3125
3126    if let Err(err) = store.append(envelope).await {
3127        tracing::warn!(
3128            trace_id = %trace_id,
3129            circuit = %circuit,
3130            step,
3131            outcome_kind = %outcome_kind,
3132            error = %err,
3133            "Failed to append persistence envelope"
3134        );
3135    }
3136}
3137
3138async fn persist_completion(
3139    handle: &PersistenceHandle,
3140    trace_id: &str,
3141    completion: CompletionState,
3142) {
3143    let store = handle.store();
3144    if let Err(err) = store.complete(trace_id, completion).await {
3145        tracing::warn!(
3146            trace_id = %trace_id,
3147            error = %err,
3148            "Failed to complete persistence trace"
3149        );
3150    }
3151}
3152
3153fn compensation_idempotency_key(context: &CompensationContext) -> String {
3154    format!(
3155        "{}:{}:{}",
3156        context.trace_id, context.circuit, context.fault_kind
3157    )
3158}
3159
3160async fn run_compensation(
3161    handle: &CompensationHandle,
3162    context: CompensationContext,
3163    retry_policy: CompensationRetryPolicy,
3164    idempotency: Option<CompensationIdempotencyHandle>,
3165) -> bool {
3166    let hook = handle.hook();
3167    let key = compensation_idempotency_key(&context);
3168
3169    if let Some(store_handle) = idempotency.as_ref() {
3170        let store = store_handle.store();
3171        match store.was_compensated(&key).await {
3172            Ok(true) => {
3173                tracing::info!(
3174                    trace_id = %context.trace_id,
3175                    circuit = %context.circuit,
3176                    key = %key,
3177                    "Compensation already recorded; skipping duplicate hook execution"
3178                );
3179                return true;
3180            }
3181            Ok(false) => {}
3182            Err(err) => {
3183                tracing::warn!(
3184                    trace_id = %context.trace_id,
3185                    key = %key,
3186                    error = %err,
3187                    "Failed to check compensation idempotency state"
3188                );
3189            }
3190        }
3191    }
3192
3193    let max_attempts = retry_policy.max_attempts.max(1);
3194    for attempt in 1..=max_attempts {
3195        match hook.compensate(context.clone()).await {
3196            Ok(()) => {
3197                if let Some(store_handle) = idempotency.as_ref() {
3198                    let store = store_handle.store();
3199                    if let Err(err) = store.mark_compensated(&key).await {
3200                        tracing::warn!(
3201                            trace_id = %context.trace_id,
3202                            key = %key,
3203                            error = %err,
3204                            "Failed to mark compensation idempotency state"
3205                        );
3206                    }
3207                }
3208                return true;
3209            }
3210            Err(err) => {
3211                let is_last = attempt == max_attempts;
3212                tracing::warn!(
3213                    trace_id = %context.trace_id,
3214                    circuit = %context.circuit,
3215                    fault_kind = %context.fault_kind,
3216                    fault_step = context.fault_step,
3217                    attempt,
3218                    max_attempts,
3219                    error = %err,
3220                    "Compensation hook attempt failed"
3221                );
3222                if !is_last && retry_policy.backoff_ms > 0 {
3223                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
3224                        .await;
3225                }
3226            }
3227        }
3228    }
3229    false
3230}
3231
3232fn ensure_timeline(bus: &mut Bus) -> bool {
3233    if bus.has::<Timeline>() {
3234        false
3235    } else {
3236        bus.insert(Timeline::new());
3237        true
3238    }
3239}
3240
3241fn should_attach_timeline(bus: &Bus) -> bool {
3242    // Respect explicitly provided timeline collector from caller.
3243    if bus.has::<Timeline>() {
3244        return true;
3245    }
3246
3247    // Attach timeline when runtime export path exists.
3248    has_timeline_output_path()
3249}
3250
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
///
/// Unset, non-Unicode, empty and whitespace-only values all count as "no path".
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(configured) => !configured.trim().is_empty(),
        Err(_) => false,
    }
}
3257
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is parsed as an `f64` (surrounding whitespace ignored) and clamped to
/// `[0.0, 1.0]`. A missing, unparsable or `NaN` value falls back to `1.0`
/// (sample every execution).
///
/// `NaN` has to be rejected explicitly. `"NaN"` is a valid `f64` literal, and
/// `f64::clamp` returns `NaN` unchanged. A `NaN` rate would then make
/// `bucket < rate` false for every bus id in `sampled_by_bus_id`, which silently
/// turns sampling off.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|v| !v.is_nan())
        .map(|v| v.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
3265
3266fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3267    if rate <= 0.0 {
3268        return false;
3269    }
3270    if rate >= 1.0 {
3271        return true;
3272    }
3273    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3274    bucket < rate
3275}
3276
/// Reads the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is trimmed and ASCII-lowercased, so `" Fault_Only "` selects the same
/// policy as `"fault_only"`. Trimming matches how `has_timeline_output_path` treats
/// its variable. Previously, stray whitespace silently fell through to the default
/// arm of `should_force_export`. Unset or non-Unicode values default to
/// `"fault_branch"`. Unrecognised values are returned unchanged, and
/// `should_force_export` treats them like `"fault_branch"`.
fn timeline_adaptive_policy() -> String {
    std::env::var("RANVIER_TIMELINE_ADAPTIVE")
        .map(|v| v.trim().to_ascii_lowercase())
        .unwrap_or_else(|_| "fault_branch".to_string())
}
3282
3283fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3284    match policy {
3285        "off" => false,
3286        "fault_only" => matches!(outcome, Outcome::Fault(_)),
3287        "fault_branch_emit" => {
3288            matches!(
3289                outcome,
3290                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3291            )
3292        }
3293        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3294    }
3295}
3296
/// Process-wide counters describing timeline export decisions.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not export.
    skipped: u64,
    /// Exports recorded with the `sampled` flag set.
    sampled_exports: u64,
    /// Exports recorded with the `forced` flag set.
    forced_exports: u64,
    /// Export mode passed with the most recent decision.
    last_mode: String,
    /// Adaptive policy passed with the most recent decision.
    last_policy: String,
    /// Milliseconds since the Unix epoch of the most recent update.
    last_updated_ms: u64,
}

/// Lazily initialised global holder for [`SamplingStats`].
static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();

/// Returns the global sampling-stats cell, creating it with all counters at zero on first use.
fn stats_cell() -> &'static Mutex<SamplingStats> {
    TIMELINE_SAMPLING_STATS.get_or_init(Mutex::default)
}
3314
3315fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3316    let snapshot = {
3317        let mut stats = match stats_cell().lock() {
3318            Ok(guard) => guard,
3319            Err(_) => return,
3320        };
3321
3322        stats.total_decisions += 1;
3323        if exported {
3324            stats.exported += 1;
3325        } else {
3326            stats.skipped += 1;
3327        }
3328        if sampled && exported {
3329            stats.sampled_exports += 1;
3330        }
3331        if forced && exported {
3332            stats.forced_exports += 1;
3333        }
3334        stats.last_mode = mode.to_string();
3335        stats.last_policy = policy.to_string();
3336        stats.last_updated_ms = now_ms();
3337        stats.clone()
3338    };
3339
3340    tracing::debug!(
3341        ranvier.timeline.total_decisions = snapshot.total_decisions,
3342        ranvier.timeline.exported = snapshot.exported,
3343        ranvier.timeline.skipped = snapshot.skipped,
3344        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3345        ranvier.timeline.forced_exports = snapshot.forced_exports,
3346        ranvier.timeline.mode = %snapshot.last_mode,
3347        ranvier.timeline.policy = %snapshot.last_policy,
3348        "Timeline sampling stats updated"
3349    );
3350
3351    if let Some(path) = timeline_stats_output_path() {
3352        let payload = serde_json::json!({
3353            "total_decisions": snapshot.total_decisions,
3354            "exported": snapshot.exported,
3355            "skipped": snapshot.skipped,
3356            "sampled_exports": snapshot.sampled_exports,
3357            "forced_exports": snapshot.forced_exports,
3358            "last_mode": snapshot.last_mode,
3359            "last_policy": snapshot.last_policy,
3360            "last_updated_ms": snapshot.last_updated_ms
3361        });
3362        if let Some(parent) = Path::new(&path).parent() {
3363            let _ = fs::create_dir_all(parent);
3364        }
3365        if let Err(err) = fs::write(&path, payload.to_string()) {
3366            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3367        }
3368    }
3369}
3370
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set and not blank.
///
/// The value is returned exactly as configured (not trimmed). Unset, non-Unicode,
/// empty and whitespace-only values yield `None`.
fn timeline_stats_output_path() -> Option<String> {
    let configured = std::env::var("RANVIER_TIMELINE_STATS_OUTPUT").ok()?;
    if configured.trim().is_empty() {
        None
    } else {
        Some(configured)
    }
}
3376
3377fn write_timeline_with_policy(
3378    path: &str,
3379    mode: &str,
3380    mut timeline: Timeline,
3381) -> Result<(), String> {
3382    match mode {
3383        "append" => {
3384            if Path::new(path).exists() {
3385                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3386                match serde_json::from_str::<Timeline>(&content) {
3387                    Ok(mut existing) => {
3388                        existing.events.append(&mut timeline.events);
3389                        existing.sort();
3390                        if let Some(max_events) = max_events_limit() {
3391                            truncate_timeline_events(&mut existing, max_events);
3392                        }
3393                        write_timeline_json(path, &existing)
3394                    }
3395                    Err(_) => {
3396                        // Fallback: if existing is invalid, replace with current timeline
3397                        if let Some(max_events) = max_events_limit() {
3398                            truncate_timeline_events(&mut timeline, max_events);
3399                        }
3400                        write_timeline_json(path, &timeline)
3401                    }
3402                }
3403            } else {
3404                if let Some(max_events) = max_events_limit() {
3405                    truncate_timeline_events(&mut timeline, max_events);
3406                }
3407                write_timeline_json(path, &timeline)
3408            }
3409        }
3410        "rotate" => {
3411            let rotated_path = rotated_path(path, now_ms());
3412            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3413            if let Some(keep) = rotate_keep_limit() {
3414                cleanup_rotated_files(path, keep)?;
3415            }
3416            Ok(())
3417        }
3418        _ => write_timeline_json(path, &timeline),
3419    }
3420}
3421
3422fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3423    if let Some(parent) = Path::new(path).parent()
3424        && !parent.as_os_str().is_empty()
3425    {
3426        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3427    }
3428    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3429    fs::write(path, json).map_err(|e| e.to_string())
3430}
3431
/// Builds the rotated sibling of `path` as `{stem}_{suffix}.{ext}` in the same directory.
///
/// A missing or non-UTF-8 stem falls back to `"timeline"`. A missing or non-UTF-8
/// extension falls back to `"json"`.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    let file_name = format!("{stem}_{suffix}.{ext}");
    original
        .parent()
        .unwrap_or_else(|| Path::new(""))
        .join(file_name)
}
3439
/// Reads `RANVIER_TIMELINE_MAX_EVENTS` as a positive event cap.
///
/// Unset, unparsable and zero values all yield `None` (no cap).
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(limit) => Some(limit),
    }
}
3446
/// Reads `RANVIER_TIMELINE_ROTATE_KEEP`: how many rotated timeline files to retain.
///
/// Unset, unparsable and zero values all yield `None` (no pruning).
fn rotate_keep_limit() -> Option<usize> {
    let keep: usize = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
        .ok()?
        .parse()
        .ok()?;
    (keep > 0).then_some(keep)
}
3453
3454fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3455    let len = timeline.events.len();
3456    if len > max_events {
3457        let keep_from = len - max_events;
3458        timeline.events = timeline.events.split_off(keep_from);
3459    }
3460}
3461
/// Prunes rotated timeline files next to `base_path`, keeping the `keep` most
/// recently modified ones.
///
/// Rotated files are the ones produced by `rotated_path`, named
/// `{stem}_{digits}.{ext}`. Only names whose middle part is made entirely of ASCII
/// digits are considered. Siblings that merely share the `{stem}_` prefix, such as
/// `timeline_stats.json`, are never deleted.
///
/// A bare file name (no directory component) is resolved against the current
/// directory. `Path::parent` returns `Some("")` in that case rather than `None`, and
/// `fs::read_dir("")` fails, so the empty parent must be mapped to `"."` explicitly.
///
/// # Errors
/// Returns `Err` when the directory cannot be listed. Failures to remove individual
/// files are ignored, so pruning is best-effort.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    let parent = base
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    // Matches exactly `{stem}_{digits}.{ext}`, the shape `rotated_path` emits.
    let is_rotated_name = |name: &str| -> bool {
        name.strip_prefix(prefix.as_str())
            .and_then(|rest| rest.strip_suffix(suffix.as_str()))
            .is_some_and(|middle| {
                !middle.is_empty() && middle.bytes().all(|b| b.is_ascii_digit())
            })
    };

    let mut rotated = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(Result::ok)
        .filter(|entry| entry.file_name().to_str().is_some_and(&is_rotated_name))
        .map(|entry| {
            let modified = entry
                .metadata()
                .and_then(|meta| meta.modified())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            (entry.path(), modified)
        })
        .collect::<Vec<_>>();

    // Newest first; everything past the first `keep` entries is pruned.
    rotated.sort_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in rotated.into_iter().skip(keep) {
        // Best-effort pruning: removal errors are deliberately ignored.
        let _ = fs::remove_file(path);
    }
    Ok(())
}
3494
3495fn bus_capability_schema_from_policy(
3496    policy: Option<ranvier_core::bus::BusAccessPolicy>,
3497) -> Option<BusCapabilitySchema> {
3498    let policy = policy?;
3499
3500    let mut allow = policy
3501        .allow
3502        .unwrap_or_default()
3503        .into_iter()
3504        .map(|entry| entry.type_name.to_string())
3505        .collect::<Vec<_>>();
3506    let mut deny = policy
3507        .deny
3508        .into_iter()
3509        .map(|entry| entry.type_name.to_string())
3510        .collect::<Vec<_>>();
3511    allow.sort();
3512    deny.sort();
3513
3514    if allow.is_empty() && deny.is_empty() {
3515        return None;
3516    }
3517
3518    Some(BusCapabilitySchema { allow, deny })
3519}
3520
/// Returns the wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
3527
3528#[cfg(test)]
3529mod tests {
3530    use super::{
3531        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3532        should_force_export,
3533    };
3534    use crate::persistence::{
3535        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3536        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3537        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3538        PersistenceHandle, PersistenceStore, PersistenceTraceId,
3539    };
3540    use anyhow::Result;
3541    use async_trait::async_trait;
3542    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3543    use ranvier_core::event::{DlqPolicy, DlqSink};
3544    use ranvier_core::saga::SagaStack;
3545    use ranvier_core::timeline::{Timeline, TimelineEvent};
3546    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3547    use serde::{Deserialize, Serialize};
3548    use std::sync::Arc;
3549    use tokio::sync::Mutex;
3550    use uuid::Uuid;
3551
3552    struct MockAuditSink {
3553        events: Arc<Mutex<Vec<AuditEvent>>>,
3554    }
3555
3556    #[async_trait]
3557    impl AuditSink for MockAuditSink {
3558        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3559            self.events.lock().await.push(event.clone());
3560            Ok(())
3561        }
3562    }
3563
3564    #[tokio::test]
3565    async fn execute_logs_audit_events_for_intervention() {
3566        use ranvier_inspector::StateInspector;
3567
3568        let trace_id = "test-audit-trace";
3569        let store_impl = InMemoryPersistenceStore::new();
3570        let events = Arc::new(Mutex::new(Vec::new()));
3571        let sink = MockAuditSink {
3572            events: events.clone(),
3573        };
3574
3575        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3576            .then(AddOne)
3577            .with_persistence_store(store_impl.clone())
3578            .with_audit_sink(sink);
3579
3580        let mut bus = Bus::new();
3581        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3582        bus.insert(PersistenceTraceId::new(trace_id));
3583        let target_node_id = axon.schematic.nodes[0].id.clone();
3584
3585        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
3586        store_impl
3587            .append(crate::persistence::PersistenceEnvelope {
3588                trace_id: trace_id.to_string(),
3589                circuit: "AuditTest".to_string(),
3590                schematic_version: "v1.0".to_string(),
3591                step: 0,
3592                node_id: None,
3593                outcome_kind: "Next".to_string(),
3594                timestamp_ms: 0,
3595                payload_hash: None,
3596                payload: None,
3597            })
3598            .await
3599            .unwrap();
3600
3601        // 1. Trigger force_resume (should log ForceResume)
3602        axon.force_resume(trace_id, &target_node_id, None)
3603            .await
3604            .unwrap();
3605
3606        // 2. Execute (should log ApplyIntervention)
3607        axon.execute(10, &(), &mut bus).await;
3608
3609        let recorded = events.lock().await;
3610        assert_eq!(
3611            recorded.len(),
3612            2,
3613            "Should have 2 audit events: ForceResume and ApplyIntervention"
3614        );
3615        assert_eq!(recorded[0].action, "ForceResume");
3616        assert_eq!(recorded[0].target, trace_id);
3617        assert_eq!(recorded[1].action, "ApplyIntervention");
3618        assert_eq!(recorded[1].target, trace_id);
3619    }
3620
3621    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3622    pub enum TestInfallible {}
3623
3624    #[test]
3625    fn inspector_enabled_flag_matrix() {
3626        assert!(inspector_enabled_from_value(None));
3627        assert!(inspector_enabled_from_value(Some("1")));
3628        assert!(inspector_enabled_from_value(Some("true")));
3629        assert!(inspector_enabled_from_value(Some("on")));
3630        assert!(!inspector_enabled_from_value(Some("0")));
3631        assert!(!inspector_enabled_from_value(Some("false")));
3632    }
3633
3634    #[test]
3635    fn inspector_dev_mode_matrix() {
3636        assert!(inspector_dev_mode_from_value(None));
3637        assert!(inspector_dev_mode_from_value(Some("dev")));
3638        assert!(inspector_dev_mode_from_value(Some("staging")));
3639        assert!(!inspector_dev_mode_from_value(Some("prod")));
3640        assert!(!inspector_dev_mode_from_value(Some("production")));
3641    }
3642
3643    #[test]
3644    fn adaptive_policy_force_export_matrix() {
3645        let next = Outcome::<(), &'static str>::Next(());
3646        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3647        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3648        let fault = Outcome::<(), &'static str>::Fault("boom");
3649
3650        assert!(!should_force_export(&next, "off"));
3651        assert!(!should_force_export(&fault, "off"));
3652
3653        assert!(!should_force_export(&branch, "fault_only"));
3654        assert!(should_force_export(&fault, "fault_only"));
3655
3656        assert!(should_force_export(&branch, "fault_branch"));
3657        assert!(!should_force_export(&emit, "fault_branch"));
3658        assert!(should_force_export(&fault, "fault_branch"));
3659
3660        assert!(should_force_export(&branch, "fault_branch_emit"));
3661        assert!(should_force_export(&emit, "fault_branch_emit"));
3662        assert!(should_force_export(&fault, "fault_branch_emit"));
3663    }
3664
3665    #[test]
3666    fn sampling_and_adaptive_combination_decisions() {
3667        let bus_id = Uuid::nil();
3668        let next = Outcome::<(), &'static str>::Next(());
3669        let fault = Outcome::<(), &'static str>::Fault("boom");
3670
3671        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3672        assert!(!sampled_never);
3673        assert!(!(sampled_never || should_force_export(&next, "off")));
3674        assert!(sampled_never || should_force_export(&fault, "fault_only"));
3675
3676        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3677        assert!(sampled_always);
3678        assert!(sampled_always || should_force_export(&next, "off"));
3679        assert!(sampled_always || should_force_export(&fault, "off"));
3680    }
3681
3682    #[derive(Clone)]
3683    struct AddOne;
3684
3685    #[async_trait]
3686    impl Transition<i32, i32> for AddOne {
3687        type Error = TestInfallible;
3688        type Resources = ();
3689
3690        async fn run(
3691            &self,
3692            state: i32,
3693            _resources: &Self::Resources,
3694            _bus: &mut Bus,
3695        ) -> Outcome<i32, Self::Error> {
3696            Outcome::Next(state + 1)
3697        }
3698    }
3699
3700    #[derive(Clone)]
3701    struct AlwaysFault;
3702
3703    #[async_trait]
3704    impl Transition<i32, i32> for AlwaysFault {
3705        type Error = String;
3706        type Resources = ();
3707
3708        async fn run(
3709            &self,
3710            _state: i32,
3711            _resources: &Self::Resources,
3712            _bus: &mut Bus,
3713        ) -> Outcome<i32, Self::Error> {
3714            Outcome::Fault("boom".to_string())
3715        }
3716    }
3717
3718    #[derive(Clone)]
3719    struct CapabilityGuarded;
3720
3721    #[async_trait]
3722    impl Transition<(), ()> for CapabilityGuarded {
3723        type Error = String;
3724        type Resources = ();
3725
3726        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3727            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3728        }
3729
3730        async fn run(
3731            &self,
3732            _state: (),
3733            _resources: &Self::Resources,
3734            bus: &mut Bus,
3735        ) -> Outcome<(), Self::Error> {
3736            match bus.get::<String>() {
3737                Ok(_) => Outcome::Next(()),
3738                Err(err) => Outcome::Fault(err.to_string()),
3739            }
3740        }
3741    }
3742
3743    #[derive(Clone)]
3744    struct RecordingCompensationHook {
3745        calls: Arc<Mutex<Vec<CompensationContext>>>,
3746        should_fail: bool,
3747    }
3748
3749    #[async_trait]
3750    impl CompensationHook for RecordingCompensationHook {
3751        async fn compensate(&self, context: CompensationContext) -> Result<()> {
3752            self.calls.lock().await.push(context);
3753            if self.should_fail {
3754                return Err(anyhow::anyhow!("compensation failed"));
3755            }
3756            Ok(())
3757        }
3758    }
3759
3760    #[derive(Clone)]
3761    struct FlakyCompensationHook {
3762        calls: Arc<Mutex<u32>>,
3763        failures_remaining: Arc<Mutex<u32>>,
3764    }
3765
3766    #[async_trait]
3767    impl CompensationHook for FlakyCompensationHook {
3768        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3769            {
3770                let mut calls = self.calls.lock().await;
3771                *calls += 1;
3772            }
3773            let mut failures_remaining = self.failures_remaining.lock().await;
3774            if *failures_remaining > 0 {
3775                *failures_remaining -= 1;
3776                return Err(anyhow::anyhow!("transient compensation failure"));
3777            }
3778            Ok(())
3779        }
3780    }
3781
3782    #[derive(Clone)]
3783    struct FailingCompensationIdempotencyStore {
3784        read_calls: Arc<Mutex<u32>>,
3785        write_calls: Arc<Mutex<u32>>,
3786    }
3787
3788    #[async_trait]
3789    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3790        async fn was_compensated(&self, _key: &str) -> Result<bool> {
3791            let mut read_calls = self.read_calls.lock().await;
3792            *read_calls += 1;
3793            Err(anyhow::anyhow!("forced idempotency read failure"))
3794        }
3795
3796        async fn mark_compensated(&self, _key: &str) -> Result<()> {
3797            let mut write_calls = self.write_calls.lock().await;
3798            *write_calls += 1;
3799            Err(anyhow::anyhow!("forced idempotency write failure"))
3800        }
3801    }
3802
3803    #[tokio::test]
3804    async fn execute_persists_success_trace_when_handle_exists() {
3805        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3806        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3807
3808        let mut bus = Bus::new();
3809        bus.insert(PersistenceHandle::from_arc(store_dyn));
3810        bus.insert(PersistenceTraceId::new("trace-success"));
3811
3812        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3813        let outcome = axon.execute(41, &(), &mut bus).await;
3814        assert!(matches!(outcome, Outcome::Next(42)));
3815
3816        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3817        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3818        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3819        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
3820        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
3821        assert_eq!(persisted.completion, Some(CompletionState::Success));
3822    }
3823
3824    #[tokio::test]
3825    async fn execute_persists_fault_completion_state() {
3826        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3827        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3828
3829        let mut bus = Bus::new();
3830        bus.insert(PersistenceHandle::from_arc(store_dyn));
3831        bus.insert(PersistenceTraceId::new("trace-fault"));
3832
3833        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3834        let outcome = axon.execute(41, &(), &mut bus).await;
3835        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3836
3837        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3838        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3839        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3840        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3841        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3842    }
3843
3844    #[tokio::test]
3845    async fn execute_respects_persistence_auto_complete_off() {
3846        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3847        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3848
3849        let mut bus = Bus::new();
3850        bus.insert(PersistenceHandle::from_arc(store_dyn));
3851        bus.insert(PersistenceTraceId::new("trace-no-complete"));
3852        bus.insert(PersistenceAutoComplete(false));
3853
3854        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3855        let outcome = axon.execute(1, &(), &mut bus).await;
3856        assert!(matches!(outcome, Outcome::Next(2)));
3857
3858        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3859        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3860        assert_eq!(persisted.completion, None);
3861    }
3862
3863    #[tokio::test]
3864    async fn fault_triggers_compensation_and_marks_compensated() {
3865        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3866        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3867        let calls = Arc::new(Mutex::new(Vec::new()));
3868        let compensation = RecordingCompensationHook {
3869            calls: calls.clone(),
3870            should_fail: false,
3871        };
3872
3873        let mut bus = Bus::new();
3874        bus.insert(PersistenceHandle::from_arc(store_dyn));
3875        bus.insert(PersistenceTraceId::new("trace-compensated"));
3876        bus.insert(CompensationHandle::from_hook(compensation));
3877
3878        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3879        let outcome = axon.execute(7, &(), &mut bus).await;
3880        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3881
3882        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3883        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
3884        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3885        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3886        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3887        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3888        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3889
3890        let recorded = calls.lock().await;
3891        assert_eq!(recorded.len(), 1);
3892        assert_eq!(recorded[0].trace_id, "trace-compensated");
3893        assert_eq!(recorded[0].fault_kind, "Fault");
3894    }
3895
3896    #[tokio::test]
3897    async fn failed_compensation_keeps_fault_completion() {
3898        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3899        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3900        let calls = Arc::new(Mutex::new(Vec::new()));
3901        let compensation = RecordingCompensationHook {
3902            calls: calls.clone(),
3903            should_fail: true,
3904        };
3905
3906        let mut bus = Bus::new();
3907        bus.insert(PersistenceHandle::from_arc(store_dyn));
3908        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3909        bus.insert(CompensationHandle::from_hook(compensation));
3910
3911        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3912        let outcome = axon.execute(7, &(), &mut bus).await;
3913        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3914
3915        let persisted = store_impl
3916            .load("trace-compensation-failed")
3917            .await
3918            .unwrap()
3919            .unwrap();
3920        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3921        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3922        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3923
3924        let recorded = calls.lock().await;
3925        assert_eq!(recorded.len(), 1);
3926    }
3927
3928    #[tokio::test]
3929    async fn compensation_retry_policy_succeeds_after_retries() {
3930        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3931        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3932        let calls = Arc::new(Mutex::new(0u32));
3933        let failures_remaining = Arc::new(Mutex::new(2u32));
3934        let compensation = FlakyCompensationHook {
3935            calls: calls.clone(),
3936            failures_remaining,
3937        };
3938
3939        let mut bus = Bus::new();
3940        bus.insert(PersistenceHandle::from_arc(store_dyn));
3941        bus.insert(PersistenceTraceId::new("trace-retry-success"));
3942        bus.insert(CompensationHandle::from_hook(compensation));
3943        bus.insert(CompensationRetryPolicy {
3944            max_attempts: 3,
3945            backoff_ms: 0,
3946        });
3947
3948        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3949        let outcome = axon.execute(7, &(), &mut bus).await;
3950        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3951
3952        let persisted = store_impl
3953            .load("trace-retry-success")
3954            .await
3955            .unwrap()
3956            .unwrap();
3957        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3958        assert_eq!(
3959            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3960            Some("Compensated")
3961        );
3962
3963        let attempt_count = calls.lock().await;
3964        assert_eq!(*attempt_count, 3);
3965    }
3966
3967    #[tokio::test]
3968    async fn compensation_idempotency_skips_duplicate_hook_execution() {
3969        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3970        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3971        let calls = Arc::new(Mutex::new(Vec::new()));
3972        let compensation = RecordingCompensationHook {
3973            calls: calls.clone(),
3974            should_fail: false,
3975        };
3976        let idempotency = InMemoryCompensationIdempotencyStore::new();
3977
3978        let mut bus = Bus::new();
3979        bus.insert(PersistenceHandle::from_arc(store_dyn));
3980        bus.insert(PersistenceTraceId::new("trace-idempotent"));
3981        bus.insert(PersistenceAutoComplete(false));
3982        bus.insert(CompensationHandle::from_hook(compensation));
3983        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3984
3985        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3986
3987        let outcome1 = axon.execute(7, &(), &mut bus).await;
3988        let outcome2 = axon.execute(8, &(), &mut bus).await;
3989        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3990        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3991
3992        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3993        assert_eq!(persisted.completion, None);
3994        // Verify that "Compensated" events are present for both executions
3995        let compensated_count = persisted
3996            .events
3997            .iter()
3998            .filter(|e| e.outcome_kind == "Compensated")
3999            .count();
4000        assert_eq!(
4001            compensated_count, 2,
4002            "Should have 2 Compensated events (one per execution)"
4003        );
4004
4005        let recorded = calls.lock().await;
4006        assert_eq!(recorded.len(), 1);
4007    }
4008
4009    #[tokio::test]
4010    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
4011        let store_impl = Arc::new(InMemoryPersistenceStore::new());
4012        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4013        let calls = Arc::new(Mutex::new(Vec::new()));
4014        let read_calls = Arc::new(Mutex::new(0u32));
4015        let write_calls = Arc::new(Mutex::new(0u32));
4016        let compensation = RecordingCompensationHook {
4017            calls: calls.clone(),
4018            should_fail: false,
4019        };
4020        let idempotency = FailingCompensationIdempotencyStore {
4021            read_calls: read_calls.clone(),
4022            write_calls: write_calls.clone(),
4023        };
4024
4025        let mut bus = Bus::new();
4026        bus.insert(PersistenceHandle::from_arc(store_dyn));
4027        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
4028        bus.insert(CompensationHandle::from_hook(compensation));
4029        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
4030
4031        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
4032        let outcome = axon.execute(9, &(), &mut bus).await;
4033        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4034
4035        let persisted = store_impl
4036            .load("trace-idempotency-store-failure")
4037            .await
4038            .unwrap()
4039            .unwrap();
4040        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
4041        assert_eq!(
4042            persisted.events.last().map(|e| e.outcome_kind.as_str()),
4043            Some("Compensated")
4044        );
4045
4046        let recorded = calls.lock().await;
4047        assert_eq!(recorded.len(), 1);
4048        assert_eq!(*read_calls.lock().await, 1);
4049        assert_eq!(*write_calls.lock().await, 1);
4050    }
4051
4052    #[tokio::test]
4053    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
4054        let mut bus = Bus::new();
4055        bus.insert(1_i32);
4056        bus.insert("secret".to_string());
4057
4058        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
4059        let outcome = axon.execute((), &(), &mut bus).await;
4060
4061        match outcome {
4062            Outcome::Fault(msg) => {
4063                assert!(msg.contains("Bus access denied"), "{msg}");
4064                assert!(msg.contains("CapabilityGuarded"), "{msg}");
4065                assert!(msg.contains("alloc::string::String"), "{msg}");
4066            }
4067            other => panic!("expected fault, got {other:?}"),
4068        }
4069    }
4070
4071    #[tokio::test]
4072    async fn execute_fails_on_version_mismatch_without_migration() {
4073        let store_impl = Arc::new(InMemoryPersistenceStore::new());
4074        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4075
4076        let trace_id = "v-mismatch";
4077        // Create an existing trace with an older version
4078        let old_envelope = crate::persistence::PersistenceEnvelope {
4079            trace_id: trace_id.to_string(),
4080            circuit: "TestCircuit".to_string(),
4081            schematic_version: "0.9".to_string(),
4082            step: 0,
4083            node_id: None,
4084            outcome_kind: "Enter".to_string(),
4085            timestamp_ms: 0,
4086            payload_hash: None,
4087            payload: None,
4088        };
4089        store_impl.append(old_envelope).await.unwrap();
4090
4091        let mut bus = Bus::new();
4092        bus.insert(PersistenceHandle::from_arc(store_dyn));
4093        bus.insert(PersistenceTraceId::new(trace_id));
4094
4095        // Current axon is version 1.0
4096        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4097        let outcome = axon.execute(10, &(), &mut bus).await;
4098
4099        if let Outcome::Emit(kind, _) = outcome {
4100            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
4101        } else {
4102            panic!("Expected version mismatch emission, got {:?}", outcome);
4103        }
4104    }
4105
4106    #[tokio::test]
4107    async fn execute_resumes_from_start_on_migration_strategy() {
4108        let store_impl = Arc::new(InMemoryPersistenceStore::new());
4109        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4110
4111        let trace_id = "v-migration";
4112        // Create an existing trace with an older version at step 5
4113        let old_envelope = crate::persistence::PersistenceEnvelope {
4114            trace_id: trace_id.to_string(),
4115            circuit: "TestCircuit".to_string(),
4116            schematic_version: "0.9".to_string(),
4117            step: 5,
4118            node_id: None,
4119            outcome_kind: "Next".to_string(),
4120            timestamp_ms: 0,
4121            payload_hash: None,
4122            payload: None,
4123        };
4124        store_impl.append(old_envelope).await.unwrap();
4125
4126        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
4127        registry.register(ranvier_core::schematic::SnapshotMigration {
4128            name: Some("v0.9 to v1.0".to_string()),
4129            from_version: "0.9".to_string(),
4130            to_version: "1.0".to_string(),
4131            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
4132            node_mapping: std::collections::HashMap::new(),
4133            payload_mapper: None,
4134        });
4135
4136        let mut bus = Bus::new();
4137        bus.insert(PersistenceHandle::from_arc(store_dyn));
4138        bus.insert(PersistenceTraceId::new(trace_id));
4139        bus.insert(registry);
4140
4141        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4142        let outcome = axon.execute(10, &(), &mut bus).await;
4143
4144        // Should have resumed from start (step 0), resulting in 11
4145        assert!(matches!(outcome, Outcome::Next(11)));
4146
4147        // Verify new event has current version
4148        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4149        assert_eq!(persisted.schematic_version, "1.0");
4150    }
4151
4152    #[tokio::test]
4153    async fn execute_applies_manual_intervention_jump_and_payload() {
4154        let store_impl = Arc::new(InMemoryPersistenceStore::new());
4155        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4156
4157        let trace_id = "intervention-test";
4158        // 1. Run a normal trace part-way
4159        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
4160            .then(AddOne)
4161            .then(AddOne);
4162
4163        let mut bus = Bus::new();
4164        bus.insert(PersistenceHandle::from_arc(store_dyn));
4165        bus.insert(PersistenceTraceId::new(trace_id));
4166
4167        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
4168        // with a payload override of 100.
4169        // The first node is 'AddOne', the second is ALSO 'AddOne'.
4170        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
4171        let _target_node_label = "AddOne";
4172        // To be precise, let's find the ID of the second node.
4173        let target_node_id = axon.schematic.nodes[2].id.clone();
4174
4175        // Pre-seed an initial trace entry so save_intervention doesn't fail
4176        store_impl
4177            .append(crate::persistence::PersistenceEnvelope {
4178                trace_id: trace_id.to_string(),
4179                circuit: "TestCircuit".to_string(),
4180                schematic_version: "1.0".to_string(),
4181                step: 0,
4182                node_id: None,
4183                outcome_kind: "Enter".to_string(),
4184                timestamp_ms: 0,
4185                payload_hash: None,
4186                payload: None,
4187            })
4188            .await
4189            .unwrap();
4190
4191        store_impl
4192            .save_intervention(
4193                trace_id,
4194                crate::persistence::Intervention {
4195                    target_node: target_node_id.clone(),
4196                    payload_override: Some(serde_json::json!(100)),
4197                    timestamp_ms: 0,
4198                },
4199            )
4200            .await
4201            .unwrap();
4202
4203        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
4204        // Result should be 100 + 1 = 101.
4205        let outcome = axon.execute(10, &(), &mut bus).await;
4206
4207        match outcome {
4208            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
4209            other => panic!("Expected Outcome::Next(101), got {:?}", other),
4210        }
4211
4212        // Verify the jump was logged in trace
4213        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4214        // The last event should be from the jump target's execution.
4215        assert_eq!(persisted.interventions.len(), 1);
4216        assert_eq!(persisted.interventions[0].target_node, target_node_id);
4217    }
4218
4219    // ── DLQ Retry Tests ──────────────────────────────────────────────
4220
4221    /// A transition that fails a configurable number of times before succeeding.
4222    #[derive(Clone)]
4223    struct FailNThenSucceed {
4224        remaining: Arc<tokio::sync::Mutex<u32>>,
4225    }
4226
4227    #[async_trait]
4228    impl Transition<i32, i32> for FailNThenSucceed {
4229        type Error = String;
4230        type Resources = ();
4231
4232        async fn run(
4233            &self,
4234            state: i32,
4235            _resources: &Self::Resources,
4236            _bus: &mut Bus,
4237        ) -> Outcome<i32, Self::Error> {
4238            let mut rem = self.remaining.lock().await;
4239            if *rem > 0 {
4240                *rem -= 1;
4241                Outcome::Fault("transient failure".to_string())
4242            } else {
4243                Outcome::Next(state + 1)
4244            }
4245        }
4246    }
4247
4248    /// A mock DLQ sink that records all dead letters.
4249    #[derive(Clone)]
4250    struct MockDlqSink {
4251        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
4252    }
4253
4254    #[async_trait]
4255    impl DlqSink for MockDlqSink {
4256        async fn store_dead_letter(
4257            &self,
4258            workflow_id: &str,
4259            _circuit_label: &str,
4260            node_id: &str,
4261            error_msg: &str,
4262            _payload: &[u8],
4263        ) -> Result<(), String> {
4264            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4265            self.letters.lock().await.push(entry);
4266            Ok(())
4267        }
4268    }
4269
4270    #[tokio::test]
4271    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4272        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
4273        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4274        let trans = FailNThenSucceed { remaining };
4275
4276        let dlq_sink = MockDlqSink {
4277            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4278        };
4279
4280        let mut bus = Bus::new();
4281        bus.insert(Timeline::new());
4282
4283        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4284            .then(trans)
4285            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4286                max_attempts: 5,
4287                backoff_ms: 1,
4288            })
4289            .with_dlq_sink(dlq_sink.clone());
4290        let outcome = axon.execute(10, &(), &mut bus).await;
4291
4292        // Should succeed (10 + 1 = 11)
4293        assert!(
4294            matches!(outcome, Outcome::Next(11)),
4295            "Expected Next(11), got {:?}",
4296            outcome
4297        );
4298
4299        // No dead letters since it recovered
4300        let letters = dlq_sink.letters.lock().await;
4301        assert!(
4302            letters.is_empty(),
4303            "Should have 0 dead letters, got {}",
4304            letters.len()
4305        );
4306
4307        // Timeline should contain NodeRetry events
4308        let timeline = bus.read::<Timeline>().unwrap();
4309        let retry_count = timeline
4310            .events
4311            .iter()
4312            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4313            .count();
4314        assert_eq!(retry_count, 2, "Should have 2 retry events");
4315    }
4316
4317    #[tokio::test]
4318    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4319        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
4320        let mut bus = Bus::new();
4321        bus.insert(Timeline::new());
4322
4323        let dlq_sink = MockDlqSink {
4324            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4325        };
4326
4327        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4328            .then(AlwaysFault)
4329            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4330                max_attempts: 3,
4331                backoff_ms: 1,
4332            })
4333            .with_dlq_sink(dlq_sink.clone());
4334        let outcome = axon.execute(42, &(), &mut bus).await;
4335
4336        assert!(
4337            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4338            "Expected Fault(boom), got {:?}",
4339            outcome
4340        );
4341
4342        // Should have exactly 1 dead letter
4343        let letters = dlq_sink.letters.lock().await;
4344        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4345
4346        // Timeline should have 2 retry events and 1 DlqExhausted event
4347        let timeline = bus.read::<Timeline>().unwrap();
4348        let retry_count = timeline
4349            .events
4350            .iter()
4351            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4352            .count();
4353        let dlq_count = timeline
4354            .events
4355            .iter()
4356            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4357            .count();
4358        assert_eq!(
4359            retry_count, 2,
4360            "Should have 2 retry events (attempts 2 and 3)"
4361        );
4362        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4363    }
4364
4365    #[tokio::test]
4366    async fn send_to_dlq_policy_sends_immediately_without_retry() {
4367        let mut bus = Bus::new();
4368        bus.insert(Timeline::new());
4369
4370        let dlq_sink = MockDlqSink {
4371            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4372        };
4373
4374        let axon = Axon::<i32, i32, String>::start("SendDlq")
4375            .then(AlwaysFault)
4376            .with_dlq_policy(DlqPolicy::SendToDlq)
4377            .with_dlq_sink(dlq_sink.clone());
4378        let outcome = axon.execute(1, &(), &mut bus).await;
4379
4380        assert!(matches!(outcome, Outcome::Fault(_)));
4381
4382        // Should have exactly 1 dead letter (immediate, no retries)
4383        let letters = dlq_sink.letters.lock().await;
4384        assert_eq!(letters.len(), 1);
4385
4386        // No retry or DlqExhausted events
4387        let timeline = bus.read::<Timeline>().unwrap();
4388        let retry_count = timeline
4389            .events
4390            .iter()
4391            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4392            .count();
4393        assert_eq!(retry_count, 0);
4394    }
4395
4396    #[tokio::test]
4397    async fn drop_policy_does_not_send_to_dlq() {
4398        let mut bus = Bus::new();
4399
4400        let dlq_sink = MockDlqSink {
4401            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4402        };
4403
4404        let axon = Axon::<i32, i32, String>::start("DropDlq")
4405            .then(AlwaysFault)
4406            .with_dlq_policy(DlqPolicy::Drop)
4407            .with_dlq_sink(dlq_sink.clone());
4408        let outcome = axon.execute(1, &(), &mut bus).await;
4409
4410        assert!(matches!(outcome, Outcome::Fault(_)));
4411
4412        // No dead letters
4413        let letters = dlq_sink.letters.lock().await;
4414        assert!(letters.is_empty());
4415    }
4416
4417    #[tokio::test]
4418    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4419        use ranvier_core::policy::DynamicPolicy;
4420
4421        // Start with Drop policy (no DLQ)
4422        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4423        let dlq_sink = MockDlqSink {
4424            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4425        };
4426
4427        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4428            .then(AlwaysFault)
4429            .with_dynamic_dlq_policy(dynamic)
4430            .with_dlq_sink(dlq_sink.clone());
4431
4432        // First execution: Drop policy → no dead letters
4433        let mut bus = Bus::new();
4434        let outcome = axon.execute(1, &(), &mut bus).await;
4435        assert!(matches!(outcome, Outcome::Fault(_)));
4436        assert!(
4437            dlq_sink.letters.lock().await.is_empty(),
4438            "Drop policy should produce no DLQ entries"
4439        );
4440
4441        // Hot-reload: switch to SendToDlq
4442        tx.send(DlqPolicy::SendToDlq).unwrap();
4443
4444        // Second execution: SendToDlq policy → dead letter captured
4445        let mut bus2 = Bus::new();
4446        let outcome2 = axon.execute(2, &(), &mut bus2).await;
4447        assert!(matches!(outcome2, Outcome::Fault(_)));
4448        assert_eq!(
4449            dlq_sink.letters.lock().await.len(),
4450            1,
4451            "SendToDlq policy should produce 1 DLQ entry"
4452        );
4453    }
4454
4455    #[tokio::test]
4456    async fn dynamic_saga_policy_hot_reload() {
4457        use ranvier_core::policy::DynamicPolicy;
4458        use ranvier_core::saga::SagaPolicy;
4459
4460        // Start with Disabled saga
4461        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4462
4463        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4464            .then(AddOne)
4465            .with_dynamic_saga_policy(dynamic);
4466
4467        // First execution: Disabled → no SagaStack in bus
4468        let mut bus = Bus::new();
4469        let _outcome = axon.execute(1, &(), &mut bus).await;
4470        assert!(
4471            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4472            "SagaStack should be absent or empty when disabled"
4473        );
4474
4475        // Hot-reload: enable saga
4476        tx.send(SagaPolicy::Enabled).unwrap();
4477
4478        // Second execution: Enabled → SagaStack populated
4479        let mut bus2 = Bus::new();
4480        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4481        assert!(
4482            bus2.read::<SagaStack>().is_some(),
4483            "SagaStack should exist when saga is enabled"
4484        );
4485    }
4486
4487    // ── IAM Boundary Tests ──────────────────────────────────────
4488
4489    mod iam_tests {
4490        use super::*;
4491        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4492
4493        /// Mock IamVerifier that returns a fixed identity.
4494        #[derive(Clone)]
4495        struct MockVerifier {
4496            identity: IamIdentity,
4497            should_fail: bool,
4498        }
4499
4500        #[async_trait]
4501        impl IamVerifier for MockVerifier {
4502            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4503                if self.should_fail {
4504                    Err(IamError::InvalidToken("mock verification failure".into()))
4505                } else {
4506                    Ok(self.identity.clone())
4507                }
4508            }
4509        }
4510
4511        #[tokio::test]
4512        async fn iam_require_identity_passes_with_valid_token() {
4513            let verifier = MockVerifier {
4514                identity: IamIdentity::new("alice").with_role("user"),
4515                should_fail: false,
4516            };
4517
4518            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4519                .with_iam(IamPolicy::RequireIdentity, verifier)
4520                .then(AddOne);
4521
4522            let mut bus = Bus::new();
4523            bus.insert(IamToken("valid-token".to_string()));
4524            let outcome = axon.execute(10, &(), &mut bus).await;
4525
4526            assert!(matches!(outcome, Outcome::Next(11)));
4527            // Verify IamIdentity was injected into Bus
4528            let identity = bus
4529                .read::<IamIdentity>()
4530                .expect("IamIdentity should be in Bus");
4531            assert_eq!(identity.subject, "alice");
4532        }
4533
4534        #[tokio::test]
4535        async fn iam_require_identity_rejects_missing_token() {
4536            let verifier = MockVerifier {
4537                identity: IamIdentity::new("ignored"),
4538                should_fail: false,
4539            };
4540
4541            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4542                .with_iam(IamPolicy::RequireIdentity, verifier)
4543                .then(AddOne);
4544
4545            let mut bus = Bus::new();
4546            // No IamToken inserted
4547            let outcome = axon.execute(10, &(), &mut bus).await;
4548
4549            // Should emit missing_token event
4550            match &outcome {
4551                Outcome::Emit(label, _) => {
4552                    assert_eq!(label, "iam.missing_token");
4553                }
4554                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4555            }
4556        }
4557
4558        #[tokio::test]
4559        async fn iam_rejects_failed_verification() {
4560            let verifier = MockVerifier {
4561                identity: IamIdentity::new("ignored"),
4562                should_fail: true,
4563            };
4564
4565            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4566                .with_iam(IamPolicy::RequireIdentity, verifier)
4567                .then(AddOne);
4568
4569            let mut bus = Bus::new();
4570            bus.insert(IamToken("bad-token".to_string()));
4571            let outcome = axon.execute(10, &(), &mut bus).await;
4572
4573            match &outcome {
4574                Outcome::Emit(label, _) => {
4575                    assert_eq!(label, "iam.verification_failed");
4576                }
4577                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4578            }
4579        }
4580
4581        #[tokio::test]
4582        async fn iam_require_role_passes_with_matching_role() {
4583            let verifier = MockVerifier {
4584                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4585                should_fail: false,
4586            };
4587
4588            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4589                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4590                .then(AddOne);
4591
4592            let mut bus = Bus::new();
4593            bus.insert(IamToken("token".to_string()));
4594            let outcome = axon.execute(5, &(), &mut bus).await;
4595
4596            assert!(matches!(outcome, Outcome::Next(6)));
4597        }
4598
4599        #[tokio::test]
4600        async fn iam_require_role_denies_without_role() {
4601            let verifier = MockVerifier {
4602                identity: IamIdentity::new("carol").with_role("user"),
4603                should_fail: false,
4604            };
4605
4606            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4607                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4608                .then(AddOne);
4609
4610            let mut bus = Bus::new();
4611            bus.insert(IamToken("token".to_string()));
4612            let outcome = axon.execute(5, &(), &mut bus).await;
4613
4614            match &outcome {
4615                Outcome::Emit(label, _) => {
4616                    assert_eq!(label, "iam.policy_denied");
4617                }
4618                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4619            }
4620        }
4621
4622        #[tokio::test]
4623        async fn iam_policy_none_skips_verification() {
4624            let verifier = MockVerifier {
4625                identity: IamIdentity::new("ignored"),
4626                should_fail: true, // would fail if actually called
4627            };
4628
4629            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4630                .with_iam(IamPolicy::None, verifier)
4631                .then(AddOne);
4632
4633            let mut bus = Bus::new();
4634            // No token needed when policy is None
4635            let outcome = axon.execute(10, &(), &mut bus).await;
4636
4637            assert!(matches!(outcome, Outcome::Next(11)));
4638        }
4639    }
4640
4641    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
4642
4643    #[derive(Clone)]
4644    struct SchemaTransition;
4645
4646    #[async_trait]
4647    impl Transition<String, String> for SchemaTransition {
4648        type Error = String;
4649        type Resources = ();
4650
4651        fn input_schema(&self) -> Option<serde_json::Value> {
4652            Some(serde_json::json!({
4653                "type": "object",
4654                "required": ["name"],
4655                "properties": {
4656                    "name": { "type": "string" }
4657                }
4658            }))
4659        }
4660
4661        async fn run(
4662            &self,
4663            state: String,
4664            _resources: &Self::Resources,
4665            _bus: &mut Bus,
4666        ) -> Outcome<String, Self::Error> {
4667            Outcome::Next(state)
4668        }
4669    }
4670
4671    #[test]
4672    fn then_auto_populates_input_schema_from_transition() {
4673        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4674
4675        // The last node (added by .then()) should have input_schema
4676        let last_node = axon.schematic.nodes.last().unwrap();
4677        assert!(last_node.input_schema.is_some());
4678        let schema = last_node.input_schema.as_ref().unwrap();
4679        assert_eq!(schema["type"], "object");
4680        assert_eq!(schema["required"][0], "name");
4681    }
4682
4683    #[test]
4684    fn then_leaves_input_schema_none_when_not_provided() {
4685        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4686
4687        let last_node = axon.schematic.nodes.last().unwrap();
4688        assert!(last_node.input_schema.is_none());
4689    }
4690
4691    #[test]
4692    fn with_input_schema_value_sets_on_last_node() {
4693        let schema = serde_json::json!({"type": "integer"});
4694        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4695            .then(AddOne)
4696            .with_input_schema_value(schema.clone());
4697
4698        let last_node = axon.schematic.nodes.last().unwrap();
4699        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4700    }
4701
4702    #[test]
4703    fn with_output_schema_value_sets_on_last_node() {
4704        let schema = serde_json::json!({"type": "integer"});
4705        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4706            .then(AddOne)
4707            .with_output_schema_value(schema.clone());
4708
4709        let last_node = axon.schematic.nodes.last().unwrap();
4710        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4711    }
4712
4713    #[test]
4714    fn schematic_export_includes_schema_fields() {
4715        let axon = Axon::<String, String, String>::new("ExportTest")
4716            .then(SchemaTransition)
4717            .with_output_schema_value(serde_json::json!({"type": "string"}));
4718
4719        let json = serde_json::to_value(&axon.schematic).unwrap();
4720        let nodes = json["nodes"].as_array().unwrap();
4721        // Find the SchemaTransition node (last one)
4722        let last = nodes.last().unwrap();
4723        assert!(last.get("input_schema").is_some());
4724        assert_eq!(last["input_schema"]["type"], "object");
4725        assert_eq!(last["output_schema"]["type"], "string");
4726    }
4727
4728    #[test]
4729    fn schematic_export_omits_schema_fields_when_none() {
4730        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4731
4732        let json = serde_json::to_value(&axon.schematic).unwrap();
4733        let nodes = json["nodes"].as_array().unwrap();
4734        let last = nodes.last().unwrap();
4735        let obj = last.as_object().unwrap();
4736        assert!(!obj.contains_key("input_schema"));
4737        assert!(!obj.contains_key("output_schema"));
4738    }
4739
4740    #[test]
4741    fn schematic_json_roundtrip_preserves_schemas() {
4742        let axon = Axon::<String, String, String>::new("Roundtrip")
4743            .then(SchemaTransition)
4744            .with_output_schema_value(serde_json::json!({"type": "string"}));
4745
4746        let json_str = serde_json::to_string(&axon.schematic).unwrap();
4747        let deserialized: ranvier_core::schematic::Schematic =
4748            serde_json::from_str(&json_str).unwrap();
4749
4750        let last = deserialized.nodes.last().unwrap();
4751        assert!(last.input_schema.is_some());
4752        assert!(last.output_schema.is_some());
4753        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4754        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4755    }
4756
4757    // Test transitions for new unit tests
4758    #[derive(Clone)]
4759    struct MultiplyByTwo;
4760
4761    #[async_trait]
4762    impl Transition<i32, i32> for MultiplyByTwo {
4763        type Error = TestInfallible;
4764        type Resources = ();
4765
4766        async fn run(
4767            &self,
4768            state: i32,
4769            _resources: &Self::Resources,
4770            _bus: &mut Bus,
4771        ) -> Outcome<i32, Self::Error> {
4772            Outcome::Next(state * 2)
4773        }
4774    }
4775
4776    #[derive(Clone)]
4777    struct AddTen;
4778
4779    #[async_trait]
4780    impl Transition<i32, i32> for AddTen {
4781        type Error = TestInfallible;
4782        type Resources = ();
4783
4784        async fn run(
4785            &self,
4786            state: i32,
4787            _resources: &Self::Resources,
4788            _bus: &mut Bus,
4789        ) -> Outcome<i32, Self::Error> {
4790            Outcome::Next(state + 10)
4791        }
4792    }
4793
4794    #[derive(Clone)]
4795    struct AddOneString;
4796
4797    #[async_trait]
4798    impl Transition<i32, i32> for AddOneString {
4799        type Error = String;
4800        type Resources = ();
4801
4802        async fn run(
4803            &self,
4804            state: i32,
4805            _resources: &Self::Resources,
4806            _bus: &mut Bus,
4807        ) -> Outcome<i32, Self::Error> {
4808            Outcome::Next(state + 1)
4809        }
4810    }
4811
4812    #[derive(Clone)]
4813    struct AddTenString;
4814
4815    #[async_trait]
4816    impl Transition<i32, i32> for AddTenString {
4817        type Error = String;
4818        type Resources = ();
4819
4820        async fn run(
4821            &self,
4822            state: i32,
4823            _resources: &Self::Resources,
4824            _bus: &mut Bus,
4825        ) -> Outcome<i32, Self::Error> {
4826            Outcome::Next(state + 10)
4827        }
4828    }
4829
4830    #[tokio::test]
4831    async fn axon_single_step_chain_executes_and_returns_next() {
4832        let mut bus = Bus::new();
4833        let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4834
4835        let outcome = axon.execute(5, &(), &mut bus).await;
4836        assert!(matches!(outcome, Outcome::Next(6)));
4837    }
4838
4839    #[tokio::test]
4840    async fn axon_three_step_chain_executes_in_order() {
4841        let mut bus = Bus::new();
4842        let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4843            .then(AddOne)
4844            .then(MultiplyByTwo)
4845            .then(AddTen);
4846
4847        // Starting with 5: AddOne -> 6, MultiplyByTwo -> 12, AddTen -> 22
4848        let outcome = axon.execute(5, &(), &mut bus).await;
4849        assert!(matches!(outcome, Outcome::Next(22)));
4850    }
4851
4852    #[tokio::test]
4853    async fn axon_with_fault_in_middle_step_propagates_error() {
4854        let mut bus = Bus::new();
4855
4856        // Create a 3-step chain where the middle step faults
4857        // Step 1: AddOneString (5 -> 6)
4858        // Step 2: AlwaysFault (should fault here)
4859        // Step 3: AddTenString (never reached)
4860        let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4861            .then(AddOneString)
4862            .then(AlwaysFault)
4863            .then(AddTenString);
4864
4865        let outcome = axon.execute(5, &(), &mut bus).await;
4866        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4867    }
4868
4869    #[tokio::test]
4870    async fn fault_injects_transition_error_context_into_bus() {
4871        let mut bus = Bus::new();
4872
4873        // 3-step chain: AddOneString → AlwaysFault → AddTenString
4874        let axon = Axon::<i32, i32, String>::start("my-pipeline")
4875            .then(AddOneString)
4876            .then(AlwaysFault)
4877            .then(AddTenString);
4878
4879        let outcome = axon.execute(5, &(), &mut bus).await;
4880        assert!(matches!(outcome, Outcome::Fault(_)));
4881
4882        let ctx = bus
4883            .read::<ranvier_core::error::TransitionErrorContext>()
4884            .expect("TransitionErrorContext should be in Bus after fault");
4885        assert_eq!(ctx.pipeline_name, "my-pipeline");
4886        assert_eq!(ctx.transition_name, "AlwaysFault");
4887        assert_eq!(ctx.step_index, 2); // 0=ingress, 1=AddOneString, 2=AlwaysFault
4888    }
4889
4890    #[test]
4891    fn axon_schematic_has_correct_node_count_after_chaining() {
4892        let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4893            .then(AddOne)
4894            .then(MultiplyByTwo)
4895            .then(AddTen);
4896
4897        // Should have 4 nodes: ingress + 3 transitions
4898        assert_eq!(axon.schematic.nodes.len(), 4);
4899        assert_eq!(axon.schematic.name, "NodeCount");
4900    }
4901
4902    #[tokio::test]
4903    async fn axon_execution_records_timeline_events() {
4904        let mut bus = Bus::new();
4905        bus.insert(Timeline::new());
4906
4907        let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4908            .then(AddOne)
4909            .then(MultiplyByTwo);
4910
4911        let outcome = axon.execute(3, &(), &mut bus).await;
4912        assert!(matches!(outcome, Outcome::Next(8))); // (3 + 1) * 2 = 8
4913
4914        let timeline = bus.read::<Timeline>().unwrap();
4915
4916        // Should have NodeEnter and NodeExit events
4917        let enter_count = timeline
4918            .events
4919            .iter()
4920            .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4921            .count();
4922        let exit_count = timeline
4923            .events
4924            .iter()
4925            .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4926            .count();
4927
4928        // Expect at least 1 enter and 1 exit (ingress node)
4929        assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4930        assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4931    }
4932
4933    // ── Parallel Step Tests (M231) ───────────────────────────────
4934
4935    #[tokio::test]
4936    async fn parallel_all_succeed_returns_first_next() {
4937        use super::ParallelStrategy;
4938
4939        let mut bus = Bus::new();
4940        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4941            .parallel(
4942                vec![
4943                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4944                    Arc::new(MultiplyByTwo),
4945                ],
4946                ParallelStrategy::AllMustSucceed,
4947            );
4948
4949        // Input 5: AddOne -> 6, MultiplyByTwo -> 10.
4950        // AllMustSucceed returns the first Next (AddOne = 6).
4951        let outcome = axon.execute(5, &(), &mut bus).await;
4952        assert!(matches!(outcome, Outcome::Next(6)));
4953    }
4954
4955    #[tokio::test]
4956    async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4957        use super::ParallelStrategy;
4958
4959        let mut bus = Bus::new();
4960        let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4961            .parallel(
4962                vec![
4963                    Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4964                    Arc::new(AlwaysFault),
4965                ],
4966                ParallelStrategy::AllMustSucceed,
4967            );
4968
4969        let outcome = axon.execute(5, &(), &mut bus).await;
4970        assert!(
4971            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4972            "Expected Fault(boom), got {:?}",
4973            outcome
4974        );
4975    }
4976
4977    #[tokio::test]
4978    async fn parallel_any_can_fail_returns_success_despite_fault() {
4979        use super::ParallelStrategy;
4980
4981        let mut bus = Bus::new();
4982        let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4983            .parallel(
4984                vec![
4985                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4986                    Arc::new(AddOneString),
4987                ],
4988                ParallelStrategy::AnyCanFail,
4989            );
4990
4991        // AlwaysFault faults, but AddOneString succeeds (5 + 1 = 6).
4992        let outcome = axon.execute(5, &(), &mut bus).await;
4993        assert!(
4994            matches!(outcome, Outcome::Next(6)),
4995            "Expected Next(6), got {:?}",
4996            outcome
4997        );
4998    }
4999
5000    #[tokio::test]
5001    async fn parallel_any_can_fail_all_fault_returns_first_fault() {
5002        use super::ParallelStrategy;
5003
5004        #[derive(Clone)]
5005        struct AlwaysFault2;
5006        #[async_trait]
5007        impl Transition<i32, i32> for AlwaysFault2 {
5008            type Error = String;
5009            type Resources = ();
5010            async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
5011                Outcome::Fault("boom2".to_string())
5012            }
5013        }
5014
5015        let mut bus = Bus::new();
5016        let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
5017            .parallel(
5018                vec![
5019                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
5020                    Arc::new(AlwaysFault2),
5021                ],
5022                ParallelStrategy::AnyCanFail,
5023            );
5024
5025        let outcome = axon.execute(5, &(), &mut bus).await;
5026        // Should return the first fault
5027        assert!(
5028            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
5029            "Expected Fault(boom), got {:?}",
5030            outcome
5031        );
5032    }
5033
5034    #[test]
5035    fn parallel_schematic_has_fanout_fanin_nodes() {
5036        use super::ParallelStrategy;
5037        use ranvier_core::schematic::{EdgeType, NodeKind};
5038
5039        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
5040            .parallel(
5041                vec![
5042                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5043                    Arc::new(MultiplyByTwo),
5044                    Arc::new(AddTen),
5045                ],
5046                ParallelStrategy::AllMustSucceed,
5047            );
5048
5049        // Should have: Ingress + FanOut + 3 branch Atoms + FanIn = 6 nodes
5050        assert_eq!(axon.schematic.nodes.len(), 6);
5051        assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
5052        assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
5053        assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
5054        assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
5055        assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
5056
5057        // Check FanOut description
5058        assert!(axon.schematic.nodes[1]
5059            .description
5060            .as_ref()
5061            .unwrap()
5062            .contains("3 branches"));
5063
5064        // Check parallel edges from FanOut to branches
5065        let parallel_edges: Vec<_> = axon
5066            .schematic
5067            .edges
5068            .iter()
5069            .filter(|e| matches!(e.kind, EdgeType::Parallel))
5070            .collect();
5071        // 3 from FanOut->branches + 3 from branches->FanIn = 6
5072        assert_eq!(parallel_edges.len(), 6);
5073    }
5074
5075    #[tokio::test]
5076    async fn parallel_then_chain_composes_correctly() {
5077        use super::ParallelStrategy;
5078
5079        let mut bus = Bus::new();
5080        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
5081            .then(AddOne)
5082            .parallel(
5083                vec![
5084                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5085                    Arc::new(MultiplyByTwo),
5086                ],
5087                ParallelStrategy::AllMustSucceed,
5088            )
5089            .then(AddTen);
5090
5091        // 5 -> AddOne -> 6 -> Parallel(AddOne=7, MultiplyByTwo=12) -> first=7 -> AddTen -> 17
5092        let outcome = axon.execute(5, &(), &mut bus).await;
5093        assert!(
5094            matches!(outcome, Outcome::Next(17)),
5095            "Expected Next(17), got {:?}",
5096            outcome
5097        );
5098    }
5099
5100    #[tokio::test]
5101    async fn parallel_records_timeline_events() {
5102        use super::ParallelStrategy;
5103        use ranvier_core::timeline::TimelineEvent;
5104
5105        let mut bus = Bus::new();
5106        bus.insert(Timeline::new());
5107
5108        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
5109            .parallel(
5110                vec![
5111                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5112                    Arc::new(MultiplyByTwo),
5113                ],
5114                ParallelStrategy::AllMustSucceed,
5115            );
5116
5117        let outcome = axon.execute(3, &(), &mut bus).await;
5118        assert!(matches!(outcome, Outcome::Next(4)));
5119
5120        let timeline = bus.read::<Timeline>().unwrap();
5121
5122        // Check for FanOut enter/exit and FanIn enter/exit
5123        let fanout_enters = timeline
5124            .events
5125            .iter()
5126            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
5127            .count();
5128        let fanin_enters = timeline
5129            .events
5130            .iter()
5131            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
5132            .count();
5133
5134        assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
5135        assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
5136    }
5137
5138    // ── Axon::simple() convenience constructor ───────────────────────────────
5139
5140    #[derive(Clone)]
5141    struct Greet;
5142
5143    #[async_trait]
5144    impl Transition<(), String> for Greet {
5145        type Error = String;
5146        type Resources = ();
5147
5148        async fn run(
5149            &self,
5150            _state: (),
5151            _resources: &Self::Resources,
5152            _bus: &mut Bus,
5153        ) -> Outcome<String, Self::Error> {
5154            Outcome::Next("Hello from simple!".to_string())
5155        }
5156    }
5157
5158    #[tokio::test]
5159    async fn axon_simple_creates_pipeline() {
5160        let axon = Axon::simple::<String>("SimpleTest").then(Greet);
5161
5162        let mut bus = Bus::new();
5163        let result = axon.execute((), &(), &mut bus).await;
5164
5165        match result {
5166            Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
5167            other => panic!("Expected Outcome::Next, got {:?}", other),
5168        }
5169    }
5170
5171    #[tokio::test]
5172    async fn axon_simple_equivalent_to_explicit() {
5173        // Axon::simple::<E>("label") should behave identically to Axon::<(), (), E>::new("label")
5174        let simple = Axon::simple::<String>("Equiv").then(Greet);
5175        let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
5176
5177        let mut bus1 = Bus::new();
5178        let mut bus2 = Bus::new();
5179
5180        let r1 = simple.execute((), &(), &mut bus1).await;
5181        let r2 = explicit.execute((), &(), &mut bus2).await;
5182
5183        match (r1, r2) {
5184            (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
5185            _ => panic!("Both should produce Outcome::Next"),
5186        }
5187    }
5188
5189    #[tokio::test]
5190    async fn then_fn_closure_transition() {
5191        let axon = Axon::simple::<String>("ClosureTest")
5192            .then_fn("to_greeting", |_input: (), _bus: &mut Bus| {
5193                Outcome::next("hello from closure".to_string())
5194            });
5195
5196        let mut bus = Bus::new();
5197        let result = axon.execute((), &(), &mut bus).await;
5198
5199        match result {
5200            Outcome::Next(msg) => assert_eq!(msg, "hello from closure"),
5201            other => panic!("Expected Outcome::Next, got {:?}", other),
5202        }
5203    }
5204
5205    #[tokio::test]
5206    async fn then_fn_reads_bus() {
5207        let axon = Axon::simple::<String>("BusReadClosure")
5208            .then_fn("check_score", |_input: (), bus: &mut Bus| {
5209                let score = bus.read::<u32>().copied().unwrap_or(0);
5210                if score > 75 {
5211                    Outcome::next("REJECTED".to_string())
5212                } else {
5213                    Outcome::next("APPROVED".to_string())
5214                }
5215            });
5216
5217        let mut bus = Bus::new();
5218        bus.insert(80u32);
5219        let result = axon.execute((), &(), &mut bus).await;
5220        match result {
5221            Outcome::Next(msg) => assert_eq!(msg, "REJECTED"),
5222            other => panic!("Expected REJECTED, got {:?}", other),
5223        }
5224    }
5225
5226    #[tokio::test]
5227    async fn then_fn_mixed_with_transition() {
5228        // Closure and macro Transition in the same chain
5229        let axon = Axon::simple::<String>("MixedPipeline")
5230            .then(Greet)
5231            .then_fn("uppercase", |input: String, _bus: &mut Bus| {
5232                Outcome::next(input.to_uppercase())
5233            });
5234
5235        let mut bus = Bus::new();
5236        let result = axon.execute((), &(), &mut bus).await;
5237        match result {
5238            Outcome::Next(msg) => assert_eq!(msg, "HELLO FROM SIMPLE!"),
5239            other => panic!("Expected uppercase greeting, got {:?}", other),
5240        }
5241    }
5242
5243    #[tokio::test]
5244    async fn then_fn_schematic_label() {
5245        let axon = Axon::simple::<String>("SchematicTest")
5246            .then_fn("my_custom_label", |_: (), _: &mut Bus| {
5247                Outcome::next("ok".to_string())
5248            });
5249
5250        // Node 0 is the identity start node, node 1 is our closure
5251        assert_eq!(axon.schematic.nodes.len(), 2);
5252        assert_eq!(axon.schematic.nodes[1].label, "my_custom_label");
5253    }
5254}