Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
//! It is a reusable, typed decision flow: `Axon<In, Out, E, Res>`, where `Res` is the resource bundle shared by every step (defaulting to `()`).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
//! * **Builder pattern**: `Axon::new("label").then(step_a).then(step_b)`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45/// Configuration for Execution Mode.
46#[derive(Clone)]
47pub enum ExecutionMode {
48    /// Normal, unpartitioned local execution.
49    Local,
50    /// Singleton execution, ensures only one instance runs across the entire cluster.
51    Singleton {
52        lock_key: String,
53        ttl_ms: u64,
54        lock_provider: Arc<dyn DistributedLock>,
55    },
56}
57
/// Fault-handling strategy for steps executed in parallel.
///
/// Decides what the Axon reports when some of its parallel branches fault.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelStrategy {
    /// Every branch has to succeed; the first fault encountered is returned.
    AllMustSucceed,
    /// Tolerate failing branches and return the first successful result.
    /// Only when every branch faults is the first fault returned.
    AnyCanFail,
}
69
/// Heap-allocated, type-erased, `Send` future borrowing for `'a`; the return
/// type of every step executor in an Axon.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
72
73/// Executor type for Axon steps.
74/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
75/// Must be Send + Sync to be reusable across threads and clones.
76pub type Executor<In, Out, E, Res> =
77    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
79/// Manual intervention jump command injected into the Bus.
80#[derive(Debug, Clone)]
81pub struct ManualJump {
82    pub target_node: String,
83    pub payload_override: Option<serde_json::Value>,
84}
85
/// Bus marker holding the schematic index of the step at which a resumed
/// execution restarts (the ingress node has index 0).
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
89
90/// Persisted state for resumption, injected into the Bus.
91#[derive(Debug, Clone)]
92struct ResumptionState {
93    payload: Option<serde_json::Value>,
94}
95
/// Produce a readable, module-path-free name for `T`.
///
/// Every path inside the [`std::any::type_name`] output is shortened to its
/// last `::` segment, while generic arguments, tuples, references, pointers and
/// slices keep their structure. For example, `alloc::vec::Vec<alloc::string::String>`
/// becomes `Vec<String>`. Taking only the text after the final `::` of the whole
/// string would instead yield `String>`.
///
/// `type_name` output is not guaranteed to be stable across compiler versions,
/// so the result is meant for display and diagnostics only.
fn type_name_of<T: ?Sized>() -> String {
    // Characters that separate individual path expressions in a `type_name` string.
    fn is_delimiter(c: char) -> bool {
        matches!(
            c,
            '<' | '>' | ',' | ' ' | '(' | ')' | '[' | ']' | ';' | '&' | '*'
        )
    }

    // Keep only the final `::` segment of a single path expression.
    fn last_segment(path: &str) -> &str {
        path.rsplit("::").next().unwrap_or(path)
    }

    let full = type_name::<T>();
    let mut readable = String::with_capacity(full.len());
    let mut start = 0;
    for (idx, ch) in full.char_indices() {
        if is_delimiter(ch) {
            readable.push_str(last_segment(&full[start..idx]));
            readable.push(ch);
            start = idx + ch.len_utf8();
        }
    }
    readable.push_str(last_segment(&full[start..]));
    readable
}
101
102/// The Axon Builder and Runtime.
103///
104/// `Axon` represents an executable decision tree.
105/// It is reusable and thread-safe.
106///
107/// ## Example
108///
109/// ```rust,ignore
110/// use ranvier_core::prelude::*;
111/// // ...
112/// // Start with an identity Axon (In -> In)
113/// let axon = Axon::<(), (), _>::new("My Axon")
114///     .then(StepA)
115///     .then(StepB);
116///
117/// // Execute multiple times
118/// let res1 = axon.execute((), &mut bus1).await;
119/// let res2 = axon.execute((), &mut bus2).await;
120/// ```
121pub struct Axon<In, Out, E, Res = ()> {
122    /// The static structure (for visualization/analysis)
123    pub schematic: Schematic,
124    /// The runtime executor
125    executor: Executor<In, Out, E, Res>,
126    /// How this Axon is executed across the cluster
127    pub execution_mode: ExecutionMode,
128    /// Optional persistence store for state inspection
129    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
130    /// Optional audit sink for tamper-evident logging of interventions
131    pub audit_sink: Option<Arc<dyn AuditSink>>,
132    /// Optional dead-letter queue sink for storing failed events
133    pub dlq_sink: Option<Arc<dyn DlqSink>>,
134    /// Policy for handling event failures
135    pub dlq_policy: DlqPolicy,
136    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
137    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
138    /// Policy for automated saga compensation
139    pub saga_policy: SagaPolicy,
140    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
141    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
142    /// Registry for Saga compensation handlers
143    pub saga_compensation_registry:
144        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
145    /// Optional IAM handle for identity verification at the Schematic boundary
146    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
147}
148
/// Request to export a schematic, assembled from command-line arguments or
/// environment variables.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Destination file for the schematic; `None` means write to stdout.
    pub output: Option<PathBuf>,
}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157    fn clone(&self) -> Self {
158        Self {
159            schematic: self.schematic.clone(),
160            executor: self.executor.clone(),
161            execution_mode: self.execution_mode.clone(),
162            persistence_store: self.persistence_store.clone(),
163            audit_sink: self.audit_sink.clone(),
164            dlq_sink: self.dlq_sink.clone(),
165            dlq_policy: self.dlq_policy.clone(),
166            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167            saga_policy: self.saga_policy.clone(),
168            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169            saga_compensation_registry: self.saga_compensation_registry.clone(),
170            iam_handle: self.iam_handle.clone(),
171        }
172    }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177    In: Send + Sync + Serialize + DeserializeOwned + 'static,
178    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179    Res: ranvier_core::transition::ResourceRequirement,
180{
181    /// Create a new Axon flow with the given label.
182    /// This is the preferred entry point per Flat API guidelines.
183    #[track_caller]
184    pub fn new(label: &str) -> Self {
185        let caller = Location::caller();
186        Self::start_with_source(label, caller)
187    }
188
189    /// Start defining a new Axon flow.
190    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
191    #[track_caller]
192    pub fn start(label: &str) -> Self {
193        let caller = Location::caller();
194        Self::start_with_source(label, caller)
195    }
196
197    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198        let node_id = uuid::Uuid::new_v4().to_string();
199        let node = Node {
200            id: node_id,
201            kind: NodeKind::Ingress,
202            label: label.to_string(),
203            description: None,
204            input_type: "void".to_string(),
205            output_type: type_name_of::<In>(),
206            resource_type: type_name_of::<Res>(),
207            metadata: Default::default(),
208            bus_capability: None,
209            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210            position: None,
211            compensation_node_id: None,
212            input_schema: None,
213            output_schema: None,
214        };
215
216        let mut schematic = Schematic::new(label);
217        schematic.nodes.push(node);
218
219        let executor: Executor<In, In, E, Res> =
220            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222        Self {
223            schematic,
224            executor,
225            execution_mode: ExecutionMode::Local,
226            persistence_store: None,
227            audit_sink: None,
228            dlq_sink: None,
229            dlq_policy: DlqPolicy::default(),
230            dynamic_dlq_policy: None,
231            saga_policy: SagaPolicy::default(),
232            dynamic_saga_policy: None,
233            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234                ranvier_core::saga::SagaCompensationRegistry::new(),
235            )),
236            iam_handle: None,
237        }
238    }
239}
240
241impl<In, Out, E, Res> Axon<In, Out, E, Res>
242where
243    In: Send + Sync + Serialize + DeserializeOwned + 'static,
244    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
245    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
246    Res: ranvier_core::transition::ResourceRequirement,
247{
248    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
249    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
250        self.execution_mode = mode;
251        self
252    }
253
254    /// Set the schematic version for this Axon.
255    pub fn with_version(mut self, version: impl Into<String>) -> Self {
256        self.schematic.schema_version = version.into();
257        self
258    }
259
260    /// Attach a persistence store to enable state inspection via the Inspector.
261    pub fn with_persistence_store<S>(mut self, store: S) -> Self
262    where
263        S: crate::persistence::PersistenceStore + 'static,
264    {
265        self.persistence_store = Some(Arc::new(store));
266        self
267    }
268
269    /// Attach an audit sink for tamper-evident logging.
270    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
271    where
272        S: AuditSink + 'static,
273    {
274        self.audit_sink = Some(Arc::new(sink));
275        self
276    }
277
278    /// Set the Dead Letter Queue sink for this Axon.
279    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
280    where
281        S: DlqSink + 'static,
282    {
283        self.dlq_sink = Some(Arc::new(sink));
284        self
285    }
286
287    /// Set the Dead Letter Queue policy for this Axon.
288    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
289        self.dlq_policy = policy;
290        self
291    }
292
293    /// Set the Saga compensation policy for this Axon.
294    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
295        self.saga_policy = policy;
296        self
297    }
298
299    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
300    /// current value is read at each execution, overriding the static `dlq_policy`.
301    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
302        self.dynamic_dlq_policy = Some(dynamic);
303        self
304    }
305
306    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
307    /// current value is read at each execution, overriding the static `saga_policy`.
308    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
309        self.dynamic_saga_policy = Some(dynamic);
310        self
311    }
312
313    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
314    ///
315    /// When set, `execute()` will:
316    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
317    /// 2. Verify the token using the provided verifier
318    /// 3. Enforce the policy against the verified identity
319    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
320    pub fn with_iam(
321        mut self,
322        policy: ranvier_core::iam::IamPolicy,
323        verifier: impl ranvier_core::iam::IamVerifier + 'static,
324    ) -> Self {
325        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
326            policy,
327            Arc::new(verifier),
328        ));
329        self
330    }
331
332    /// Attach a JSON Schema for the **last node's input type** in the schematic.
333    ///
334    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
335    ///
336    /// ```rust,ignore
337    /// let axon = Axon::new("My Circuit")
338    ///     .then(ProcessStep)
339    ///     .with_input_schema::<CreateUserRequest>()
340    ///     .with_output_schema::<UserResponse>();
341    /// ```
342    #[cfg(feature = "schema")]
343    pub fn with_input_schema<T>(mut self) -> Self
344    where
345        T: schemars::JsonSchema,
346    {
347        if let Some(last_node) = self.schematic.nodes.last_mut() {
348            let schema = schemars::schema_for!(T);
349            last_node.input_schema =
350                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
351        }
352        self
353    }
354
355    /// Attach a JSON Schema for the **last node's output type** in the schematic.
356    ///
357    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
358    #[cfg(feature = "schema")]
359    pub fn with_output_schema<T>(mut self) -> Self
360    where
361        T: schemars::JsonSchema,
362    {
363        if let Some(last_node) = self.schematic.nodes.last_mut() {
364            let schema = schemars::schema_for!(T);
365            last_node.output_schema =
366                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
367        }
368        self
369    }
370
371    /// Attach a raw JSON Schema value for the **last node's input type** in the schematic.
372    ///
373    /// Use this for pre-built schemas without requiring the `schema` feature.
374    pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
375        if let Some(last_node) = self.schematic.nodes.last_mut() {
376            last_node.input_schema = Some(schema);
377        }
378        self
379    }
380
381    /// Attach a raw JSON Schema value for the **last node's output type** in the schematic.
382    pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
383        if let Some(last_node) = self.schematic.nodes.last_mut() {
384            last_node.output_schema = Some(schema);
385        }
386        self
387    }
388}
389
390#[async_trait]
391impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
392where
393    In: Send + Sync + Serialize + DeserializeOwned + 'static,
394    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
395    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
396    Res: ranvier_core::transition::ResourceRequirement,
397{
398    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
399        let store = self.persistence_store.as_ref()?;
400        let trace = store.load(trace_id).await.ok().flatten()?;
401        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
402    }
403
404    async fn force_resume(
405        &self,
406        trace_id: &str,
407        target_node: &str,
408        payload_override: Option<Value>,
409    ) -> Result<(), String> {
410        let store = self
411            .persistence_store
412            .as_ref()
413            .ok_or("No persistence store attached to Axon")?;
414
415        let intervention = crate::persistence::Intervention {
416            target_node: target_node.to_string(),
417            payload_override,
418            timestamp_ms: now_ms(),
419        };
420
421        store
422            .save_intervention(trace_id, intervention)
423            .await
424            .map_err(|e| format!("Failed to save intervention: {}", e))?;
425
426        if let Some(sink) = self.audit_sink.as_ref() {
427            let event = AuditEvent::new(
428                uuid::Uuid::new_v4().to_string(),
429                "Inspector".to_string(),
430                "ForceResume".to_string(),
431                trace_id.to_string(),
432            )
433            .with_metadata("target_node", target_node);
434
435            let _ = sink.append(&event).await;
436        }
437
438        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
439        Ok(())
440    }
441}
442
443impl<In, Out, E, Res> Axon<In, Out, E, Res>
444where
445    In: Send + Sync + Serialize + DeserializeOwned + 'static,
446    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
447    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
448    Res: ranvier_core::transition::ResourceRequirement,
449{
450    /// Chain a transition to this Axon.
451    ///
452    /// Requires the transition to use the SAME resource bundle as the previous steps.
453    #[track_caller]
454    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
455    where
456        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
457        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
458    {
459        let caller = Location::caller();
460        // Decompose self to avoid partial move issues
461        let Axon {
462            mut schematic,
463            executor: prev_executor,
464            execution_mode,
465            persistence_store,
466            audit_sink,
467            dlq_sink,
468            dlq_policy,
469            dynamic_dlq_policy,
470            saga_policy,
471            dynamic_saga_policy,
472            saga_compensation_registry,
473            iam_handle,
474        } = self;
475
476        // Update Schematic
477        let next_node_id = uuid::Uuid::new_v4().to_string();
478        let next_node = Node {
479            id: next_node_id.clone(),
480            kind: NodeKind::Atom,
481            label: transition.label(),
482            description: transition.description(),
483            input_type: type_name_of::<Out>(),
484            output_type: type_name_of::<Next>(),
485            resource_type: type_name_of::<Res>(),
486            metadata: Default::default(),
487            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
488            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
489            position: transition
490                .position()
491                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
492            compensation_node_id: None,
493            input_schema: transition.input_schema(),
494            output_schema: None,
495        };
496
497        let last_node_id = schematic
498            .nodes
499            .last()
500            .map(|n| n.id.clone())
501            .unwrap_or_default();
502
503        schematic.nodes.push(next_node);
504        schematic.edges.push(Edge {
505            from: last_node_id,
506            to: next_node_id.clone(),
507            kind: EdgeType::Linear,
508            label: Some("Next".to_string()),
509        });
510
511        // Compose Executor
512        let node_id_for_exec = next_node_id.clone();
513        let node_label_for_exec = transition.label();
514        let bus_policy_for_exec = transition.bus_access_policy();
515        let bus_policy_clone = bus_policy_for_exec.clone();
516        let current_step_idx = schematic.nodes.len() as u64 - 1;
517        let next_executor: Executor<In, Next, E, Res> = Arc::new(
518            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
519                let prev = prev_executor.clone();
520                let trans = transition.clone();
521                let timeline_node_id = node_id_for_exec.clone();
522                let timeline_node_label = node_label_for_exec.clone();
523                let transition_bus_policy = bus_policy_clone.clone();
524                let step_idx = current_step_idx;
525
526                Box::pin(async move {
527                    // Check for manual intervention jump
528                    if let Some(jump) = bus.read::<ManualJump>()
529                        && (jump.target_node == timeline_node_id
530                            || jump.target_node == timeline_node_label)
531                    {
532                        tracing::info!(
533                            node_id = %timeline_node_id,
534                            node_label = %timeline_node_label,
535                            "Manual jump target reached; skipping previous steps"
536                        );
537
538                        let state = if let Some(ow) = jump.payload_override.clone() {
539                            match serde_json::from_value::<Out>(ow) {
540                                Ok(s) => s,
541                                Err(e) => {
542                                    tracing::error!(
543                                        "Payload override deserialization failed: {}",
544                                        e
545                                    );
546                                    return Outcome::emit(
547                                        "execution.jump.payload_error",
548                                        Some(serde_json::json!({"error": e.to_string()})),
549                                    )
550                                    .map(|_: ()| unreachable!());
551                                }
552                            }
553                        } else {
554                            // Default back to the provided input if this is an identity jump or types match
555                            // For now, treat missing payload on a mid-flow jump as an avoidable error if Possible.
556                            // In a better implementation, we'd try to load the last persisted Out for the previous step.
557                            return Outcome::emit(
558                                "execution.jump.missing_payload",
559                                Some(serde_json::json!({"node_id": timeline_node_id})),
560                            );
561                        };
562
563                        // Skip prev() and continue with trans.run(state, ...)
564                        return run_this_step::<Out, Next, E, Res>(
565                            &trans,
566                            state,
567                            res,
568                            bus,
569                            &timeline_node_id,
570                            &timeline_node_label,
571                            &transition_bus_policy,
572                            step_idx,
573                        )
574                        .await;
575                    }
576
577                    // Handle resumption skip
578                    if let Some(start) = bus.read::<StartStep>()
579                        && step_idx == start.0
580                        && bus.read::<ResumptionState>().is_some()
581                    {
582                        // Prefer fresh input (In → Out via JSON round-trip).
583                        // The caller provides updated state (e.g., corrected data after a fault).
584                        // Falls back to persisted checkpoint state when types are incompatible.
585                        let fresh_state = serde_json::to_value(&input)
586                            .ok()
587                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
588                        let persisted_state = bus
589                            .read::<ResumptionState>()
590                            .and_then(|r| r.payload.clone())
591                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
592
593                        if let Some(s) = fresh_state.or(persisted_state) {
594                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
595                            return run_this_step::<Out, Next, E, Res>(
596                                &trans,
597                                s,
598                                res,
599                                bus,
600                                &timeline_node_id,
601                                &timeline_node_label,
602                                &transition_bus_policy,
603                                step_idx,
604                            )
605                            .await;
606                        }
607
608                        return Outcome::emit(
609                            "execution.resumption.payload_error",
610                            Some(serde_json::json!({"error": "no compatible resumption state"})),
611                        )
612                        .map(|_: ()| unreachable!());
613                    }
614
615                    // Run previous step
616                    let prev_result = prev(input, res, bus).await;
617
618                    // Unpack
619                    let state = match prev_result {
620                        Outcome::Next(t) => t,
621                        other => return other.map(|_| unreachable!()),
622                    };
623
624                    run_this_step::<Out, Next, E, Res>(
625                        &trans,
626                        state,
627                        res,
628                        bus,
629                        &timeline_node_id,
630                        &timeline_node_label,
631                        &transition_bus_policy,
632                        step_idx,
633                    )
634                    .await
635                })
636            },
637        );
638        Axon {
639            schematic,
640            executor: next_executor,
641            execution_mode,
642            persistence_store,
643            audit_sink,
644            dlq_sink,
645            dlq_policy,
646            dynamic_dlq_policy,
647            saga_policy,
648            dynamic_saga_policy,
649            saga_compensation_registry,
650            iam_handle,
651        }
652    }
653
654    /// Chain a transition with a retry policy.
655    ///
656    /// If the transition returns `Outcome::Fault`, it will be retried up to
657    /// `policy.max_retries` times with the configured backoff strategy.
658    /// Timeline events are recorded for each retry attempt.
659    ///
660    /// # Example
661    ///
662    /// ```rust,ignore
663    /// use ranvier_runtime::{Axon, RetryPolicy};
664    /// use std::time::Duration;
665    ///
666    /// let axon = Axon::new("pipeline")
667    ///     .then_with_retry(my_transition, RetryPolicy::fixed(3, Duration::from_millis(100)));
668    /// ```
669    #[track_caller]
670    pub fn then_with_retry<Next, Trans>(
671        self,
672        transition: Trans,
673        policy: crate::retry::RetryPolicy,
674    ) -> Axon<In, Next, E, Res>
675    where
676        Out: Clone,
677        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
678        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
679    {
680        let caller = Location::caller();
681        let Axon {
682            mut schematic,
683            executor: prev_executor,
684            execution_mode,
685            persistence_store,
686            audit_sink,
687            dlq_sink,
688            dlq_policy,
689            dynamic_dlq_policy,
690            saga_policy,
691            dynamic_saga_policy,
692            saga_compensation_registry,
693            iam_handle,
694        } = self;
695
696        let next_node_id = uuid::Uuid::new_v4().to_string();
697        let next_node = Node {
698            id: next_node_id.clone(),
699            kind: NodeKind::Atom,
700            label: transition.label(),
701            description: transition.description(),
702            input_type: type_name_of::<Out>(),
703            output_type: type_name_of::<Next>(),
704            resource_type: type_name_of::<Res>(),
705            metadata: Default::default(),
706            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
707            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
708            position: transition
709                .position()
710                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
711            compensation_node_id: None,
712            input_schema: transition.input_schema(),
713            output_schema: None,
714        };
715
716        let last_node_id = schematic
717            .nodes
718            .last()
719            .map(|n| n.id.clone())
720            .unwrap_or_default();
721
722        schematic.nodes.push(next_node);
723        schematic.edges.push(Edge {
724            from: last_node_id,
725            to: next_node_id.clone(),
726            kind: EdgeType::Linear,
727            label: Some("Next (retryable)".to_string()),
728        });
729
730        let node_id_for_exec = next_node_id.clone();
731        let node_label_for_exec = transition.label();
732        let bus_policy_for_exec = transition.bus_access_policy();
733        let bus_policy_clone = bus_policy_for_exec.clone();
734        let current_step_idx = schematic.nodes.len() as u64 - 1;
735        let next_executor: Executor<In, Next, E, Res> = Arc::new(
736            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
737                let prev = prev_executor.clone();
738                let trans = transition.clone();
739                let timeline_node_id = node_id_for_exec.clone();
740                let timeline_node_label = node_label_for_exec.clone();
741                let transition_bus_policy = bus_policy_clone.clone();
742                let step_idx = current_step_idx;
743                let retry_policy = policy.clone();
744
745                Box::pin(async move {
746                    // Run previous step
747                    let prev_result = prev(input, res, bus).await;
748                    let state = match prev_result {
749                        Outcome::Next(t) => t,
750                        other => return other.map(|_| unreachable!()),
751                    };
752
753                    // Attempt with retries
754                    let mut last_result = None;
755                    for attempt in 0..=retry_policy.max_retries {
756                        let attempt_state = state.clone();
757
758                        let result = run_this_step::<Out, Next, E, Res>(
759                            &trans,
760                            attempt_state,
761                            res,
762                            bus,
763                            &timeline_node_id,
764                            &timeline_node_label,
765                            &transition_bus_policy,
766                            step_idx,
767                        )
768                        .await;
769
770                        match &result {
771                            Outcome::Next(_) => return result,
772                            Outcome::Fault(_) if attempt < retry_policy.max_retries => {
773                                let delay = retry_policy.delay_for_attempt(attempt);
774                                tracing::warn!(
775                                    node_id = %timeline_node_id,
776                                    attempt = attempt + 1,
777                                    max = retry_policy.max_retries,
778                                    delay_ms = delay.as_millis() as u64,
779                                    "Transition failed, retrying"
780                                );
781                                if let Some(timeline) = bus.read_mut::<Timeline>() {
782                                    timeline.push(TimelineEvent::NodeRetry {
783                                        node_id: timeline_node_id.clone(),
784                                        attempt: attempt + 1,
785                                        max_attempts: retry_policy.max_retries,
786                                        backoff_ms: delay.as_millis() as u64,
787                                        timestamp: now_ms(),
788                                    });
789                                }
790                                tokio::time::sleep(delay).await;
791                            }
792                            _ => {
793                                last_result = Some(result);
794                                break;
795                            }
796                        }
797                    }
798
799                    last_result.unwrap_or_else(|| {
800                        Outcome::emit("execution.retry.exhausted", None)
801                    })
802                })
803            },
804        );
805        Axon {
806            schematic,
807            executor: next_executor,
808            execution_mode,
809            persistence_store,
810            audit_sink,
811            dlq_sink,
812            dlq_policy,
813            dynamic_dlq_policy,
814            saga_policy,
815            dynamic_saga_policy,
816            saga_compensation_registry,
817            iam_handle,
818        }
819    }
820
821    /// Chain a transition to this Axon with a Saga compensation step.
822    ///
823    /// If the transition fails, the compensation transition will be executed
824    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
825    #[track_caller]
826    pub fn then_compensated<Next, Trans, Comp>(
827        self,
828        transition: Trans,
829        compensation: Comp,
830    ) -> Axon<In, Next, E, Res>
831    where
832        Out: Clone,
833        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
834        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
835        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
836    {
837        let caller = Location::caller();
838        let Axon {
839            mut schematic,
840            executor: prev_executor,
841            execution_mode,
842            persistence_store,
843            audit_sink,
844            dlq_sink,
845            dlq_policy,
846            dynamic_dlq_policy,
847            saga_policy,
848            dynamic_saga_policy,
849            saga_compensation_registry,
850            iam_handle,
851        } = self;
852
853        // 1. Add Primary Node
854        let next_node_id = uuid::Uuid::new_v4().to_string();
855        let comp_node_id = uuid::Uuid::new_v4().to_string();
856
857        let next_node = Node {
858            id: next_node_id.clone(),
859            kind: NodeKind::Atom,
860            label: transition.label(),
861            description: transition.description(),
862            input_type: type_name_of::<Out>(),
863            output_type: type_name_of::<Next>(),
864            resource_type: type_name_of::<Res>(),
865            metadata: Default::default(),
866            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
867            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
868            position: transition
869                .position()
870                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
871            compensation_node_id: Some(comp_node_id.clone()),
872            input_schema: None,
873            output_schema: None,
874        };
875
876        // 2. Add Compensation Node
877        let comp_node = Node {
878            id: comp_node_id.clone(),
879            kind: NodeKind::Atom,
880            label: format!("Compensate: {}", compensation.label()),
881            description: compensation.description(),
882            input_type: type_name_of::<Out>(),
883            output_type: "void".to_string(),
884            resource_type: type_name_of::<Res>(),
885            metadata: Default::default(),
886            bus_capability: None,
887            source_location: None,
888            position: compensation
889                .position()
890                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
891            compensation_node_id: None,
892            input_schema: None,
893            output_schema: None,
894        };
895
896        let last_node_id = schematic
897            .nodes
898            .last()
899            .map(|n| n.id.clone())
900            .unwrap_or_default();
901
902        schematic.nodes.push(next_node);
903        schematic.nodes.push(comp_node);
904        schematic.edges.push(Edge {
905            from: last_node_id,
906            to: next_node_id.clone(),
907            kind: EdgeType::Linear,
908            label: Some("Next".to_string()),
909        });
910
911        // 3. Compose Executor with Compensation Logic
912        let node_id_for_exec = next_node_id.clone();
913        let comp_id_for_exec = comp_node_id.clone();
914        let node_label_for_exec = transition.label();
915        let bus_policy_for_exec = transition.bus_access_policy();
916        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
917        let comp_for_exec = compensation.clone();
918        let bus_policy_for_executor = bus_policy_for_exec.clone();
919        let bus_policy_for_registry = bus_policy_for_exec.clone();
920        let next_executor: Executor<In, Next, E, Res> = Arc::new(
921            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
922                let prev = prev_executor.clone();
923                let trans = transition.clone();
924                let comp = comp_for_exec.clone();
925                let timeline_node_id = node_id_for_exec.clone();
926                let timeline_comp_id = comp_id_for_exec.clone();
927                let timeline_node_label = node_label_for_exec.clone();
928                let transition_bus_policy = bus_policy_for_executor.clone();
929                let step_idx = step_idx_for_exec;
930
931                Box::pin(async move {
932                    // Check for manual intervention jump
933                    if let Some(jump) = bus.read::<ManualJump>()
934                        && (jump.target_node == timeline_node_id
935                            || jump.target_node == timeline_node_label)
936                    {
937                        tracing::info!(
938                            node_id = %timeline_node_id,
939                            node_label = %timeline_node_label,
940                            "Manual jump target reached (compensated); skipping previous steps"
941                        );
942
943                        let state = if let Some(ow) = jump.payload_override.clone() {
944                            match serde_json::from_value::<Out>(ow) {
945                                Ok(s) => s,
946                                Err(e) => {
947                                    tracing::error!(
948                                        "Payload override deserialization failed: {}",
949                                        e
950                                    );
951                                    return Outcome::emit(
952                                        "execution.jump.payload_error",
953                                        Some(serde_json::json!({"error": e.to_string()})),
954                                    )
955                                    .map(|_: ()| unreachable!());
956                                }
957                            }
958                        } else {
959                            return Outcome::emit(
960                                "execution.jump.missing_payload",
961                                Some(serde_json::json!({"node_id": timeline_node_id})),
962                            )
963                            .map(|_: ()| unreachable!());
964                        };
965
966                        // Skip prev() and continue with trans.run(state, ...)
967                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
968                            &trans,
969                            &comp,
970                            state,
971                            res,
972                            bus,
973                            &timeline_node_id,
974                            &timeline_comp_id,
975                            &timeline_node_label,
976                            &transition_bus_policy,
977                            step_idx,
978                        )
979                        .await;
980                    }
981
982                    // Handle resumption skip
983                    if let Some(start) = bus.read::<StartStep>()
984                        && step_idx == start.0
985                        && bus.read::<ResumptionState>().is_some()
986                    {
987                        let fresh_state = serde_json::to_value(&input)
988                            .ok()
989                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
990                        let persisted_state = bus
991                            .read::<ResumptionState>()
992                            .and_then(|r| r.payload.clone())
993                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
994
995                        if let Some(s) = fresh_state.or(persisted_state) {
996                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
997                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
998                                &trans,
999                                &comp,
1000                                s,
1001                                res,
1002                                bus,
1003                                &timeline_node_id,
1004                                &timeline_comp_id,
1005                                &timeline_node_label,
1006                                &transition_bus_policy,
1007                                step_idx,
1008                            )
1009                            .await;
1010                        }
1011
1012                        return Outcome::emit(
1013                            "execution.resumption.payload_error",
1014                            Some(serde_json::json!({"error": "no compatible resumption state"})),
1015                        )
1016                        .map(|_: ()| unreachable!());
1017                    }
1018
1019                    // Run previous step
1020                    let prev_result = prev(input, res, bus).await;
1021
1022                    // Unpack
1023                    let state = match prev_result {
1024                        Outcome::Next(t) => t,
1025                        other => return other.map(|_| unreachable!()),
1026                    };
1027
1028                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
1029                        &trans,
1030                        &comp,
1031                        state,
1032                        res,
1033                        bus,
1034                        &timeline_node_id,
1035                        &timeline_comp_id,
1036                        &timeline_node_label,
1037                        &transition_bus_policy,
1038                        step_idx,
1039                    )
1040                    .await
1041                })
1042            },
1043        );
1044        // 4. Register Saga Compensation if enabled
1045        {
1046            let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1047            let comp_fn = compensation.clone();
1048            let transition_bus_policy = bus_policy_for_registry.clone();
1049
1050            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1051                Arc::new(move |input_data, res, bus| {
1052                    let comp = comp_fn.clone();
1053                    let bus_policy = transition_bus_policy.clone();
1054                    Box::pin(async move {
1055                        let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1056                        bus.set_access_policy(comp.label(), bus_policy);
1057                        let res = comp.run(input, res, bus).await;
1058                        bus.clear_access_policy();
1059                        res
1060                    })
1061                });
1062            registry.register(next_node_id.clone(), handler);
1063        }
1064
1065        Axon {
1066            schematic,
1067            executor: next_executor,
1068            execution_mode,
1069            persistence_store,
1070            audit_sink,
1071            dlq_sink,
1072            dlq_policy,
1073            dynamic_dlq_policy,
1074            saga_policy,
1075            dynamic_saga_policy,
1076            saga_compensation_registry,
1077            iam_handle,
1078        }
1079    }
1080
1081    /// Attach a compensation transition to the previously added node.
1082    /// This establishes a Schematic-level Saga compensation mapping.
1083    #[track_caller]
1084    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1085    where
1086        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1087    {
1088        // NOTE: This currently only updates the Schematic.
1089        // For runtime compensation behavior, use `then_compensated`.
1090        let caller = Location::caller();
1091        let comp_node_id = uuid::Uuid::new_v4().to_string();
1092
1093        let comp_node = Node {
1094            id: comp_node_id.clone(),
1095            kind: NodeKind::Atom,
1096            label: transition.label(),
1097            description: transition.description(),
1098            input_type: type_name_of::<Out>(),
1099            output_type: "void".to_string(),
1100            resource_type: type_name_of::<Res>(),
1101            metadata: Default::default(),
1102            bus_capability: None,
1103            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1104            position: transition
1105                .position()
1106                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1107            compensation_node_id: None,
1108            input_schema: None,
1109            output_schema: None,
1110        };
1111
1112        if let Some(last_node) = self.schematic.nodes.last_mut() {
1113            last_node.compensation_node_id = Some(comp_node_id.clone());
1114        }
1115
1116        self.schematic.nodes.push(comp_node);
1117        self
1118    }
1119
1120    /// Add a branch point
1121    #[track_caller]
1122    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1123        let caller = Location::caller();
1124        let branch_id_str = branch_id.into();
1125        let last_node_id = self
1126            .schematic
1127            .nodes
1128            .last()
1129            .map(|n| n.id.clone())
1130            .unwrap_or_default();
1131
1132        let branch_node = Node {
1133            id: uuid::Uuid::new_v4().to_string(),
1134            kind: NodeKind::Synapse,
1135            label: label.to_string(),
1136            description: None,
1137            input_type: type_name_of::<Out>(),
1138            output_type: type_name_of::<Out>(),
1139            resource_type: type_name_of::<Res>(),
1140            metadata: Default::default(),
1141            bus_capability: None,
1142            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1143            position: None,
1144            compensation_node_id: None,
1145            input_schema: None,
1146            output_schema: None,
1147        };
1148
1149        self.schematic.nodes.push(branch_node);
1150        self.schematic.edges.push(Edge {
1151            from: last_node_id,
1152            to: branch_id_str.clone(),
1153            kind: EdgeType::Branch(branch_id_str),
1154            label: Some("Branch".to_string()),
1155        });
1156
1157        self
1158    }
1159
1160    /// Run multiple transitions in parallel (fan-out / fan-in).
1161    ///
1162    /// Each transition receives a clone of the current step's output and runs
1163    /// concurrently via [`futures_util::future::join_all`]. The strategy controls
1164    /// how faults are handled:
1165    ///
1166    /// - [`ParallelStrategy::AllMustSucceed`]: All branches must produce `Next`.
1167    ///   If any branch returns `Fault`, the first fault is propagated.
1168    /// - [`ParallelStrategy::AnyCanFail`]: Branches that fault are ignored as
1169    ///   long as at least one succeeds. If all branches fault, the first fault
1170    ///   is returned.
1171    ///
1172    /// The **first successful `Next` value** is forwarded to the next step in
1173    /// the pipeline. A custom merge can be layered on top via a subsequent
1174    /// `.then()` step.
1175    ///
1176    /// Each parallel branch receives its own fresh [`Bus`] instance. Resources
1177    /// should be injected via the shared `Res` bundle rather than the Bus for
1178    /// parallel steps.
1179    ///
1180    /// ## Schematic
1181    ///
1182    /// The method emits a `FanOut` node, one `Atom` node per branch (connected
1183    /// via `Parallel` edges), and a `FanIn` join node.
1184    ///
1185    /// ## Example
1186    ///
1187    /// ```rust,ignore
1188    /// let axon = Axon::new("Pipeline")
1189    ///     .then(ParseInput)
1190    ///     .parallel(
1191    ///         vec![Arc::new(EnrichA), Arc::new(EnrichB)],
1192    ///         ParallelStrategy::AllMustSucceed,
1193    ///     )
1194    ///     .then(MergeResults);
1195    /// ```
1196    #[track_caller]
1197    pub fn parallel(
1198        self,
1199        transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1200        strategy: ParallelStrategy,
1201    ) -> Axon<In, Out, E, Res>
1202    where
1203        Out: Clone,
1204    {
1205        let caller = Location::caller();
1206        let Axon {
1207            mut schematic,
1208            executor: prev_executor,
1209            execution_mode,
1210            persistence_store,
1211            audit_sink,
1212            dlq_sink,
1213            dlq_policy,
1214            dynamic_dlq_policy,
1215            saga_policy,
1216            dynamic_saga_policy,
1217            saga_compensation_registry,
1218            iam_handle,
1219        } = self;
1220
1221        // ── Schematic: FanOut node ─────────────────────────────────
1222        let fanout_id = uuid::Uuid::new_v4().to_string();
1223        let fanin_id = uuid::Uuid::new_v4().to_string();
1224
1225        let last_node_id = schematic
1226            .nodes
1227            .last()
1228            .map(|n| n.id.clone())
1229            .unwrap_or_default();
1230
1231        let fanout_node = Node {
1232            id: fanout_id.clone(),
1233            kind: NodeKind::FanOut,
1234            label: "FanOut".to_string(),
1235            description: Some(format!(
1236                "Parallel split ({} branches, {:?})",
1237                transitions.len(),
1238                strategy
1239            )),
1240            input_type: type_name_of::<Out>(),
1241            output_type: type_name_of::<Out>(),
1242            resource_type: type_name_of::<Res>(),
1243            metadata: Default::default(),
1244            bus_capability: None,
1245            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1246            position: None,
1247            compensation_node_id: None,
1248            input_schema: None,
1249            output_schema: None,
1250        };
1251
1252        schematic.nodes.push(fanout_node);
1253        schematic.edges.push(Edge {
1254            from: last_node_id,
1255            to: fanout_id.clone(),
1256            kind: EdgeType::Linear,
1257            label: Some("Next".to_string()),
1258        });
1259
1260        // ── Schematic: one Atom node per parallel branch ───────────
1261        let mut branch_node_ids = Vec::with_capacity(transitions.len());
1262        for (i, trans) in transitions.iter().enumerate() {
1263            let branch_id = uuid::Uuid::new_v4().to_string();
1264            let branch_node = Node {
1265                id: branch_id.clone(),
1266                kind: NodeKind::Atom,
1267                label: trans.label(),
1268                description: trans.description(),
1269                input_type: type_name_of::<Out>(),
1270                output_type: type_name_of::<Out>(),
1271                resource_type: type_name_of::<Res>(),
1272                metadata: Default::default(),
1273                bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1274                source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1275                position: None,
1276                compensation_node_id: None,
1277                input_schema: trans.input_schema(),
1278                output_schema: None,
1279            };
1280            schematic.nodes.push(branch_node);
1281            schematic.edges.push(Edge {
1282                from: fanout_id.clone(),
1283                to: branch_id.clone(),
1284                kind: EdgeType::Parallel,
1285                label: Some(format!("Branch {}", i)),
1286            });
1287            branch_node_ids.push(branch_id);
1288        }
1289
1290        // ── Schematic: FanIn node ──────────────────────────────────
1291        let fanin_node = Node {
1292            id: fanin_id.clone(),
1293            kind: NodeKind::FanIn,
1294            label: "FanIn".to_string(),
1295            description: Some(format!("Parallel join ({:?})", strategy)),
1296            input_type: type_name_of::<Out>(),
1297            output_type: type_name_of::<Out>(),
1298            resource_type: type_name_of::<Res>(),
1299            metadata: Default::default(),
1300            bus_capability: None,
1301            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1302            position: None,
1303            compensation_node_id: None,
1304            input_schema: None,
1305            output_schema: None,
1306        };
1307
1308        schematic.nodes.push(fanin_node);
1309        for branch_id in &branch_node_ids {
1310            schematic.edges.push(Edge {
1311                from: branch_id.clone(),
1312                to: fanin_id.clone(),
1313                kind: EdgeType::Parallel,
1314                label: Some("Join".to_string()),
1315            });
1316        }
1317
1318        // ── Executor: parallel composition ─────────────────────────
1319        let fanout_node_id = fanout_id.clone();
1320        let fanin_node_id = fanin_id.clone();
1321        let branch_ids_for_exec = branch_node_ids.clone();
1322        let step_idx = schematic.nodes.len() as u64 - 1;
1323
1324        let next_executor: Executor<In, Out, E, Res> = Arc::new(
1325            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1326                let prev = prev_executor.clone();
1327                let branches = transitions.clone();
1328                let fanout_id = fanout_node_id.clone();
1329                let fanin_id = fanin_node_id.clone();
1330                let branch_ids = branch_ids_for_exec.clone();
1331
1332                Box::pin(async move {
1333                    // Run previous steps
1334                    let prev_result = prev(input, res, bus).await;
1335                    let state = match prev_result {
1336                        Outcome::Next(t) => t,
1337                        other => return other.map(|_| unreachable!()),
1338                    };
1339
1340                    // Timeline: FanOut enter
1341                    let fanout_enter_ts = now_ms();
1342                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1343                        timeline.push(TimelineEvent::NodeEnter {
1344                            node_id: fanout_id.clone(),
1345                            node_label: "FanOut".to_string(),
1346                            timestamp: fanout_enter_ts,
1347                        });
1348                    }
1349
1350                    // Build futures for all branches.
1351                    // Each branch gets its own Bus so they can run concurrently
1352                    // without &mut Bus aliasing. Resources are shared via &Res.
1353                    let futs: Vec<_> = branches
1354                        .iter()
1355                        .enumerate()
1356                        .map(|(i, trans)| {
1357                            let branch_state = state.clone();
1358                            let branch_node_id = branch_ids[i].clone();
1359                            let trans = trans.clone();
1360
1361                            async move {
1362                                let mut branch_bus = Bus::new();
1363                                let label = trans.label();
1364                                let bus_policy = trans.bus_access_policy();
1365
1366                                branch_bus.set_access_policy(label.clone(), bus_policy);
1367                                let result = trans.run(branch_state, res, &mut branch_bus).await;
1368                                branch_bus.clear_access_policy();
1369
1370                                (i, branch_node_id, label, result)
1371                            }
1372                        })
1373                        .collect();
1374
1375                    // Run all branches concurrently within the current task
1376                    let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1377                        futures_util::future::join_all(futs).await;
1378
1379                    // Timeline: record each branch's enter/exit
1380                    for (_, branch_node_id, branch_label, outcome) in &results {
1381                        if let Some(timeline) = bus.read_mut::<Timeline>() {
1382                            let ts = now_ms();
1383                            timeline.push(TimelineEvent::NodeEnter {
1384                                node_id: branch_node_id.clone(),
1385                                node_label: branch_label.clone(),
1386                                timestamp: ts,
1387                            });
1388                            timeline.push(TimelineEvent::NodeExit {
1389                                node_id: branch_node_id.clone(),
1390                                outcome_type: outcome_type_name(outcome),
1391                                duration_ms: 0,
1392                                timestamp: ts,
1393                            });
1394                        }
1395                    }
1396
1397                    // Timeline: FanOut exit
1398                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1399                        timeline.push(TimelineEvent::NodeExit {
1400                            node_id: fanout_id.clone(),
1401                            outcome_type: "Next".to_string(),
1402                            duration_ms: 0,
1403                            timestamp: now_ms(),
1404                        });
1405                    }
1406
1407                    // ── Apply strategy ─────────────────────────────
1408                    let combined = match strategy {
1409                        ParallelStrategy::AllMustSucceed => {
1410                            let mut first_fault = None;
1411                            let mut first_success = None;
1412
1413                            for (_, _, _, outcome) in results {
1414                                match outcome {
1415                                    Outcome::Fault(e) => {
1416                                        if first_fault.is_none() {
1417                                            first_fault = Some(Outcome::Fault(e));
1418                                        }
1419                                    }
1420                                    Outcome::Next(val) => {
1421                                        if first_success.is_none() {
1422                                            first_success = Some(Outcome::Next(val));
1423                                        }
1424                                    }
1425                                    other => {
1426                                        // Non-Next/non-Fault outcomes (Branch, Emit, Jump)
1427                                        // treated as non-success in AllMustSucceed
1428                                        if first_fault.is_none() {
1429                                            first_fault = Some(other);
1430                                        }
1431                                    }
1432                                }
1433                            }
1434
1435                            if let Some(fault) = first_fault {
1436                                fault
1437                            } else {
1438                                first_success.unwrap_or_else(|| {
1439                                    Outcome::emit("execution.parallel.no_results", None)
1440                                })
1441                            }
1442                        }
1443                        ParallelStrategy::AnyCanFail => {
1444                            let mut first_success = None;
1445                            let mut first_fault = None;
1446
1447                            for (_, _, _, outcome) in results {
1448                                match outcome {
1449                                    Outcome::Next(val) => {
1450                                        if first_success.is_none() {
1451                                            first_success = Some(Outcome::Next(val));
1452                                        }
1453                                    }
1454                                    Outcome::Fault(e) => {
1455                                        if first_fault.is_none() {
1456                                            first_fault = Some(Outcome::Fault(e));
1457                                        }
1458                                    }
1459                                    _ => {}
1460                                }
1461                            }
1462
1463                            first_success.unwrap_or_else(|| {
1464                                first_fault.unwrap_or_else(|| {
1465                                    Outcome::emit("execution.parallel.no_results", None)
1466                                })
1467                            })
1468                        }
1469                    };
1470
1471                    // Timeline: FanIn
1472                    let fanin_enter_ts = now_ms();
1473                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1474                        timeline.push(TimelineEvent::NodeEnter {
1475                            node_id: fanin_id.clone(),
1476                            node_label: "FanIn".to_string(),
1477                            timestamp: fanin_enter_ts,
1478                        });
1479                        timeline.push(TimelineEvent::NodeExit {
1480                            node_id: fanin_id.clone(),
1481                            outcome_type: outcome_type_name(&combined),
1482                            duration_ms: 0,
1483                            timestamp: fanin_enter_ts,
1484                        });
1485                    }
1486
1487                    // Persistence
1488                    if let Some(handle) = bus.read::<PersistenceHandle>() {
1489                        let trace_id = persistence_trace_id(bus);
1490                        let circuit = bus
1491                            .read::<ranvier_core::schematic::Schematic>()
1492                            .map(|s| s.name.clone())
1493                            .unwrap_or_default();
1494                        let version = bus
1495                            .read::<ranvier_core::schematic::Schematic>()
1496                            .map(|s| s.schema_version.clone())
1497                            .unwrap_or_default();
1498
1499                        persist_execution_event(
1500                            handle,
1501                            &trace_id,
1502                            &circuit,
1503                            &version,
1504                            step_idx,
1505                            Some(fanin_id.clone()),
1506                            outcome_kind_name(&combined),
1507                            Some(combined.to_json_value()),
1508                        )
1509                        .await;
1510                    }
1511
1512                    combined
1513                })
1514            },
1515        );
1516
1517        Axon {
1518            schematic,
1519            executor: next_executor,
1520            execution_mode,
1521            persistence_store,
1522            audit_sink,
1523            dlq_sink,
1524            dlq_policy,
1525            dynamic_dlq_policy,
1526            saga_policy,
1527            dynamic_saga_policy,
1528            saga_compensation_registry,
1529            iam_handle,
1530        }
1531    }
1532
1533    /// Execute the Axon with the given input and resources.
1534    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1535        if let ExecutionMode::Singleton {
1536            lock_key,
1537            ttl_ms,
1538            lock_provider,
1539        } = &self.execution_mode
1540        {
1541            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1542            let _enter = trace_span.enter();
1543            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1544                Ok(true) => {
1545                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1546                }
1547                Ok(false) => {
1548                    tracing::debug!(
1549                        "Singleton lock {} already held, aborting execution.",
1550                        lock_key
1551                    );
1552                    // Emit a specific event indicating skip
1553                    return Outcome::emit(
1554                        "execution.skipped.lock_held",
1555                        Some(serde_json::json!({
1556                            "lock_key": lock_key
1557                        })),
1558                    );
1559                }
1560                Err(e) => {
1561                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1562                    return Outcome::emit(
1563                        "execution.skipped.lock_error",
1564                        Some(serde_json::json!({
1565                            "error": e.to_string()
1566                        })),
1567                    );
1568                }
1569            }
1570        }
1571
1572        // ── IAM Boundary Check ────────────────────────────────────
1573        if let Some(iam) = &self.iam_handle {
1574            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1575
1576            if matches!(iam.policy, IamPolicy::None) {
1577                // No verification required — skip
1578            } else {
1579                let token = bus.read::<IamToken>().map(|t| t.0.clone());
1580
1581                match token {
1582                    Some(raw_token) => {
1583                        match iam.verifier.verify(&raw_token).await {
1584                            Ok(identity) => {
1585                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
1586                                    tracing::warn!(
1587                                        policy = ?iam.policy,
1588                                        subject = %identity.subject,
1589                                        "IAM policy enforcement failed: {}",
1590                                        e
1591                                    );
1592                                    return Outcome::emit(
1593                                        "iam.policy_denied",
1594                                        Some(serde_json::json!({
1595                                            "error": e.to_string(),
1596                                            "subject": identity.subject,
1597                                        })),
1598                                    );
1599                                }
1600                                // Insert verified identity into Bus for downstream access
1601                                bus.insert(identity);
1602                            }
1603                            Err(e) => {
1604                                tracing::warn!("IAM token verification failed: {}", e);
1605                                return Outcome::emit(
1606                                    "iam.verification_failed",
1607                                    Some(serde_json::json!({
1608                                        "error": e.to_string()
1609                                    })),
1610                                );
1611                            }
1612                        }
1613                    }
1614                    None => {
1615                        tracing::warn!("IAM policy requires token but none found in Bus");
1616                        return Outcome::emit("iam.missing_token", None);
1617                    }
1618                }
1619            }
1620        }
1621
1622        let trace_id = persistence_trace_id(bus);
1623        let label = self.schematic.name.clone();
1624
1625        // Inject DLQ config into Bus for step-level reporting
1626        if let Some(sink) = &self.dlq_sink {
1627            bus.insert(sink.clone());
1628        }
1629        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1630        let effective_dlq_policy = self
1631            .dynamic_dlq_policy
1632            .as_ref()
1633            .map(|d| d.current())
1634            .unwrap_or_else(|| self.dlq_policy.clone());
1635        bus.insert(effective_dlq_policy);
1636        bus.insert(self.schematic.clone());
1637        let effective_saga_policy = self
1638            .dynamic_saga_policy
1639            .as_ref()
1640            .map(|d| d.current())
1641            .unwrap_or_else(|| self.saga_policy.clone());
1642        bus.insert(effective_saga_policy.clone());
1643
1644        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1645        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1646            bus.insert(SagaStack::new());
1647        }
1648
1649        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1650        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1651        let compensation_retry_policy = compensation_retry_policy(bus);
1652        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1653        let version = self.schematic.schema_version.clone();
1654        let migration_registry = bus
1655            .read::<ranvier_core::schematic::MigrationRegistry>()
1656            .cloned();
1657
1658        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1659            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1660                load_persistence_version(handle, &trace_id).await;
1661
1662            if let Some(interv) = intervention {
1663                tracing::info!(
1664                    trace_id = %trace_id,
1665                    target_node = %interv.target_node,
1666                    "Applying manual intervention command"
1667                );
1668
1669                // Find the step index for the target node
1670                if let Some(target_idx) = self
1671                    .schematic
1672                    .nodes
1673                    .iter()
1674                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1675                {
1676                    tracing::info!(
1677                        trace_id = %trace_id,
1678                        target_node = %interv.target_node,
1679                        target_step = target_idx,
1680                        "Intervention: Jumping to target node"
1681                    );
1682                    start_step = target_idx as u64;
1683
1684                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1685                    bus.insert(ManualJump {
1686                        target_node: interv.target_node.clone(),
1687                        payload_override: interv.payload_override.clone(),
1688                    });
1689
1690                    // Log audit event for intervention application
1691                    if let Some(sink) = self.audit_sink.as_ref() {
1692                        let event = AuditEvent::new(
1693                            uuid::Uuid::new_v4().to_string(),
1694                            "System".to_string(),
1695                            "ApplyIntervention".to_string(),
1696                            trace_id.to_string(),
1697                        )
1698                        .with_metadata("target_node", interv.target_node.clone())
1699                        .with_metadata("target_step", target_idx);
1700
1701                        let _ = sink.append(&event).await;
1702                    }
1703                } else {
1704                    tracing::warn!(
1705                        trace_id = %trace_id,
1706                        target_node = %interv.target_node,
1707                        "Intervention target node not found in schematic; ignoring jump"
1708                    );
1709                }
1710            }
1711
1712            if let Some(old_version) = trace_version
1713                && old_version != version
1714            {
1715                tracing::info!(
1716                    trace_id = %trace_id,
1717                    old_version = %old_version,
1718                    current_version = %version,
1719                    "Version mismatch detected during resumption"
1720                );
1721
1722                // Try multi-hop migration path first, fall back to direct lookup
1723                let migration_path = migration_registry
1724                    .as_ref()
1725                    .and_then(|r| r.find_migration_path(&old_version, &version));
1726
1727                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1728                    if path.is_empty() {
1729                        (None, last_payload.clone())
1730                    } else {
1731                        // Apply payload mappers along the migration chain
1732                        let mut payload = last_payload.clone();
1733                        for hop in &path {
1734                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1735                            {
1736                                match mapper.map_state(p) {
1737                                    Ok(mapped) => payload = Some(mapped),
1738                                    Err(e) => {
1739                                        tracing::error!(
1740                                            trace_id = %trace_id,
1741                                            from = %hop.from_version,
1742                                            to = %hop.to_version,
1743                                            error = %e,
1744                                            "Payload migration mapper failed"
1745                                        );
1746                                        return Outcome::emit(
1747                                            "execution.resumption.payload_migration_failed",
1748                                            Some(serde_json::json!({
1749                                                "trace_id": trace_id,
1750                                                "from": hop.from_version,
1751                                                "to": hop.to_version,
1752                                                "error": e.to_string()
1753                                            })),
1754                                        );
1755                                    }
1756                                }
1757                            }
1758                        }
1759                        let hops: Vec<String> = path
1760                            .iter()
1761                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
1762                            .collect();
1763                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1764                        (path.last().copied(), payload)
1765                    }
1766                } else {
1767                    (None, last_payload.clone())
1768                };
1769
1770                // Use the final migration in the path to determine strategy
1771                let migration = final_migration.or_else(|| {
1772                    migration_registry
1773                        .as_ref()
1774                        .and_then(|r| r.find_migration(&old_version, &version))
1775                });
1776
1777                // Update last_payload with mapped version
1778                if mapped_payload.is_some() {
1779                    last_payload = mapped_payload;
1780                }
1781
1782                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1783                {
1784                    m.node_mapping
1785                        .get(node_id)
1786                        .cloned()
1787                        .unwrap_or(m.default_strategy.clone())
1788                } else {
1789                    migration
1790                        .map(|m| m.default_strategy.clone())
1791                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1792                };
1793
1794                match strategy {
1795                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1796                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1797                        start_step = 0;
1798                    }
1799                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1800                        new_node_id,
1801                        ..
1802                    } => {
1803                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1804                        if let Some(target_idx) = self
1805                            .schematic
1806                            .nodes
1807                            .iter()
1808                            .position(|n| n.id == new_node_id || n.label == new_node_id)
1809                        {
1810                            start_step = target_idx as u64;
1811                        } else {
1812                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1813                            return Outcome::emit(
1814                                "execution.resumption.migration_target_not_found",
1815                                Some(serde_json::json!({ "node_id": new_node_id })),
1816                            );
1817                        }
1818                    }
1819                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1820                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1821                        if let Some(target_idx) = self
1822                            .schematic
1823                            .nodes
1824                            .iter()
1825                            .position(|n| n.id == node_id || n.label == node_id)
1826                        {
1827                            start_step = target_idx as u64;
1828                        } else {
1829                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1830                            return Outcome::emit(
1831                                "execution.resumption.migration_target_not_found",
1832                                Some(serde_json::json!({ "node_id": node_id })),
1833                            );
1834                        }
1835                    }
1836                    ranvier_core::schematic::MigrationStrategy::Fail => {
1837                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1838                        return Outcome::emit(
1839                            "execution.resumption.version_mismatch_failed",
1840                            Some(serde_json::json!({
1841                                "trace_id": trace_id,
1842                                "old_version": old_version,
1843                                "current_version": version
1844                            })),
1845                        );
1846                    }
1847                    _ => {
1848                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1849                        return Outcome::emit(
1850                            "execution.resumption.unsupported_migration",
1851                            Some(serde_json::json!({
1852                                "trace_id": trace_id,
1853                                "strategy": format!("{:?}", strategy)
1854                            })),
1855                        );
1856                    }
1857                }
1858            }
1859
1860            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1861            persist_execution_event(
1862                handle,
1863                &trace_id,
1864                &label,
1865                &version,
1866                start_step,
1867                ingress_node_id,
1868                "Enter",
1869                None,
1870            )
1871            .await;
1872
1873            bus.insert(StartStep(start_step));
1874            if start_step > 0 {
1875                bus.insert(ResumptionState {
1876                    payload: last_payload,
1877                });
1878            }
1879
1880            Some(start_step)
1881        } else {
1882            None
1883        };
1884
1885        let should_capture = should_attach_timeline(bus);
1886        let inserted_timeline = if should_capture {
1887            ensure_timeline(bus)
1888        } else {
1889            false
1890        };
1891        let ingress_started = std::time::Instant::now();
1892        let ingress_enter_ts = now_ms();
1893        if should_capture
1894            && let (Some(timeline), Some(ingress)) =
1895                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1896        {
1897            timeline.push(TimelineEvent::NodeEnter {
1898                node_id: ingress.id.clone(),
1899                node_label: ingress.label.clone(),
1900                timestamp: ingress_enter_ts,
1901            });
1902        }
1903
1904        let circuit_span = tracing::info_span!(
1905            "Circuit",
1906            ranvier.circuit = %label,
1907            ranvier.outcome_kind = tracing::field::Empty,
1908            ranvier.outcome_target = tracing::field::Empty
1909        );
1910        let outcome = (self.executor)(input, resources, bus)
1911            .instrument(circuit_span.clone())
1912            .await;
1913        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1914        if let Some(target) = outcome_target(&outcome) {
1915            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1916        }
1917
1918        // Automated Saga Rollback (LIFO)
1919        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1920            while let Some(task) = {
1921                let mut stack = bus.read_mut::<SagaStack>();
1922                stack.as_mut().and_then(|s| s.pop())
1923            } {
1924                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1925
1926                let handler = {
1927                    let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
1928                    registry.get(&task.node_id)
1929                };
1930                if let Some(handler) = handler {
1931                    let res = handler(task.input_snapshot, resources, bus).await;
1932                    if let Outcome::Fault(e) = res {
1933                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1934                    }
1935                } else {
1936                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1937                }
1938            }
1939            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1940        }
1941
1942        let ingress_exit_ts = now_ms();
1943        if should_capture
1944            && let (Some(timeline), Some(ingress)) =
1945                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1946        {
1947            timeline.push(TimelineEvent::NodeExit {
1948                node_id: ingress.id.clone(),
1949                outcome_type: outcome_type_name(&outcome),
1950                duration_ms: ingress_started.elapsed().as_millis() as u64,
1951                timestamp: ingress_exit_ts,
1952            });
1953        }
1954
1955        if let Some(handle) = persistence_handle.as_ref() {
1956            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1957            persist_execution_event(
1958                handle,
1959                &trace_id,
1960                &label,
1961                &version,
1962                fault_step,
1963                None, // Outcome-level events might not have a single node_id context here
1964                outcome_kind_name(&outcome),
1965                Some(outcome.to_json_value()),
1966            )
1967            .await;
1968
1969            let mut completion = completion_from_outcome(&outcome);
1970            if matches!(outcome, Outcome::Fault(_))
1971                && let Some(compensation) = compensation_handle.as_ref()
1972                && compensation_auto_trigger(bus)
1973            {
1974                let context = CompensationContext {
1975                    trace_id: trace_id.clone(),
1976                    circuit: label.clone(),
1977                    fault_kind: outcome_kind_name(&outcome).to_string(),
1978                    fault_step,
1979                    timestamp_ms: now_ms(),
1980                };
1981
1982                if run_compensation(
1983                    compensation,
1984                    context,
1985                    compensation_retry_policy,
1986                    compensation_idempotency.clone(),
1987                )
1988                .await
1989                {
1990                    persist_execution_event(
1991                        handle,
1992                        &trace_id,
1993                        &label,
1994                        &version,
1995                        fault_step.saturating_add(1),
1996                        None,
1997                        "Compensated",
1998                        None,
1999                    )
2000                    .await;
2001                    completion = CompletionState::Compensated;
2002                }
2003            }
2004
2005            if persistence_auto_complete(bus) {
2006                persist_completion(handle, &trace_id, completion).await;
2007            }
2008        }
2009
2010        if should_capture {
2011            maybe_export_timeline(bus, &outcome);
2012        }
2013        if inserted_timeline {
2014            let _ = bus.remove::<Timeline>();
2015        }
2016
2017        outcome
2018    }
2019
2020    /// Starts the Ranvier Inspector for this Axon on the specified port.
2021    /// This spawns a background task to serve the Schematic.
2022    pub fn serve_inspector(self, port: u16) -> Self {
2023        if !inspector_dev_mode_from_env() {
2024            tracing::info!("Inspector disabled because RANVIER_MODE is production");
2025            return self;
2026        }
2027        if !inspector_enabled_from_env() {
2028            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2029            return self;
2030        }
2031
2032        let schematic = self.schematic.clone();
2033        let axon_inspector = Arc::new(self.clone());
2034        tokio::spawn(async move {
2035            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2036                .with_projection_files_from_env()
2037                .with_mode_from_env()
2038                .with_auth_policy_from_env()
2039                .with_state_inspector(axon_inspector)
2040                .serve()
2041                .await
2042            {
2043                tracing::error!("Inspector server failed: {}", e);
2044            }
2045        });
2046        self
2047    }
2048
2049    /// Get a reference to the Schematic (structural view).
2050    pub fn schematic(&self) -> &Schematic {
2051        &self.schematic
2052    }
2053
2054    /// Consume and return the Schematic.
2055    pub fn into_schematic(self) -> Schematic {
2056        self.schematic
2057    }
2058
2059    /// Detect schematic export mode from runtime flags.
2060    ///
2061    /// Supported triggers:
2062    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
2063    /// - `--schematic`
2064    ///
2065    /// Optional output path:
2066    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
2067    /// - `--schematic-output <path>` / `--schematic-output=<path>`
2068    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
2069    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2070        schematic_export_request_from_process()
2071    }
2072
2073    /// Export schematic and return `true` when schematic mode is active.
2074    ///
2075    /// Use this once after circuit construction and before server/custom loops:
2076    ///
2077    /// ```rust,ignore
2078    /// let axon = build_axon();
2079    /// if axon.maybe_export_and_exit()? {
2080    ///     return Ok(());
2081    /// }
2082    /// // Normal runtime path...
2083    /// ```
2084    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2085        self.maybe_export_and_exit_with(|_| ())
2086    }
2087
2088    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
2089    ///
2090    /// This is useful when your app has custom loop/bootstrap behavior and you want
2091    /// to skip or cleanup that logic in schematic mode.
2092    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2093    where
2094        F: FnOnce(&SchematicExportRequest),
2095    {
2096        let Some(request) = self.schematic_export_request() else {
2097            return Ok(false);
2098        };
2099        on_before_exit(&request);
2100        self.export_schematic(&request)?;
2101        Ok(true)
2102    }
2103
2104    /// Export schematic according to the provided request.
2105    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2106        let json = serde_json::to_string_pretty(self.schematic())?;
2107        if let Some(path) = &request.output {
2108            if let Some(parent) = path.parent()
2109                && !parent.as_os_str().is_empty()
2110            {
2111                fs::create_dir_all(parent)?;
2112            }
2113            fs::write(path, json.as_bytes())?;
2114            return Ok(());
2115        }
2116        println!("{}", json);
2117        Ok(())
2118    }
2119}
2120
2121fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2122    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2123    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2124    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2125
2126    let mut i = 0;
2127    while i < args.len() {
2128        let arg = args[i].to_string_lossy();
2129
2130        if arg == "--schematic" {
2131            enabled = true;
2132            i += 1;
2133            continue;
2134        }
2135
2136        if arg == "--schematic-output" || arg == "--output" {
2137            if let Some(next) = args.get(i + 1) {
2138                output = Some(PathBuf::from(next));
2139                i += 2;
2140                continue;
2141            }
2142        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2143            output = Some(PathBuf::from(value));
2144        } else if let Some(value) = arg.strip_prefix("--output=") {
2145            output = Some(PathBuf::from(value));
2146        }
2147
2148        i += 1;
2149    }
2150
2151    if enabled {
2152        Some(SchematicExportRequest { output })
2153    } else {
2154        None
2155    }
2156}
2157
/// Report whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `on` and `yes`, compared ASCII case-insensitively
/// and without trimming. An unset variable, a non-UTF-8 value, or any other value
/// yields `false`.
fn env_flag_is_true(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var(key)
        .map(|raw| TRUTHY.iter().any(|accepted| raw.eq_ignore_ascii_case(accepted)))
        .unwrap_or(false)
}
2164
2165fn inspector_enabled_from_env() -> bool {
2166    let raw = std::env::var("RANVIER_INSPECTOR").ok();
2167    inspector_enabled_from_value(raw.as_deref())
2168}
2169
/// Decide whether the inspector is enabled, given the raw `RANVIER_INSPECTOR` value.
///
/// An absent value enables it. A present value enables it only when it is `1`, `true`,
/// `on` or `yes` (ASCII case-insensitive), so an empty string or `0` disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    let lowered = raw.to_ascii_lowercase();
    matches!(lowered.as_str(), "1" | "true" | "on" | "yes")
}
2176
2177fn inspector_dev_mode_from_env() -> bool {
2178    let raw = std::env::var("RANVIER_MODE").ok();
2179    inspector_dev_mode_from_value(raw.as_deref())
2180}
2181
/// Return `false` only when `value` names production mode (`prod` or `production`,
/// ASCII case-insensitive, untrimmed); every other value, or no value, counts as dev.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        Some(mode) => {
            let is_production =
                mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production");
            !is_production
        }
        None => true,
    }
}
2188
2189fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2190    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2191        Ok(v) if !v.trim().is_empty() => v,
2192        _ => return,
2193    };
2194
2195    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2196    let policy = timeline_adaptive_policy();
2197    let forced = should_force_export(outcome, &policy);
2198    let should_export = sampled || forced;
2199    if !should_export {
2200        record_sampling_stats(false, sampled, forced, "none", &policy);
2201        return;
2202    }
2203
2204    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2205    timeline.sort();
2206
2207    let mode = std::env::var("RANVIER_TIMELINE_MODE")
2208        .unwrap_or_else(|_| "overwrite".to_string())
2209        .to_ascii_lowercase();
2210
2211    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2212        tracing::warn!(
2213            "Failed to persist timeline file {} (mode={}): {}",
2214            path,
2215            mode,
2216            err
2217        );
2218        record_sampling_stats(false, sampled, forced, &mode, &policy);
2219    } else {
2220        record_sampling_stats(true, sampled, forced, &mode, &policy);
2221    }
2222}
2223
2224fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2225    match outcome {
2226        Outcome::Next(_) => "Next".to_string(),
2227        Outcome::Branch(id, _) => format!("Branch:{}", id),
2228        Outcome::Jump(id, _) => format!("Jump:{}", id),
2229        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2230        Outcome::Fault(_) => "Fault".to_string(),
2231    }
2232}
2233
2234fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2235    match outcome {
2236        Outcome::Next(_) => "Next",
2237        Outcome::Branch(_, _) => "Branch",
2238        Outcome::Jump(_, _) => "Jump",
2239        Outcome::Emit(_, _) => "Emit",
2240        Outcome::Fault(_) => "Fault",
2241    }
2242}
2243
2244fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2245    match outcome {
2246        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2247        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2248        Outcome::Emit(event_type, _) => Some(event_type.clone()),
2249        Outcome::Next(_) | Outcome::Fault(_) => None,
2250    }
2251}
2252
2253fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2254    match outcome {
2255        Outcome::Fault(_) => CompletionState::Fault,
2256        _ => CompletionState::Success,
2257    }
2258}
2259
2260fn persistence_trace_id(bus: &Bus) -> String {
2261    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2262        explicit.0.clone()
2263    } else {
2264        format!("{}:{}", bus.id, now_ms())
2265    }
2266}
2267
2268fn persistence_auto_complete(bus: &Bus) -> bool {
2269    bus.read::<PersistenceAutoComplete>()
2270        .map(|v| v.0)
2271        .unwrap_or(true)
2272}
2273
2274fn compensation_auto_trigger(bus: &Bus) -> bool {
2275    bus.read::<CompensationAutoTrigger>()
2276        .map(|v| v.0)
2277        .unwrap_or(true)
2278}
2279
2280fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2281    bus.read::<CompensationRetryPolicy>()
2282        .copied()
2283        .unwrap_or_default()
2284}
2285
2286/// Unwrap the Outcome enum layer from a persisted event payload.
2287///
2288/// Events are stored with `outcome.to_json_value()` which serializes the full
2289/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
2290/// handler expects the raw inner value, so we extract it here.
2291fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2292    payload.map(|p| {
2293        p.get("Next")
2294            .or_else(|| p.get("Branch"))
2295            .or_else(|| p.get("Jump"))
2296            .cloned()
2297            .unwrap_or_else(|| p.clone())
2298    })
2299}
2300
/// Loads the persisted trace for `trace_id` and derives where execution should continue.
///
/// Returns `(next_step, schematic_version, last_intervention, anchor_node_id, anchor_payload)`.
///
/// If the trace has `resumed_from_step` set:
/// - `next_step` is that step + 1 (saturating).
/// - The anchor event is chosen by scanning the stored events from the end.
///   In order of preference, it is the last event with `step <= resumed_from_step` that:
///   (a) is a `Next` with a payload,
///   (b) is any non-`Fault` event with a payload,
///   (c) has any payload;
///   failing all of those, (d) the last stored event regardless of its step.
///
/// Otherwise the anchor is the last stored event, and `next_step` is its step + 1
/// (saturating), or 0 when there are no events.
///
/// The anchor's payload goes through `unwrap_outcome_payload` to drop the outcome
/// wrapper. `schematic_version` and the last entry of `interventions` come straight
/// from the stored trace.
///
/// A missing trace yields `(0, None, None, None, None)`. A store error is logged
/// at `warn` and yields the same fallback.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // Explicit resume point: walk the events from the end and take
                    // the best-qualified anchor at or before the resume step.
                    // Each `or_else` relaxes the criteria one notch.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        // Last resort: the final stored event, even if its step is
                        // beyond the resume point.
                        .or_else(|| trace.events.last());

                    (
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // No explicit resume point: continue right after the last
                    // stored event (or from step 0 for an empty trace).
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        // No trace stored yet: start from the beginning.
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
2378
2379#[allow(clippy::too_many_arguments)]
2380async fn run_this_step<In, Out, E, Res>(
2381    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
2382    state: In,
2383    res: &Res,
2384    bus: &mut Bus,
2385    node_id: &str,
2386    node_label: &str,
2387    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2388    step_idx: u64,
2389) -> Outcome<Out, E>
2390where
2391    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2392    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2393    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2394    Res: ranvier_core::transition::ResourceRequirement,
2395{
2396    let label = trans.label();
2397    let res_type = std::any::type_name::<Res>()
2398        .split("::")
2399        .last()
2400        .unwrap_or("unknown");
2401
2402    // Debug pausing
2403    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2404        debug.should_pause(node_id)
2405    } else {
2406        false
2407    };
2408
2409    if should_pause {
2410        let trace_id = persistence_trace_id(bus);
2411        tracing::event!(
2412            target: "ranvier.debugger",
2413            tracing::Level::INFO,
2414            trace_id = %trace_id,
2415            node_id = %node_id,
2416            "Node paused"
2417        );
2418
2419        if let Some(timeline) = bus.read_mut::<Timeline>() {
2420            timeline.push(TimelineEvent::NodePaused {
2421                node_id: node_id.to_string(),
2422                timestamp: now_ms(),
2423            });
2424        }
2425        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2426            debug.wait().await;
2427        }
2428    }
2429
2430    let enter_ts = now_ms();
2431    if let Some(timeline) = bus.read_mut::<Timeline>() {
2432        timeline.push(TimelineEvent::NodeEnter {
2433            node_id: node_id.to_string(),
2434            node_label: node_label.to_string(),
2435            timestamp: enter_ts,
2436        });
2437    }
2438
2439    // Check DLQ retry policy and pre-serialize state for potential retries
2440    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2441        if let DlqPolicy::RetryThenDlq {
2442            max_attempts,
2443            backoff_ms,
2444        } = p
2445        {
2446            Some((*max_attempts, *backoff_ms))
2447        } else {
2448            None
2449        }
2450    });
2451    let retry_state_snapshot = if dlq_retry_config.is_some() {
2452        serde_json::to_value(&state).ok()
2453    } else {
2454        None
2455    };
2456
2457    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2458    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2459        Some(serde_json::to_vec(&state).unwrap_or_default())
2460    } else {
2461        None
2462    };
2463
2464    let node_span = tracing::info_span!(
2465        "Node",
2466        ranvier.node = %label,
2467        ranvier.resource_type = %res_type,
2468        ranvier.outcome_kind = tracing::field::Empty,
2469        ranvier.outcome_target = tracing::field::Empty
2470    );
2471    let started = std::time::Instant::now();
2472    bus.set_access_policy(label.clone(), bus_policy.clone());
2473    let result = trans
2474        .run(state, res, bus)
2475        .instrument(node_span.clone())
2476        .await;
2477    bus.clear_access_policy();
2478
2479    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
2480    // retry with exponential backoff before giving up.
2481    let result = if let Outcome::Fault(_) = &result {
2482        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2483            (dlq_retry_config, &retry_state_snapshot)
2484        {
2485            let mut final_result = result;
2486            // attempt 1 already done; retry from 2..=max_attempts
2487            for attempt in 2..=max_attempts {
2488                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2489
2490                tracing::info!(
2491                    ranvier.node = %label,
2492                    attempt = attempt,
2493                    max_attempts = max_attempts,
2494                    backoff_ms = delay,
2495                    "Retrying faulted node"
2496                );
2497
2498                if let Some(timeline) = bus.read_mut::<Timeline>() {
2499                    timeline.push(TimelineEvent::NodeRetry {
2500                        node_id: node_id.to_string(),
2501                        attempt,
2502                        max_attempts,
2503                        backoff_ms: delay,
2504                        timestamp: now_ms(),
2505                    });
2506                }
2507
2508                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2509
2510                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2511                    bus.set_access_policy(label.clone(), bus_policy.clone());
2512                    let retry_result = trans
2513                        .run(retry_state, res, bus)
2514                        .instrument(tracing::info_span!(
2515                            "NodeRetry",
2516                            ranvier.node = %label,
2517                            attempt = attempt
2518                        ))
2519                        .await;
2520                    bus.clear_access_policy();
2521
2522                    match &retry_result {
2523                        Outcome::Fault(_) => {
2524                            final_result = retry_result;
2525                        }
2526                        _ => {
2527                            final_result = retry_result;
2528                            break;
2529                        }
2530                    }
2531                }
2532            }
2533            final_result
2534        } else {
2535            result
2536        }
2537    } else {
2538        result
2539    };
2540
2541    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2542    if let Some(target) = outcome_target(&result) {
2543        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2544    }
2545    let duration_ms = started.elapsed().as_millis() as u64;
2546    let exit_ts = now_ms();
2547
2548    if let Some(timeline) = bus.read_mut::<Timeline>() {
2549        timeline.push(TimelineEvent::NodeExit {
2550            node_id: node_id.to_string(),
2551            outcome_type: outcome_type_name(&result),
2552            duration_ms,
2553            timestamp: exit_ts,
2554        });
2555
2556        if let Outcome::Branch(branch_id, _) = &result {
2557            timeline.push(TimelineEvent::Branchtaken {
2558                branch_id: branch_id.clone(),
2559                timestamp: exit_ts,
2560            });
2561        }
2562    }
2563
2564    // Push to Saga Stack if Next outcome and snapshot taken
2565    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2566        && let Some(stack) = bus.read_mut::<SagaStack>()
2567    {
2568        stack.push(node_id.to_string(), label.clone(), snapshot);
2569    }
2570
2571    if let Some(handle) = bus.read::<PersistenceHandle>() {
2572        let trace_id = persistence_trace_id(bus);
2573        let circuit = bus
2574            .read::<ranvier_core::schematic::Schematic>()
2575            .map(|s| s.name.clone())
2576            .unwrap_or_default();
2577        let version = bus
2578            .read::<ranvier_core::schematic::Schematic>()
2579            .map(|s| s.schema_version.clone())
2580            .unwrap_or_default();
2581
2582        persist_execution_event(
2583            handle,
2584            &trace_id,
2585            &circuit,
2586            &version,
2587            step_idx,
2588            Some(node_id.to_string()),
2589            outcome_kind_name(&result),
2590            Some(result.to_json_value()),
2591        )
2592        .await;
2593    }
2594
2595    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
2596    // or immediately (SendToDlq). Drop policy skips entirely.
2597    if let Outcome::Fault(f) = &result {
2598        // Read policy and sink, then drop the borrows before mutable timeline access
2599        let dlq_action = {
2600            let policy = bus.read::<DlqPolicy>();
2601            let sink = bus.read::<Arc<dyn DlqSink>>();
2602            match (sink, policy) {
2603                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2604                _ => None,
2605            }
2606        };
2607
2608        if let Some(sink) = dlq_action {
2609            if let Some((max_attempts, _)) = dlq_retry_config
2610                && let Some(timeline) = bus.read_mut::<Timeline>()
2611            {
2612                timeline.push(TimelineEvent::DlqExhausted {
2613                    node_id: node_id.to_string(),
2614                    total_attempts: max_attempts,
2615                    timestamp: now_ms(),
2616                });
2617            }
2618
2619            let trace_id = persistence_trace_id(bus);
2620            let circuit = bus
2621                .read::<ranvier_core::schematic::Schematic>()
2622                .map(|s| s.name.clone())
2623                .unwrap_or_default();
2624            let _ = sink
2625                .store_dead_letter(
2626                    &trace_id,
2627                    &circuit,
2628                    node_id,
2629                    &format!("{:?}", f),
2630                    &serde_json::to_vec(&f).unwrap_or_default(),
2631                )
2632                .await;
2633        }
2634    }
2635
2636    result
2637}
2638
2639#[allow(clippy::too_many_arguments)]
2640async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2641    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2642    comp: &Comp,
2643    state: Out,
2644    res: &Res,
2645    bus: &mut Bus,
2646    node_id: &str,
2647    comp_node_id: &str,
2648    node_label: &str,
2649    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2650    step_idx: u64,
2651) -> Outcome<Next, E>
2652where
2653    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2654    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2655    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2656    Res: ranvier_core::transition::ResourceRequirement,
2657    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2658{
2659    let label = trans.label();
2660
2661    // Debug pausing
2662    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2663        debug.should_pause(node_id)
2664    } else {
2665        false
2666    };
2667
2668    if should_pause {
2669        let trace_id = persistence_trace_id(bus);
2670        tracing::event!(
2671            target: "ranvier.debugger",
2672            tracing::Level::INFO,
2673            trace_id = %trace_id,
2674            node_id = %node_id,
2675            "Node paused (compensated)"
2676        );
2677
2678        if let Some(timeline) = bus.read_mut::<Timeline>() {
2679            timeline.push(TimelineEvent::NodePaused {
2680                node_id: node_id.to_string(),
2681                timestamp: now_ms(),
2682            });
2683        }
2684        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2685            debug.wait().await;
2686        }
2687    }
2688
2689    let enter_ts = now_ms();
2690    if let Some(timeline) = bus.read_mut::<Timeline>() {
2691        timeline.push(TimelineEvent::NodeEnter {
2692            node_id: node_id.to_string(),
2693            node_label: node_label.to_string(),
2694            timestamp: enter_ts,
2695        });
2696    }
2697
2698    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2699    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2700        Some(serde_json::to_vec(&state).unwrap_or_default())
2701    } else {
2702        None
2703    };
2704
2705    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2706    bus.set_access_policy(label.clone(), bus_policy.clone());
2707    let result = trans
2708        .run(state.clone(), res, bus)
2709        .instrument(node_span)
2710        .await;
2711    bus.clear_access_policy();
2712
2713    let duration_ms = 0; // Simplified
2714    let exit_ts = now_ms();
2715
2716    if let Some(timeline) = bus.read_mut::<Timeline>() {
2717        timeline.push(TimelineEvent::NodeExit {
2718            node_id: node_id.to_string(),
2719            outcome_type: outcome_type_name(&result),
2720            duration_ms,
2721            timestamp: exit_ts,
2722        });
2723    }
2724
2725    // Automated Compensation Trigger
2726    if let Outcome::Fault(ref err) = result {
2727        if compensation_auto_trigger(bus) {
2728            tracing::info!(
2729                ranvier.node = %label,
2730                ranvier.compensation.trigger = "saga",
2731                error = ?err,
2732                "Saga compensation triggered"
2733            );
2734
2735            if let Some(timeline) = bus.read_mut::<Timeline>() {
2736                timeline.push(TimelineEvent::NodeEnter {
2737                    node_id: comp_node_id.to_string(),
2738                    node_label: format!("Compensate: {}", comp.label()),
2739                    timestamp: exit_ts,
2740                });
2741            }
2742
2743            // Run compensation
2744            let _ = comp.run(state, res, bus).await;
2745
2746            if let Some(timeline) = bus.read_mut::<Timeline>() {
2747                timeline.push(TimelineEvent::NodeExit {
2748                    node_id: comp_node_id.to_string(),
2749                    outcome_type: "Compensated".to_string(),
2750                    duration_ms: 0,
2751                    timestamp: now_ms(),
2752                });
2753            }
2754
2755            if let Some(handle) = bus.read::<PersistenceHandle>() {
2756                let trace_id = persistence_trace_id(bus);
2757                let circuit = bus
2758                    .read::<ranvier_core::schematic::Schematic>()
2759                    .map(|s| s.name.clone())
2760                    .unwrap_or_default();
2761                let version = bus
2762                    .read::<ranvier_core::schematic::Schematic>()
2763                    .map(|s| s.schema_version.clone())
2764                    .unwrap_or_default();
2765
2766                persist_execution_event(
2767                    handle,
2768                    &trace_id,
2769                    &circuit,
2770                    &version,
2771                    step_idx + 1, // Compensation node index
2772                    Some(comp_node_id.to_string()),
2773                    "Compensated",
2774                    None,
2775                )
2776                .await;
2777            }
2778        }
2779    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2780        // Push to Saga Stack if Next outcome and snapshot taken
2781        if let Some(stack) = bus.read_mut::<SagaStack>() {
2782            stack.push(node_id.to_string(), label.clone(), snapshot);
2783        }
2784
2785        if let Some(handle) = bus.read::<PersistenceHandle>() {
2786            let trace_id = persistence_trace_id(bus);
2787            let circuit = bus
2788                .read::<ranvier_core::schematic::Schematic>()
2789                .map(|s| s.name.clone())
2790                .unwrap_or_default();
2791            let version = bus
2792                .read::<ranvier_core::schematic::Schematic>()
2793                .map(|s| s.schema_version.clone())
2794                .unwrap_or_default();
2795
2796            persist_execution_event(
2797                handle,
2798                &trace_id,
2799                &circuit,
2800                &version,
2801                step_idx,
2802                Some(node_id.to_string()),
2803                outcome_kind_name(&result),
2804                Some(result.to_json_value()),
2805            )
2806            .await;
2807        }
2808    }
2809
2810    // DLQ reporting for compensated steps
2811    if let Outcome::Fault(f) = &result
2812        && let (Some(sink), Some(policy)) =
2813            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2814    {
2815        let should_dlq = !matches!(policy, DlqPolicy::Drop);
2816        if should_dlq {
2817            let trace_id = persistence_trace_id(bus);
2818            let circuit = bus
2819                .read::<ranvier_core::schematic::Schematic>()
2820                .map(|s| s.name.clone())
2821                .unwrap_or_default();
2822            let _ = sink
2823                .store_dead_letter(
2824                    &trace_id,
2825                    &circuit,
2826                    node_id,
2827                    &format!("{:?}", f),
2828                    &serde_json::to_vec(&f).unwrap_or_default(),
2829                )
2830                .await;
2831        }
2832    }
2833
2834    result
2835}
2836
2837#[allow(clippy::too_many_arguments)]
2838pub async fn persist_execution_event(
2839    handle: &PersistenceHandle,
2840    trace_id: &str,
2841    circuit: &str,
2842    schematic_version: &str,
2843    step: u64,
2844    node_id: Option<String>,
2845    outcome_kind: &str,
2846    payload: Option<serde_json::Value>,
2847) {
2848    let store = handle.store();
2849    let envelope = PersistenceEnvelope {
2850        trace_id: trace_id.to_string(),
2851        circuit: circuit.to_string(),
2852        schematic_version: schematic_version.to_string(),
2853        step,
2854        node_id,
2855        outcome_kind: outcome_kind.to_string(),
2856        timestamp_ms: now_ms(),
2857        payload_hash: None,
2858        payload,
2859    };
2860
2861    if let Err(err) = store.append(envelope).await {
2862        tracing::warn!(
2863            trace_id = %trace_id,
2864            circuit = %circuit,
2865            step,
2866            outcome_kind = %outcome_kind,
2867            error = %err,
2868            "Failed to append persistence envelope"
2869        );
2870    }
2871}
2872
2873async fn persist_completion(
2874    handle: &PersistenceHandle,
2875    trace_id: &str,
2876    completion: CompletionState,
2877) {
2878    let store = handle.store();
2879    if let Err(err) = store.complete(trace_id, completion).await {
2880        tracing::warn!(
2881            trace_id = %trace_id,
2882            error = %err,
2883            "Failed to complete persistence trace"
2884        );
2885    }
2886}
2887
2888fn compensation_idempotency_key(context: &CompensationContext) -> String {
2889    format!(
2890        "{}:{}:{}",
2891        context.trace_id, context.circuit, context.fault_kind
2892    )
2893}
2894
2895async fn run_compensation(
2896    handle: &CompensationHandle,
2897    context: CompensationContext,
2898    retry_policy: CompensationRetryPolicy,
2899    idempotency: Option<CompensationIdempotencyHandle>,
2900) -> bool {
2901    let hook = handle.hook();
2902    let key = compensation_idempotency_key(&context);
2903
2904    if let Some(store_handle) = idempotency.as_ref() {
2905        let store = store_handle.store();
2906        match store.was_compensated(&key).await {
2907            Ok(true) => {
2908                tracing::info!(
2909                    trace_id = %context.trace_id,
2910                    circuit = %context.circuit,
2911                    key = %key,
2912                    "Compensation already recorded; skipping duplicate hook execution"
2913                );
2914                return true;
2915            }
2916            Ok(false) => {}
2917            Err(err) => {
2918                tracing::warn!(
2919                    trace_id = %context.trace_id,
2920                    key = %key,
2921                    error = %err,
2922                    "Failed to check compensation idempotency state"
2923                );
2924            }
2925        }
2926    }
2927
2928    let max_attempts = retry_policy.max_attempts.max(1);
2929    for attempt in 1..=max_attempts {
2930        match hook.compensate(context.clone()).await {
2931            Ok(()) => {
2932                if let Some(store_handle) = idempotency.as_ref() {
2933                    let store = store_handle.store();
2934                    if let Err(err) = store.mark_compensated(&key).await {
2935                        tracing::warn!(
2936                            trace_id = %context.trace_id,
2937                            key = %key,
2938                            error = %err,
2939                            "Failed to mark compensation idempotency state"
2940                        );
2941                    }
2942                }
2943                return true;
2944            }
2945            Err(err) => {
2946                let is_last = attempt == max_attempts;
2947                tracing::warn!(
2948                    trace_id = %context.trace_id,
2949                    circuit = %context.circuit,
2950                    fault_kind = %context.fault_kind,
2951                    fault_step = context.fault_step,
2952                    attempt,
2953                    max_attempts,
2954                    error = %err,
2955                    "Compensation hook attempt failed"
2956                );
2957                if !is_last && retry_policy.backoff_ms > 0 {
2958                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2959                        .await;
2960                }
2961            }
2962        }
2963    }
2964    false
2965}
2966
2967fn ensure_timeline(bus: &mut Bus) -> bool {
2968    if bus.has::<Timeline>() {
2969        false
2970    } else {
2971        bus.insert(Timeline::new());
2972        true
2973    }
2974}
2975
2976fn should_attach_timeline(bus: &Bus) -> bool {
2977    // Respect explicitly provided timeline collector from caller.
2978    if bus.has::<Timeline>() {
2979        return true;
2980    }
2981
2982    // Attach timeline when runtime export path exists.
2983    has_timeline_output_path()
2984}
2985
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
///
/// Unset or non-Unicode values count as `false`.
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(path) if !path.trim().is_empty()
    )
}
2992
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64`, and clamped to `[0.0, 1.0]`.
/// A missing, unparseable, or NaN value falls back to `1.0` (export every run).
///
/// NaN must be rejected explicitly: `"NaN"` parses successfully, and
/// `f64::clamp` passes NaN through unchanged. That would produce a rate for
/// which `bucket < rate` is never true, silently disabling sampling.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
3000
3001fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3002    if rate <= 0.0 {
3003        return false;
3004    }
3005    if rate >= 1.0 {
3006        return true;
3007    }
3008    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3009    bucket < rate
3010}
3011
/// Returns the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is lower-cased, and defaults to `fault_branch` when unset.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => "fault_branch".to_owned(),
    }
}
3017
3018fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3019    match policy {
3020        "off" => false,
3021        "fault_only" => matches!(outcome, Outcome::Fault(_)),
3022        "fault_branch_emit" => {
3023            matches!(
3024                outcome,
3025                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3026            )
3027        }
3028        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3029    }
3030}
3031
/// Process-wide counters describing timeline export decisions.
#[derive(Default, Clone)]
struct SamplingStats {
    // Decision counters.
    /// Every export decision recorded.
    total_decisions: u64,
    /// Decisions that resulted in a successful export.
    exported: u64,
    /// Decisions that did not export (skipped or failed writes).
    skipped: u64,

    // Breakdown of successful exports by trigger.
    /// Successful exports where the run was selected by sampling.
    sampled_exports: u64,
    /// Successful exports where the adaptive policy forced the export.
    forced_exports: u64,

    // Most recent decision context.
    /// Write mode of the last decision (`"none"` when nothing was written).
    last_mode: String,
    /// Adaptive policy in effect for the last decision.
    last_policy: String,
    /// Wall-clock time (ms) of the last update.
    last_updated_ms: u64,
}
3043
3044static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3045
3046fn stats_cell() -> &'static Mutex<SamplingStats> {
3047    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3048}
3049
3050fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3051    let snapshot = {
3052        let mut stats = match stats_cell().lock() {
3053            Ok(guard) => guard,
3054            Err(_) => return,
3055        };
3056
3057        stats.total_decisions += 1;
3058        if exported {
3059            stats.exported += 1;
3060        } else {
3061            stats.skipped += 1;
3062        }
3063        if sampled && exported {
3064            stats.sampled_exports += 1;
3065        }
3066        if forced && exported {
3067            stats.forced_exports += 1;
3068        }
3069        stats.last_mode = mode.to_string();
3070        stats.last_policy = policy.to_string();
3071        stats.last_updated_ms = now_ms();
3072        stats.clone()
3073    };
3074
3075    tracing::debug!(
3076        ranvier.timeline.total_decisions = snapshot.total_decisions,
3077        ranvier.timeline.exported = snapshot.exported,
3078        ranvier.timeline.skipped = snapshot.skipped,
3079        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3080        ranvier.timeline.forced_exports = snapshot.forced_exports,
3081        ranvier.timeline.mode = %snapshot.last_mode,
3082        ranvier.timeline.policy = %snapshot.last_policy,
3083        "Timeline sampling stats updated"
3084    );
3085
3086    if let Some(path) = timeline_stats_output_path() {
3087        let payload = serde_json::json!({
3088            "total_decisions": snapshot.total_decisions,
3089            "exported": snapshot.exported,
3090            "skipped": snapshot.skipped,
3091            "sampled_exports": snapshot.sampled_exports,
3092            "forced_exports": snapshot.forced_exports,
3093            "last_mode": snapshot.last_mode,
3094            "last_policy": snapshot.last_policy,
3095            "last_updated_ms": snapshot.last_updated_ms
3096        });
3097        if let Some(parent) = Path::new(&path).parent() {
3098            let _ = fs::create_dir_all(parent);
3099        }
3100        if let Err(err) = fs::write(&path, payload.to_string()) {
3101            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3102        }
3103    }
3104}
3105
/// Returns the stats file path configured in `RANVIER_TIMELINE_STATS_OUTPUT`, if any.
///
/// Returns `None` when the variable is unset, not valid Unicode, or whitespace-only.
/// Otherwise the raw value is returned as-is, without trimming.
fn timeline_stats_output_path() -> Option<String> {
    let raw = std::env::var("RANVIER_TIMELINE_STATS_OUTPUT").ok()?;
    if raw.trim().is_empty() {
        None
    } else {
        Some(raw)
    }
}
3111
3112fn write_timeline_with_policy(
3113    path: &str,
3114    mode: &str,
3115    mut timeline: Timeline,
3116) -> Result<(), String> {
3117    match mode {
3118        "append" => {
3119            if Path::new(path).exists() {
3120                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3121                match serde_json::from_str::<Timeline>(&content) {
3122                    Ok(mut existing) => {
3123                        existing.events.append(&mut timeline.events);
3124                        existing.sort();
3125                        if let Some(max_events) = max_events_limit() {
3126                            truncate_timeline_events(&mut existing, max_events);
3127                        }
3128                        write_timeline_json(path, &existing)
3129                    }
3130                    Err(_) => {
3131                        // Fallback: if existing is invalid, replace with current timeline
3132                        if let Some(max_events) = max_events_limit() {
3133                            truncate_timeline_events(&mut timeline, max_events);
3134                        }
3135                        write_timeline_json(path, &timeline)
3136                    }
3137                }
3138            } else {
3139                if let Some(max_events) = max_events_limit() {
3140                    truncate_timeline_events(&mut timeline, max_events);
3141                }
3142                write_timeline_json(path, &timeline)
3143            }
3144        }
3145        "rotate" => {
3146            let rotated_path = rotated_path(path, now_ms());
3147            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3148            if let Some(keep) = rotate_keep_limit() {
3149                cleanup_rotated_files(path, keep)?;
3150            }
3151            Ok(())
3152        }
3153        _ => write_timeline_json(path, &timeline),
3154    }
3155}
3156
3157fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3158    if let Some(parent) = Path::new(path).parent()
3159        && !parent.as_os_str().is_empty()
3160    {
3161        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3162    }
3163    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3164    fs::write(path, json).map_err(|e| e.to_string())
3165}
3166
/// Builds the path of a rotated timeline file: `<dir>/<stem>_<suffix>.<ext>`.
///
/// `dir` is the parent of `path`, if any. `stem` and `ext` come from the file name of `path`.
/// They fall back to `timeline` and `json` when missing or not valid UTF-8.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let base = Path::new(path);
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");

    match base.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
3174
/// Reads the optional cap on stored timeline events from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `Some(n)` for a positive integer, ignoring surrounding whitespace. Returns `None`
/// (no truncation) when the variable is unset, unparseable, or `0`.
///
/// Trimming matters: without it, a value like `"500 "` fails to parse and silently removes
/// the cap.
fn max_events_limit() -> Option<usize> {
    std::env::var("RANVIER_TIMELINE_MAX_EVENTS")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|v| *v > 0)
}
3181
/// Reads how many rotated timeline files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `Some(n)` for a positive integer, ignoring surrounding whitespace. Returns `None`
/// (no cleanup of old rotated files) when the variable is unset, unparseable, or `0`.
///
/// Trimming matters: without it, a value like `"10 "` fails to parse and silently disables
/// cleanup.
fn rotate_keep_limit() -> Option<usize> {
    std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|v| *v > 0)
}
3188
3189fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3190    let len = timeline.events.len();
3191    if len > max_events {
3192        let keep_from = len - max_events;
3193        timeline.events = timeline.events.split_off(keep_from);
3194    }
3195}
3196
/// Deletes surplus rotated timeline files, keeping the `keep` most recent ones.
///
/// Rotated files are the names `rotated_path(base_path, millis)` produces:
/// `<stem>_<digits>.<ext>` in the directory of `base_path`. `stem` defaults to `timeline`
/// and `ext` to `json`.
///
/// Only names of exactly that shape are candidates. Unrelated files that merely share the
/// `<stem>_` prefix, such as `timeline_stats.json` next to `timeline.json`, are never removed.
///
/// Candidates are ordered newest first by modification time. The numeric stamp breaks ties,
/// for filesystems whose timestamps are too coarse to tell files apart.
///
/// # Errors
///
/// Returns the error message when the directory cannot be listed. Cleanup is best effort:
/// an entry whose metadata cannot be read is treated as the oldest, and failed deletions are
/// ignored.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` returns `Some("")` for a bare file name such as `timeline.json`, and
    // `read_dir("")` fails. List the current directory instead.
    let dir = base
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    // Extracts the numeric rotation stamp from a file name. Returns `None` if the name is not
    // a rotation output. The explicit digit check also rejects forms `u64::from_str` would
    // accept, such as a leading `+`.
    let rotation_stamp = |name: &str| -> Option<u64> {
        let stamp = name
            .strip_prefix(prefix.as_str())?
            .strip_suffix(suffix.as_str())?;
        if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        stamp.parse().ok()
    };

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let stamp = rotation_stamp(&entry.file_name().to_string_lossy())?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((entry.path(), modified, stamp))
        })
        .collect::<Vec<_>>();

    // Newest first, so everything after the first `keep` entries is surplus.
    rotated.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| b.2.cmp(&a.2)));
    for (path, _, _) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
3229
3230fn bus_capability_schema_from_policy(
3231    policy: Option<ranvier_core::bus::BusAccessPolicy>,
3232) -> Option<BusCapabilitySchema> {
3233    let policy = policy?;
3234
3235    let mut allow = policy
3236        .allow
3237        .unwrap_or_default()
3238        .into_iter()
3239        .map(|entry| entry.type_name.to_string())
3240        .collect::<Vec<_>>();
3241    let mut deny = policy
3242        .deny
3243        .into_iter()
3244        .map(|entry| entry.type_name.to_string())
3245        .collect::<Vec<_>>();
3246    allow.sort();
3247    deny.sort();
3248
3249    if allow.is_empty() && deny.is_empty() {
3250        return None;
3251    }
3252
3253    Some(BusCapabilitySchema { allow, deny })
3254}
3255
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The `u128 -> u64` cast
/// only truncates hundreds of millions of years from now.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3262
3263#[cfg(test)]
3264mod tests {
3265    use super::{
3266        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3267        should_force_export,
3268    };
3269    use crate::persistence::{
3270        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3271        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3272        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3273        PersistenceHandle, PersistenceStore, PersistenceTraceId,
3274    };
3275    use anyhow::Result;
3276    use async_trait::async_trait;
3277    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3278    use ranvier_core::event::{DlqPolicy, DlqSink};
3279    use ranvier_core::saga::SagaStack;
3280    use ranvier_core::timeline::{Timeline, TimelineEvent};
3281    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3282    use serde::{Deserialize, Serialize};
3283    use std::sync::Arc;
3284    use tokio::sync::Mutex;
3285    use uuid::Uuid;
3286
3287    struct MockAuditSink {
3288        events: Arc<Mutex<Vec<AuditEvent>>>,
3289    }
3290
3291    #[async_trait]
3292    impl AuditSink for MockAuditSink {
3293        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3294            self.events.lock().await.push(event.clone());
3295            Ok(())
3296        }
3297    }
3298
3299    #[tokio::test]
3300    async fn execute_logs_audit_events_for_intervention() {
3301        use ranvier_inspector::StateInspector;
3302
3303        let trace_id = "test-audit-trace";
3304        let store_impl = InMemoryPersistenceStore::new();
3305        let events = Arc::new(Mutex::new(Vec::new()));
3306        let sink = MockAuditSink {
3307            events: events.clone(),
3308        };
3309
3310        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3311            .then(AddOne)
3312            .with_persistence_store(store_impl.clone())
3313            .with_audit_sink(sink);
3314
3315        let mut bus = Bus::new();
3316        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3317        bus.insert(PersistenceTraceId::new(trace_id));
3318        let target_node_id = axon.schematic.nodes[0].id.clone();
3319
3320        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
3321        store_impl
3322            .append(crate::persistence::PersistenceEnvelope {
3323                trace_id: trace_id.to_string(),
3324                circuit: "AuditTest".to_string(),
3325                schematic_version: "v1.0".to_string(),
3326                step: 0,
3327                node_id: None,
3328                outcome_kind: "Next".to_string(),
3329                timestamp_ms: 0,
3330                payload_hash: None,
3331                payload: None,
3332            })
3333            .await
3334            .unwrap();
3335
3336        // 1. Trigger force_resume (should log ForceResume)
3337        axon.force_resume(trace_id, &target_node_id, None)
3338            .await
3339            .unwrap();
3340
3341        // 2. Execute (should log ApplyIntervention)
3342        axon.execute(10, &(), &mut bus).await;
3343
3344        let recorded = events.lock().await;
3345        assert_eq!(
3346            recorded.len(),
3347            2,
3348            "Should have 2 audit events: ForceResume and ApplyIntervention"
3349        );
3350        assert_eq!(recorded[0].action, "ForceResume");
3351        assert_eq!(recorded[0].target, trace_id);
3352        assert_eq!(recorded[1].action, "ApplyIntervention");
3353        assert_eq!(recorded[1].target, trace_id);
3354    }
3355
3356    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3357    pub enum TestInfallible {}
3358
3359    #[test]
3360    fn inspector_enabled_flag_matrix() {
3361        assert!(inspector_enabled_from_value(None));
3362        assert!(inspector_enabled_from_value(Some("1")));
3363        assert!(inspector_enabled_from_value(Some("true")));
3364        assert!(inspector_enabled_from_value(Some("on")));
3365        assert!(!inspector_enabled_from_value(Some("0")));
3366        assert!(!inspector_enabled_from_value(Some("false")));
3367    }
3368
3369    #[test]
3370    fn inspector_dev_mode_matrix() {
3371        assert!(inspector_dev_mode_from_value(None));
3372        assert!(inspector_dev_mode_from_value(Some("dev")));
3373        assert!(inspector_dev_mode_from_value(Some("staging")));
3374        assert!(!inspector_dev_mode_from_value(Some("prod")));
3375        assert!(!inspector_dev_mode_from_value(Some("production")));
3376    }
3377
3378    #[test]
3379    fn adaptive_policy_force_export_matrix() {
3380        let next = Outcome::<(), &'static str>::Next(());
3381        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3382        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3383        let fault = Outcome::<(), &'static str>::Fault("boom");
3384
3385        assert!(!should_force_export(&next, "off"));
3386        assert!(!should_force_export(&fault, "off"));
3387
3388        assert!(!should_force_export(&branch, "fault_only"));
3389        assert!(should_force_export(&fault, "fault_only"));
3390
3391        assert!(should_force_export(&branch, "fault_branch"));
3392        assert!(!should_force_export(&emit, "fault_branch"));
3393        assert!(should_force_export(&fault, "fault_branch"));
3394
3395        assert!(should_force_export(&branch, "fault_branch_emit"));
3396        assert!(should_force_export(&emit, "fault_branch_emit"));
3397        assert!(should_force_export(&fault, "fault_branch_emit"));
3398    }
3399
3400    #[test]
3401    fn sampling_and_adaptive_combination_decisions() {
3402        let bus_id = Uuid::nil();
3403        let next = Outcome::<(), &'static str>::Next(());
3404        let fault = Outcome::<(), &'static str>::Fault("boom");
3405
3406        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3407        assert!(!sampled_never);
3408        assert!(!(sampled_never || should_force_export(&next, "off")));
3409        assert!(sampled_never || should_force_export(&fault, "fault_only"));
3410
3411        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3412        assert!(sampled_always);
3413        assert!(sampled_always || should_force_export(&next, "off"));
3414        assert!(sampled_always || should_force_export(&fault, "off"));
3415    }
3416
3417    #[derive(Clone)]
3418    struct AddOne;
3419
3420    #[async_trait]
3421    impl Transition<i32, i32> for AddOne {
3422        type Error = TestInfallible;
3423        type Resources = ();
3424
3425        async fn run(
3426            &self,
3427            state: i32,
3428            _resources: &Self::Resources,
3429            _bus: &mut Bus,
3430        ) -> Outcome<i32, Self::Error> {
3431            Outcome::Next(state + 1)
3432        }
3433    }
3434
3435    #[derive(Clone)]
3436    struct AlwaysFault;
3437
3438    #[async_trait]
3439    impl Transition<i32, i32> for AlwaysFault {
3440        type Error = String;
3441        type Resources = ();
3442
3443        async fn run(
3444            &self,
3445            _state: i32,
3446            _resources: &Self::Resources,
3447            _bus: &mut Bus,
3448        ) -> Outcome<i32, Self::Error> {
3449            Outcome::Fault("boom".to_string())
3450        }
3451    }
3452
3453    #[derive(Clone)]
3454    struct CapabilityGuarded;
3455
3456    #[async_trait]
3457    impl Transition<(), ()> for CapabilityGuarded {
3458        type Error = String;
3459        type Resources = ();
3460
3461        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3462            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3463        }
3464
3465        async fn run(
3466            &self,
3467            _state: (),
3468            _resources: &Self::Resources,
3469            bus: &mut Bus,
3470        ) -> Outcome<(), Self::Error> {
3471            match bus.get::<String>() {
3472                Ok(_) => Outcome::Next(()),
3473                Err(err) => Outcome::Fault(err.to_string()),
3474            }
3475        }
3476    }
3477
3478    #[derive(Clone)]
3479    struct RecordingCompensationHook {
3480        calls: Arc<Mutex<Vec<CompensationContext>>>,
3481        should_fail: bool,
3482    }
3483
3484    #[async_trait]
3485    impl CompensationHook for RecordingCompensationHook {
3486        async fn compensate(&self, context: CompensationContext) -> Result<()> {
3487            self.calls.lock().await.push(context);
3488            if self.should_fail {
3489                return Err(anyhow::anyhow!("compensation failed"));
3490            }
3491            Ok(())
3492        }
3493    }
3494
3495    #[derive(Clone)]
3496    struct FlakyCompensationHook {
3497        calls: Arc<Mutex<u32>>,
3498        failures_remaining: Arc<Mutex<u32>>,
3499    }
3500
3501    #[async_trait]
3502    impl CompensationHook for FlakyCompensationHook {
3503        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3504            {
3505                let mut calls = self.calls.lock().await;
3506                *calls += 1;
3507            }
3508            let mut failures_remaining = self.failures_remaining.lock().await;
3509            if *failures_remaining > 0 {
3510                *failures_remaining -= 1;
3511                return Err(anyhow::anyhow!("transient compensation failure"));
3512            }
3513            Ok(())
3514        }
3515    }
3516
3517    #[derive(Clone)]
3518    struct FailingCompensationIdempotencyStore {
3519        read_calls: Arc<Mutex<u32>>,
3520        write_calls: Arc<Mutex<u32>>,
3521    }
3522
3523    #[async_trait]
3524    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3525        async fn was_compensated(&self, _key: &str) -> Result<bool> {
3526            let mut read_calls = self.read_calls.lock().await;
3527            *read_calls += 1;
3528            Err(anyhow::anyhow!("forced idempotency read failure"))
3529        }
3530
3531        async fn mark_compensated(&self, _key: &str) -> Result<()> {
3532            let mut write_calls = self.write_calls.lock().await;
3533            *write_calls += 1;
3534            Err(anyhow::anyhow!("forced idempotency write failure"))
3535        }
3536    }
3537
3538    #[tokio::test]
3539    async fn execute_persists_success_trace_when_handle_exists() {
3540        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3541        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3542
3543        let mut bus = Bus::new();
3544        bus.insert(PersistenceHandle::from_arc(store_dyn));
3545        bus.insert(PersistenceTraceId::new("trace-success"));
3546
3547        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3548        let outcome = axon.execute(41, &(), &mut bus).await;
3549        assert!(matches!(outcome, Outcome::Next(42)));
3550
3551        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3552        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3553        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3554        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
3555        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
3556        assert_eq!(persisted.completion, Some(CompletionState::Success));
3557    }
3558
3559    #[tokio::test]
3560    async fn execute_persists_fault_completion_state() {
3561        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3562        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3563
3564        let mut bus = Bus::new();
3565        bus.insert(PersistenceHandle::from_arc(store_dyn));
3566        bus.insert(PersistenceTraceId::new("trace-fault"));
3567
3568        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3569        let outcome = axon.execute(41, &(), &mut bus).await;
3570        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3571
3572        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3573        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3574        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3575        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3576        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3577    }
3578
3579    #[tokio::test]
3580    async fn execute_respects_persistence_auto_complete_off() {
3581        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3582        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3583
3584        let mut bus = Bus::new();
3585        bus.insert(PersistenceHandle::from_arc(store_dyn));
3586        bus.insert(PersistenceTraceId::new("trace-no-complete"));
3587        bus.insert(PersistenceAutoComplete(false));
3588
3589        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3590        let outcome = axon.execute(1, &(), &mut bus).await;
3591        assert!(matches!(outcome, Outcome::Next(2)));
3592
3593        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3594        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3595        assert_eq!(persisted.completion, None);
3596    }
3597
3598    #[tokio::test]
3599    async fn fault_triggers_compensation_and_marks_compensated() {
3600        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3601        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3602        let calls = Arc::new(Mutex::new(Vec::new()));
3603        let compensation = RecordingCompensationHook {
3604            calls: calls.clone(),
3605            should_fail: false,
3606        };
3607
3608        let mut bus = Bus::new();
3609        bus.insert(PersistenceHandle::from_arc(store_dyn));
3610        bus.insert(PersistenceTraceId::new("trace-compensated"));
3611        bus.insert(CompensationHandle::from_hook(compensation));
3612
3613        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3614        let outcome = axon.execute(7, &(), &mut bus).await;
3615        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3616
3617        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3618        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
3619        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3620        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3621        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3622        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3623        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3624
3625        let recorded = calls.lock().await;
3626        assert_eq!(recorded.len(), 1);
3627        assert_eq!(recorded[0].trace_id, "trace-compensated");
3628        assert_eq!(recorded[0].fault_kind, "Fault");
3629    }
3630
3631    #[tokio::test]
3632    async fn failed_compensation_keeps_fault_completion() {
3633        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3634        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3635        let calls = Arc::new(Mutex::new(Vec::new()));
3636        let compensation = RecordingCompensationHook {
3637            calls: calls.clone(),
3638            should_fail: true,
3639        };
3640
3641        let mut bus = Bus::new();
3642        bus.insert(PersistenceHandle::from_arc(store_dyn));
3643        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3644        bus.insert(CompensationHandle::from_hook(compensation));
3645
3646        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3647        let outcome = axon.execute(7, &(), &mut bus).await;
3648        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3649
3650        let persisted = store_impl
3651            .load("trace-compensation-failed")
3652            .await
3653            .unwrap()
3654            .unwrap();
3655        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3656        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3657        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3658
3659        let recorded = calls.lock().await;
3660        assert_eq!(recorded.len(), 1);
3661    }
3662
3663    #[tokio::test]
3664    async fn compensation_retry_policy_succeeds_after_retries() {
3665        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3666        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3667        let calls = Arc::new(Mutex::new(0u32));
3668        let failures_remaining = Arc::new(Mutex::new(2u32));
3669        let compensation = FlakyCompensationHook {
3670            calls: calls.clone(),
3671            failures_remaining,
3672        };
3673
3674        let mut bus = Bus::new();
3675        bus.insert(PersistenceHandle::from_arc(store_dyn));
3676        bus.insert(PersistenceTraceId::new("trace-retry-success"));
3677        bus.insert(CompensationHandle::from_hook(compensation));
3678        bus.insert(CompensationRetryPolicy {
3679            max_attempts: 3,
3680            backoff_ms: 0,
3681        });
3682
3683        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3684        let outcome = axon.execute(7, &(), &mut bus).await;
3685        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3686
3687        let persisted = store_impl
3688            .load("trace-retry-success")
3689            .await
3690            .unwrap()
3691            .unwrap();
3692        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3693        assert_eq!(
3694            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3695            Some("Compensated")
3696        );
3697
3698        let attempt_count = calls.lock().await;
3699        assert_eq!(*attempt_count, 3);
3700    }
3701
3702    #[tokio::test]
3703    async fn compensation_idempotency_skips_duplicate_hook_execution() {
3704        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3705        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3706        let calls = Arc::new(Mutex::new(Vec::new()));
3707        let compensation = RecordingCompensationHook {
3708            calls: calls.clone(),
3709            should_fail: false,
3710        };
3711        let idempotency = InMemoryCompensationIdempotencyStore::new();
3712
3713        let mut bus = Bus::new();
3714        bus.insert(PersistenceHandle::from_arc(store_dyn));
3715        bus.insert(PersistenceTraceId::new("trace-idempotent"));
3716        bus.insert(PersistenceAutoComplete(false));
3717        bus.insert(CompensationHandle::from_hook(compensation));
3718        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3719
3720        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3721
3722        let outcome1 = axon.execute(7, &(), &mut bus).await;
3723        let outcome2 = axon.execute(8, &(), &mut bus).await;
3724        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3725        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3726
3727        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3728        assert_eq!(persisted.completion, None);
3729        // Verify that "Compensated" events are present for both executions
3730        let compensated_count = persisted
3731            .events
3732            .iter()
3733            .filter(|e| e.outcome_kind == "Compensated")
3734            .count();
3735        assert_eq!(
3736            compensated_count, 2,
3737            "Should have 2 Compensated events (one per execution)"
3738        );
3739
3740        let recorded = calls.lock().await;
3741        assert_eq!(recorded.len(), 1);
3742    }
3743
3744    #[tokio::test]
3745    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3746        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3747        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3748        let calls = Arc::new(Mutex::new(Vec::new()));
3749        let read_calls = Arc::new(Mutex::new(0u32));
3750        let write_calls = Arc::new(Mutex::new(0u32));
3751        let compensation = RecordingCompensationHook {
3752            calls: calls.clone(),
3753            should_fail: false,
3754        };
3755        let idempotency = FailingCompensationIdempotencyStore {
3756            read_calls: read_calls.clone(),
3757            write_calls: write_calls.clone(),
3758        };
3759
3760        let mut bus = Bus::new();
3761        bus.insert(PersistenceHandle::from_arc(store_dyn));
3762        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3763        bus.insert(CompensationHandle::from_hook(compensation));
3764        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3765
3766        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3767        let outcome = axon.execute(9, &(), &mut bus).await;
3768        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3769
3770        let persisted = store_impl
3771            .load("trace-idempotency-store-failure")
3772            .await
3773            .unwrap()
3774            .unwrap();
3775        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3776        assert_eq!(
3777            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3778            Some("Compensated")
3779        );
3780
3781        let recorded = calls.lock().await;
3782        assert_eq!(recorded.len(), 1);
3783        assert_eq!(*read_calls.lock().await, 1);
3784        assert_eq!(*write_calls.lock().await, 1);
3785    }
3786
3787    #[tokio::test]
3788    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3789        let mut bus = Bus::new();
3790        bus.insert(1_i32);
3791        bus.insert("secret".to_string());
3792
3793        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3794        let outcome = axon.execute((), &(), &mut bus).await;
3795
3796        match outcome {
3797            Outcome::Fault(msg) => {
3798                assert!(msg.contains("Bus access denied"), "{msg}");
3799                assert!(msg.contains("CapabilityGuarded"), "{msg}");
3800                assert!(msg.contains("alloc::string::String"), "{msg}");
3801            }
3802            other => panic!("expected fault, got {other:?}"),
3803        }
3804    }
3805
3806    #[tokio::test]
3807    async fn execute_fails_on_version_mismatch_without_migration() {
3808        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3809        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3810
3811        let trace_id = "v-mismatch";
3812        // Create an existing trace with an older version
3813        let old_envelope = crate::persistence::PersistenceEnvelope {
3814            trace_id: trace_id.to_string(),
3815            circuit: "TestCircuit".to_string(),
3816            schematic_version: "0.9".to_string(),
3817            step: 0,
3818            node_id: None,
3819            outcome_kind: "Enter".to_string(),
3820            timestamp_ms: 0,
3821            payload_hash: None,
3822            payload: None,
3823        };
3824        store_impl.append(old_envelope).await.unwrap();
3825
3826        let mut bus = Bus::new();
3827        bus.insert(PersistenceHandle::from_arc(store_dyn));
3828        bus.insert(PersistenceTraceId::new(trace_id));
3829
3830        // Current axon is version 1.0
3831        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3832        let outcome = axon.execute(10, &(), &mut bus).await;
3833
3834        if let Outcome::Emit(kind, _) = outcome {
3835            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3836        } else {
3837            panic!("Expected version mismatch emission, got {:?}", outcome);
3838        }
3839    }
3840
3841    #[tokio::test]
3842    async fn execute_resumes_from_start_on_migration_strategy() {
3843        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3844        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3845
3846        let trace_id = "v-migration";
3847        // Create an existing trace with an older version at step 5
3848        let old_envelope = crate::persistence::PersistenceEnvelope {
3849            trace_id: trace_id.to_string(),
3850            circuit: "TestCircuit".to_string(),
3851            schematic_version: "0.9".to_string(),
3852            step: 5,
3853            node_id: None,
3854            outcome_kind: "Next".to_string(),
3855            timestamp_ms: 0,
3856            payload_hash: None,
3857            payload: None,
3858        };
3859        store_impl.append(old_envelope).await.unwrap();
3860
3861        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3862        registry.register(ranvier_core::schematic::SnapshotMigration {
3863            name: Some("v0.9 to v1.0".to_string()),
3864            from_version: "0.9".to_string(),
3865            to_version: "1.0".to_string(),
3866            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3867            node_mapping: std::collections::HashMap::new(),
3868            payload_mapper: None,
3869        });
3870
3871        let mut bus = Bus::new();
3872        bus.insert(PersistenceHandle::from_arc(store_dyn));
3873        bus.insert(PersistenceTraceId::new(trace_id));
3874        bus.insert(registry);
3875
3876        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3877        let outcome = axon.execute(10, &(), &mut bus).await;
3878
3879        // Should have resumed from start (step 0), resulting in 11
3880        assert!(matches!(outcome, Outcome::Next(11)));
3881
3882        // Verify new event has current version
3883        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3884        assert_eq!(persisted.schematic_version, "1.0");
3885    }
3886
3887    #[tokio::test]
3888    async fn execute_applies_manual_intervention_jump_and_payload() {
3889        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3890        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3891
3892        let trace_id = "intervention-test";
3893        // 1. Run a normal trace part-way
3894        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3895            .then(AddOne)
3896            .then(AddOne);
3897
3898        let mut bus = Bus::new();
3899        bus.insert(PersistenceHandle::from_arc(store_dyn));
3900        bus.insert(PersistenceTraceId::new(trace_id));
3901
3902        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
3903        // with a payload override of 100.
3904        // The first node is 'AddOne', the second is ALSO 'AddOne'.
3905        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
3906        let _target_node_label = "AddOne";
3907        // To be precise, let's find the ID of the second node.
3908        let target_node_id = axon.schematic.nodes[2].id.clone();
3909
3910        // Pre-seed an initial trace entry so save_intervention doesn't fail
3911        store_impl
3912            .append(crate::persistence::PersistenceEnvelope {
3913                trace_id: trace_id.to_string(),
3914                circuit: "TestCircuit".to_string(),
3915                schematic_version: "1.0".to_string(),
3916                step: 0,
3917                node_id: None,
3918                outcome_kind: "Enter".to_string(),
3919                timestamp_ms: 0,
3920                payload_hash: None,
3921                payload: None,
3922            })
3923            .await
3924            .unwrap();
3925
3926        store_impl
3927            .save_intervention(
3928                trace_id,
3929                crate::persistence::Intervention {
3930                    target_node: target_node_id.clone(),
3931                    payload_override: Some(serde_json::json!(100)),
3932                    timestamp_ms: 0,
3933                },
3934            )
3935            .await
3936            .unwrap();
3937
3938        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
3939        // Result should be 100 + 1 = 101.
3940        let outcome = axon.execute(10, &(), &mut bus).await;
3941
3942        match outcome {
3943            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3944            other => panic!("Expected Outcome::Next(101), got {:?}", other),
3945        }
3946
3947        // Verify the jump was logged in trace
3948        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3949        // The last event should be from the jump target's execution.
3950        assert_eq!(persisted.interventions.len(), 1);
3951        assert_eq!(persisted.interventions[0].target_node, target_node_id);
3952    }
3953
3954    // ── DLQ Retry Tests ──────────────────────────────────────────────
3955
3956    /// A transition that fails a configurable number of times before succeeding.
3957    #[derive(Clone)]
3958    struct FailNThenSucceed {
3959        remaining: Arc<tokio::sync::Mutex<u32>>,
3960    }
3961
3962    #[async_trait]
3963    impl Transition<i32, i32> for FailNThenSucceed {
3964        type Error = String;
3965        type Resources = ();
3966
3967        async fn run(
3968            &self,
3969            state: i32,
3970            _resources: &Self::Resources,
3971            _bus: &mut Bus,
3972        ) -> Outcome<i32, Self::Error> {
3973            let mut rem = self.remaining.lock().await;
3974            if *rem > 0 {
3975                *rem -= 1;
3976                Outcome::Fault("transient failure".to_string())
3977            } else {
3978                Outcome::Next(state + 1)
3979            }
3980        }
3981    }
3982
3983    /// A mock DLQ sink that records all dead letters.
3984    #[derive(Clone)]
3985    struct MockDlqSink {
3986        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3987    }
3988
3989    #[async_trait]
3990    impl DlqSink for MockDlqSink {
3991        async fn store_dead_letter(
3992            &self,
3993            workflow_id: &str,
3994            _circuit_label: &str,
3995            node_id: &str,
3996            error_msg: &str,
3997            _payload: &[u8],
3998        ) -> Result<(), String> {
3999            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4000            self.letters.lock().await.push(entry);
4001            Ok(())
4002        }
4003    }
4004
4005    #[tokio::test]
4006    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4007        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
4008        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4009        let trans = FailNThenSucceed { remaining };
4010
4011        let dlq_sink = MockDlqSink {
4012            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4013        };
4014
4015        let mut bus = Bus::new();
4016        bus.insert(Timeline::new());
4017
4018        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4019            .then(trans)
4020            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4021                max_attempts: 5,
4022                backoff_ms: 1,
4023            })
4024            .with_dlq_sink(dlq_sink.clone());
4025        let outcome = axon.execute(10, &(), &mut bus).await;
4026
4027        // Should succeed (10 + 1 = 11)
4028        assert!(
4029            matches!(outcome, Outcome::Next(11)),
4030            "Expected Next(11), got {:?}",
4031            outcome
4032        );
4033
4034        // No dead letters since it recovered
4035        let letters = dlq_sink.letters.lock().await;
4036        assert!(
4037            letters.is_empty(),
4038            "Should have 0 dead letters, got {}",
4039            letters.len()
4040        );
4041
4042        // Timeline should contain NodeRetry events
4043        let timeline = bus.read::<Timeline>().unwrap();
4044        let retry_count = timeline
4045            .events
4046            .iter()
4047            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4048            .count();
4049        assert_eq!(retry_count, 2, "Should have 2 retry events");
4050    }
4051
4052    #[tokio::test]
4053    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4054        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
4055        let mut bus = Bus::new();
4056        bus.insert(Timeline::new());
4057
4058        let dlq_sink = MockDlqSink {
4059            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4060        };
4061
4062        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4063            .then(AlwaysFault)
4064            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4065                max_attempts: 3,
4066                backoff_ms: 1,
4067            })
4068            .with_dlq_sink(dlq_sink.clone());
4069        let outcome = axon.execute(42, &(), &mut bus).await;
4070
4071        assert!(
4072            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4073            "Expected Fault(boom), got {:?}",
4074            outcome
4075        );
4076
4077        // Should have exactly 1 dead letter
4078        let letters = dlq_sink.letters.lock().await;
4079        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4080
4081        // Timeline should have 2 retry events and 1 DlqExhausted event
4082        let timeline = bus.read::<Timeline>().unwrap();
4083        let retry_count = timeline
4084            .events
4085            .iter()
4086            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4087            .count();
4088        let dlq_count = timeline
4089            .events
4090            .iter()
4091            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4092            .count();
4093        assert_eq!(
4094            retry_count, 2,
4095            "Should have 2 retry events (attempts 2 and 3)"
4096        );
4097        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4098    }
4099
4100    #[tokio::test]
4101    async fn send_to_dlq_policy_sends_immediately_without_retry() {
4102        let mut bus = Bus::new();
4103        bus.insert(Timeline::new());
4104
4105        let dlq_sink = MockDlqSink {
4106            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4107        };
4108
4109        let axon = Axon::<i32, i32, String>::start("SendDlq")
4110            .then(AlwaysFault)
4111            .with_dlq_policy(DlqPolicy::SendToDlq)
4112            .with_dlq_sink(dlq_sink.clone());
4113        let outcome = axon.execute(1, &(), &mut bus).await;
4114
4115        assert!(matches!(outcome, Outcome::Fault(_)));
4116
4117        // Should have exactly 1 dead letter (immediate, no retries)
4118        let letters = dlq_sink.letters.lock().await;
4119        assert_eq!(letters.len(), 1);
4120
4121        // No retry or DlqExhausted events
4122        let timeline = bus.read::<Timeline>().unwrap();
4123        let retry_count = timeline
4124            .events
4125            .iter()
4126            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4127            .count();
4128        assert_eq!(retry_count, 0);
4129    }
4130
4131    #[tokio::test]
4132    async fn drop_policy_does_not_send_to_dlq() {
4133        let mut bus = Bus::new();
4134
4135        let dlq_sink = MockDlqSink {
4136            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4137        };
4138
4139        let axon = Axon::<i32, i32, String>::start("DropDlq")
4140            .then(AlwaysFault)
4141            .with_dlq_policy(DlqPolicy::Drop)
4142            .with_dlq_sink(dlq_sink.clone());
4143        let outcome = axon.execute(1, &(), &mut bus).await;
4144
4145        assert!(matches!(outcome, Outcome::Fault(_)));
4146
4147        // No dead letters
4148        let letters = dlq_sink.letters.lock().await;
4149        assert!(letters.is_empty());
4150    }
4151
4152    #[tokio::test]
4153    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4154        use ranvier_core::policy::DynamicPolicy;
4155
4156        // Start with Drop policy (no DLQ)
4157        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4158        let dlq_sink = MockDlqSink {
4159            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4160        };
4161
4162        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4163            .then(AlwaysFault)
4164            .with_dynamic_dlq_policy(dynamic)
4165            .with_dlq_sink(dlq_sink.clone());
4166
4167        // First execution: Drop policy → no dead letters
4168        let mut bus = Bus::new();
4169        let outcome = axon.execute(1, &(), &mut bus).await;
4170        assert!(matches!(outcome, Outcome::Fault(_)));
4171        assert!(
4172            dlq_sink.letters.lock().await.is_empty(),
4173            "Drop policy should produce no DLQ entries"
4174        );
4175
4176        // Hot-reload: switch to SendToDlq
4177        tx.send(DlqPolicy::SendToDlq).unwrap();
4178
4179        // Second execution: SendToDlq policy → dead letter captured
4180        let mut bus2 = Bus::new();
4181        let outcome2 = axon.execute(2, &(), &mut bus2).await;
4182        assert!(matches!(outcome2, Outcome::Fault(_)));
4183        assert_eq!(
4184            dlq_sink.letters.lock().await.len(),
4185            1,
4186            "SendToDlq policy should produce 1 DLQ entry"
4187        );
4188    }
4189
4190    #[tokio::test]
4191    async fn dynamic_saga_policy_hot_reload() {
4192        use ranvier_core::policy::DynamicPolicy;
4193        use ranvier_core::saga::SagaPolicy;
4194
4195        // Start with Disabled saga
4196        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4197
4198        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4199            .then(AddOne)
4200            .with_dynamic_saga_policy(dynamic);
4201
4202        // First execution: Disabled → no SagaStack in bus
4203        let mut bus = Bus::new();
4204        let _outcome = axon.execute(1, &(), &mut bus).await;
4205        assert!(
4206            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4207            "SagaStack should be absent or empty when disabled"
4208        );
4209
4210        // Hot-reload: enable saga
4211        tx.send(SagaPolicy::Enabled).unwrap();
4212
4213        // Second execution: Enabled → SagaStack populated
4214        let mut bus2 = Bus::new();
4215        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4216        assert!(
4217            bus2.read::<SagaStack>().is_some(),
4218            "SagaStack should exist when saga is enabled"
4219        );
4220    }
4221
4222    // ── IAM Boundary Tests ──────────────────────────────────────
4223
4224    mod iam_tests {
4225        use super::*;
4226        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4227
4228        /// Mock IamVerifier that returns a fixed identity.
4229        #[derive(Clone)]
4230        struct MockVerifier {
4231            identity: IamIdentity,
4232            should_fail: bool,
4233        }
4234
4235        #[async_trait]
4236        impl IamVerifier for MockVerifier {
4237            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4238                if self.should_fail {
4239                    Err(IamError::InvalidToken("mock verification failure".into()))
4240                } else {
4241                    Ok(self.identity.clone())
4242                }
4243            }
4244        }
4245
4246        #[tokio::test]
4247        async fn iam_require_identity_passes_with_valid_token() {
4248            let verifier = MockVerifier {
4249                identity: IamIdentity::new("alice").with_role("user"),
4250                should_fail: false,
4251            };
4252
4253            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4254                .with_iam(IamPolicy::RequireIdentity, verifier)
4255                .then(AddOne);
4256
4257            let mut bus = Bus::new();
4258            bus.insert(IamToken("valid-token".to_string()));
4259            let outcome = axon.execute(10, &(), &mut bus).await;
4260
4261            assert!(matches!(outcome, Outcome::Next(11)));
4262            // Verify IamIdentity was injected into Bus
4263            let identity = bus
4264                .read::<IamIdentity>()
4265                .expect("IamIdentity should be in Bus");
4266            assert_eq!(identity.subject, "alice");
4267        }
4268
4269        #[tokio::test]
4270        async fn iam_require_identity_rejects_missing_token() {
4271            let verifier = MockVerifier {
4272                identity: IamIdentity::new("ignored"),
4273                should_fail: false,
4274            };
4275
4276            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4277                .with_iam(IamPolicy::RequireIdentity, verifier)
4278                .then(AddOne);
4279
4280            let mut bus = Bus::new();
4281            // No IamToken inserted
4282            let outcome = axon.execute(10, &(), &mut bus).await;
4283
4284            // Should emit missing_token event
4285            match &outcome {
4286                Outcome::Emit(label, _) => {
4287                    assert_eq!(label, "iam.missing_token");
4288                }
4289                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4290            }
4291        }
4292
4293        #[tokio::test]
4294        async fn iam_rejects_failed_verification() {
4295            let verifier = MockVerifier {
4296                identity: IamIdentity::new("ignored"),
4297                should_fail: true,
4298            };
4299
4300            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4301                .with_iam(IamPolicy::RequireIdentity, verifier)
4302                .then(AddOne);
4303
4304            let mut bus = Bus::new();
4305            bus.insert(IamToken("bad-token".to_string()));
4306            let outcome = axon.execute(10, &(), &mut bus).await;
4307
4308            match &outcome {
4309                Outcome::Emit(label, _) => {
4310                    assert_eq!(label, "iam.verification_failed");
4311                }
4312                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4313            }
4314        }
4315
4316        #[tokio::test]
4317        async fn iam_require_role_passes_with_matching_role() {
4318            let verifier = MockVerifier {
4319                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4320                should_fail: false,
4321            };
4322
4323            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4324                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4325                .then(AddOne);
4326
4327            let mut bus = Bus::new();
4328            bus.insert(IamToken("token".to_string()));
4329            let outcome = axon.execute(5, &(), &mut bus).await;
4330
4331            assert!(matches!(outcome, Outcome::Next(6)));
4332        }
4333
4334        #[tokio::test]
4335        async fn iam_require_role_denies_without_role() {
4336            let verifier = MockVerifier {
4337                identity: IamIdentity::new("carol").with_role("user"),
4338                should_fail: false,
4339            };
4340
4341            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4342                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4343                .then(AddOne);
4344
4345            let mut bus = Bus::new();
4346            bus.insert(IamToken("token".to_string()));
4347            let outcome = axon.execute(5, &(), &mut bus).await;
4348
4349            match &outcome {
4350                Outcome::Emit(label, _) => {
4351                    assert_eq!(label, "iam.policy_denied");
4352                }
4353                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4354            }
4355        }
4356
4357        #[tokio::test]
4358        async fn iam_policy_none_skips_verification() {
4359            let verifier = MockVerifier {
4360                identity: IamIdentity::new("ignored"),
4361                should_fail: true, // would fail if actually called
4362            };
4363
4364            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4365                .with_iam(IamPolicy::None, verifier)
4366                .then(AddOne);
4367
4368            let mut bus = Bus::new();
4369            // No token needed when policy is None
4370            let outcome = axon.execute(10, &(), &mut bus).await;
4371
4372            assert!(matches!(outcome, Outcome::Next(11)));
4373        }
4374    }
4375
4376    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
4377
4378    #[derive(Clone)]
4379    struct SchemaTransition;
4380
4381    #[async_trait]
4382    impl Transition<String, String> for SchemaTransition {
4383        type Error = String;
4384        type Resources = ();
4385
4386        fn input_schema(&self) -> Option<serde_json::Value> {
4387            Some(serde_json::json!({
4388                "type": "object",
4389                "required": ["name"],
4390                "properties": {
4391                    "name": { "type": "string" }
4392                }
4393            }))
4394        }
4395
4396        async fn run(
4397            &self,
4398            state: String,
4399            _resources: &Self::Resources,
4400            _bus: &mut Bus,
4401        ) -> Outcome<String, Self::Error> {
4402            Outcome::Next(state)
4403        }
4404    }
4405
4406    #[test]
4407    fn then_auto_populates_input_schema_from_transition() {
4408        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4409
4410        // The last node (added by .then()) should have input_schema
4411        let last_node = axon.schematic.nodes.last().unwrap();
4412        assert!(last_node.input_schema.is_some());
4413        let schema = last_node.input_schema.as_ref().unwrap();
4414        assert_eq!(schema["type"], "object");
4415        assert_eq!(schema["required"][0], "name");
4416    }
4417
4418    #[test]
4419    fn then_leaves_input_schema_none_when_not_provided() {
4420        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4421
4422        let last_node = axon.schematic.nodes.last().unwrap();
4423        assert!(last_node.input_schema.is_none());
4424    }
4425
4426    #[test]
4427    fn with_input_schema_value_sets_on_last_node() {
4428        let schema = serde_json::json!({"type": "integer"});
4429        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4430            .then(AddOne)
4431            .with_input_schema_value(schema.clone());
4432
4433        let last_node = axon.schematic.nodes.last().unwrap();
4434        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4435    }
4436
4437    #[test]
4438    fn with_output_schema_value_sets_on_last_node() {
4439        let schema = serde_json::json!({"type": "integer"});
4440        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4441            .then(AddOne)
4442            .with_output_schema_value(schema.clone());
4443
4444        let last_node = axon.schematic.nodes.last().unwrap();
4445        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4446    }
4447
4448    #[test]
4449    fn schematic_export_includes_schema_fields() {
4450        let axon = Axon::<String, String, String>::new("ExportTest")
4451            .then(SchemaTransition)
4452            .with_output_schema_value(serde_json::json!({"type": "string"}));
4453
4454        let json = serde_json::to_value(&axon.schematic).unwrap();
4455        let nodes = json["nodes"].as_array().unwrap();
4456        // Find the SchemaTransition node (last one)
4457        let last = nodes.last().unwrap();
4458        assert!(last.get("input_schema").is_some());
4459        assert_eq!(last["input_schema"]["type"], "object");
4460        assert_eq!(last["output_schema"]["type"], "string");
4461    }
4462
4463    #[test]
4464    fn schematic_export_omits_schema_fields_when_none() {
4465        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4466
4467        let json = serde_json::to_value(&axon.schematic).unwrap();
4468        let nodes = json["nodes"].as_array().unwrap();
4469        let last = nodes.last().unwrap();
4470        let obj = last.as_object().unwrap();
4471        assert!(!obj.contains_key("input_schema"));
4472        assert!(!obj.contains_key("output_schema"));
4473    }
4474
4475    #[test]
4476    fn schematic_json_roundtrip_preserves_schemas() {
4477        let axon = Axon::<String, String, String>::new("Roundtrip")
4478            .then(SchemaTransition)
4479            .with_output_schema_value(serde_json::json!({"type": "string"}));
4480
4481        let json_str = serde_json::to_string(&axon.schematic).unwrap();
4482        let deserialized: ranvier_core::schematic::Schematic =
4483            serde_json::from_str(&json_str).unwrap();
4484
4485        let last = deserialized.nodes.last().unwrap();
4486        assert!(last.input_schema.is_some());
4487        assert!(last.output_schema.is_some());
4488        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4489        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4490    }
4491
4492    // Test transitions for new unit tests
4493    #[derive(Clone)]
4494    struct MultiplyByTwo;
4495
4496    #[async_trait]
4497    impl Transition<i32, i32> for MultiplyByTwo {
4498        type Error = TestInfallible;
4499        type Resources = ();
4500
4501        async fn run(
4502            &self,
4503            state: i32,
4504            _resources: &Self::Resources,
4505            _bus: &mut Bus,
4506        ) -> Outcome<i32, Self::Error> {
4507            Outcome::Next(state * 2)
4508        }
4509    }
4510
4511    #[derive(Clone)]
4512    struct AddTen;
4513
4514    #[async_trait]
4515    impl Transition<i32, i32> for AddTen {
4516        type Error = TestInfallible;
4517        type Resources = ();
4518
4519        async fn run(
4520            &self,
4521            state: i32,
4522            _resources: &Self::Resources,
4523            _bus: &mut Bus,
4524        ) -> Outcome<i32, Self::Error> {
4525            Outcome::Next(state + 10)
4526        }
4527    }
4528
4529    #[derive(Clone)]
4530    struct AddOneString;
4531
4532    #[async_trait]
4533    impl Transition<i32, i32> for AddOneString {
4534        type Error = String;
4535        type Resources = ();
4536
4537        async fn run(
4538            &self,
4539            state: i32,
4540            _resources: &Self::Resources,
4541            _bus: &mut Bus,
4542        ) -> Outcome<i32, Self::Error> {
4543            Outcome::Next(state + 1)
4544        }
4545    }
4546
4547    #[derive(Clone)]
4548    struct AddTenString;
4549
4550    #[async_trait]
4551    impl Transition<i32, i32> for AddTenString {
4552        type Error = String;
4553        type Resources = ();
4554
4555        async fn run(
4556            &self,
4557            state: i32,
4558            _resources: &Self::Resources,
4559            _bus: &mut Bus,
4560        ) -> Outcome<i32, Self::Error> {
4561            Outcome::Next(state + 10)
4562        }
4563    }
4564
4565    #[tokio::test]
4566    async fn axon_single_step_chain_executes_and_returns_next() {
4567        let mut bus = Bus::new();
4568        let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4569
4570        let outcome = axon.execute(5, &(), &mut bus).await;
4571        assert!(matches!(outcome, Outcome::Next(6)));
4572    }
4573
4574    #[tokio::test]
4575    async fn axon_three_step_chain_executes_in_order() {
4576        let mut bus = Bus::new();
4577        let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4578            .then(AddOne)
4579            .then(MultiplyByTwo)
4580            .then(AddTen);
4581
4582        // Starting with 5: AddOne -> 6, MultiplyByTwo -> 12, AddTen -> 22
4583        let outcome = axon.execute(5, &(), &mut bus).await;
4584        assert!(matches!(outcome, Outcome::Next(22)));
4585    }
4586
4587    #[tokio::test]
4588    async fn axon_with_fault_in_middle_step_propagates_error() {
4589        let mut bus = Bus::new();
4590
4591        // Create a 3-step chain where the middle step faults
4592        // Step 1: AddOneString (5 -> 6)
4593        // Step 2: AlwaysFault (should fault here)
4594        // Step 3: AddTenString (never reached)
4595        let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4596            .then(AddOneString)
4597            .then(AlwaysFault)
4598            .then(AddTenString);
4599
4600        let outcome = axon.execute(5, &(), &mut bus).await;
4601        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4602    }
4603
4604    #[test]
4605    fn axon_schematic_has_correct_node_count_after_chaining() {
4606        let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4607            .then(AddOne)
4608            .then(MultiplyByTwo)
4609            .then(AddTen);
4610
4611        // Should have 4 nodes: ingress + 3 transitions
4612        assert_eq!(axon.schematic.nodes.len(), 4);
4613        assert_eq!(axon.schematic.name, "NodeCount");
4614    }
4615
4616    #[tokio::test]
4617    async fn axon_execution_records_timeline_events() {
4618        let mut bus = Bus::new();
4619        bus.insert(Timeline::new());
4620
4621        let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4622            .then(AddOne)
4623            .then(MultiplyByTwo);
4624
4625        let outcome = axon.execute(3, &(), &mut bus).await;
4626        assert!(matches!(outcome, Outcome::Next(8))); // (3 + 1) * 2 = 8
4627
4628        let timeline = bus.read::<Timeline>().unwrap();
4629
4630        // Should have NodeEnter and NodeExit events
4631        let enter_count = timeline
4632            .events
4633            .iter()
4634            .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4635            .count();
4636        let exit_count = timeline
4637            .events
4638            .iter()
4639            .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4640            .count();
4641
4642        // Expect at least 1 enter and 1 exit (ingress node)
4643        assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4644        assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4645    }
4646
4647    // ── Parallel Step Tests (M231) ───────────────────────────────
4648
4649    #[tokio::test]
4650    async fn parallel_all_succeed_returns_first_next() {
4651        use super::ParallelStrategy;
4652
4653        let mut bus = Bus::new();
4654        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4655            .parallel(
4656                vec![
4657                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4658                    Arc::new(MultiplyByTwo),
4659                ],
4660                ParallelStrategy::AllMustSucceed,
4661            );
4662
4663        // Input 5: AddOne -> 6, MultiplyByTwo -> 10.
4664        // AllMustSucceed returns the first Next (AddOne = 6).
4665        let outcome = axon.execute(5, &(), &mut bus).await;
4666        assert!(matches!(outcome, Outcome::Next(6)));
4667    }
4668
4669    #[tokio::test]
4670    async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4671        use super::ParallelStrategy;
4672
4673        let mut bus = Bus::new();
4674        let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4675            .parallel(
4676                vec![
4677                    Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4678                    Arc::new(AlwaysFault),
4679                ],
4680                ParallelStrategy::AllMustSucceed,
4681            );
4682
4683        let outcome = axon.execute(5, &(), &mut bus).await;
4684        assert!(
4685            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4686            "Expected Fault(boom), got {:?}",
4687            outcome
4688        );
4689    }
4690
4691    #[tokio::test]
4692    async fn parallel_any_can_fail_returns_success_despite_fault() {
4693        use super::ParallelStrategy;
4694
4695        let mut bus = Bus::new();
4696        let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4697            .parallel(
4698                vec![
4699                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4700                    Arc::new(AddOneString),
4701                ],
4702                ParallelStrategy::AnyCanFail,
4703            );
4704
4705        // AlwaysFault faults, but AddOneString succeeds (5 + 1 = 6).
4706        let outcome = axon.execute(5, &(), &mut bus).await;
4707        assert!(
4708            matches!(outcome, Outcome::Next(6)),
4709            "Expected Next(6), got {:?}",
4710            outcome
4711        );
4712    }
4713
4714    #[tokio::test]
4715    async fn parallel_any_can_fail_all_fault_returns_first_fault() {
4716        use super::ParallelStrategy;
4717
4718        #[derive(Clone)]
4719        struct AlwaysFault2;
4720        #[async_trait]
4721        impl Transition<i32, i32> for AlwaysFault2 {
4722            type Error = String;
4723            type Resources = ();
4724            async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
4725                Outcome::Fault("boom2".to_string())
4726            }
4727        }
4728
4729        let mut bus = Bus::new();
4730        let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
4731            .parallel(
4732                vec![
4733                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4734                    Arc::new(AlwaysFault2),
4735                ],
4736                ParallelStrategy::AnyCanFail,
4737            );
4738
4739        let outcome = axon.execute(5, &(), &mut bus).await;
4740        // Should return the first fault
4741        assert!(
4742            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4743            "Expected Fault(boom), got {:?}",
4744            outcome
4745        );
4746    }
4747
4748    #[test]
4749    fn parallel_schematic_has_fanout_fanin_nodes() {
4750        use super::ParallelStrategy;
4751        use ranvier_core::schematic::{EdgeType, NodeKind};
4752
4753        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
4754            .parallel(
4755                vec![
4756                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4757                    Arc::new(MultiplyByTwo),
4758                    Arc::new(AddTen),
4759                ],
4760                ParallelStrategy::AllMustSucceed,
4761            );
4762
4763        // Should have: Ingress + FanOut + 3 branch Atoms + FanIn = 6 nodes
4764        assert_eq!(axon.schematic.nodes.len(), 6);
4765        assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
4766        assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
4767        assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
4768        assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
4769        assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
4770
4771        // Check FanOut description
4772        assert!(axon.schematic.nodes[1]
4773            .description
4774            .as_ref()
4775            .unwrap()
4776            .contains("3 branches"));
4777
4778        // Check parallel edges from FanOut to branches
4779        let parallel_edges: Vec<_> = axon
4780            .schematic
4781            .edges
4782            .iter()
4783            .filter(|e| matches!(e.kind, EdgeType::Parallel))
4784            .collect();
4785        // 3 from FanOut->branches + 3 from branches->FanIn = 6
4786        assert_eq!(parallel_edges.len(), 6);
4787    }
4788
4789    #[tokio::test]
4790    async fn parallel_then_chain_composes_correctly() {
4791        use super::ParallelStrategy;
4792
4793        let mut bus = Bus::new();
4794        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
4795            .then(AddOne)
4796            .parallel(
4797                vec![
4798                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4799                    Arc::new(MultiplyByTwo),
4800                ],
4801                ParallelStrategy::AllMustSucceed,
4802            )
4803            .then(AddTen);
4804
4805        // 5 -> AddOne -> 6 -> Parallel(AddOne=7, MultiplyByTwo=12) -> first=7 -> AddTen -> 17
4806        let outcome = axon.execute(5, &(), &mut bus).await;
4807        assert!(
4808            matches!(outcome, Outcome::Next(17)),
4809            "Expected Next(17), got {:?}",
4810            outcome
4811        );
4812    }
4813
4814    #[tokio::test]
4815    async fn parallel_records_timeline_events() {
4816        use super::ParallelStrategy;
4817        use ranvier_core::timeline::TimelineEvent;
4818
4819        let mut bus = Bus::new();
4820        bus.insert(Timeline::new());
4821
4822        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
4823            .parallel(
4824                vec![
4825                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4826                    Arc::new(MultiplyByTwo),
4827                ],
4828                ParallelStrategy::AllMustSucceed,
4829            );
4830
4831        let outcome = axon.execute(3, &(), &mut bus).await;
4832        assert!(matches!(outcome, Outcome::Next(4)));
4833
4834        let timeline = bus.read::<Timeline>().unwrap();
4835
4836        // Check for FanOut enter/exit and FanIn enter/exit
4837        let fanout_enters = timeline
4838            .events
4839            .iter()
4840            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
4841            .count();
4842        let fanin_enters = timeline
4843            .events
4844            .iter()
4845            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
4846            .count();
4847
4848        assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
4849        assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
4850    }
4851}