Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
4//! It functions as a reusable Typed Decision flow (Axon<In, Out, E>).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45/// Configuration for Execution Mode.
46#[derive(Clone)]
47pub enum ExecutionMode {
48    /// Normal, unpartitioned local execution.
49    Local,
50    /// Singleton execution, ensures only one instance runs across the entire cluster.
51    Singleton {
52        lock_key: String,
53        ttl_ms: u64,
54        lock_provider: Arc<dyn DistributedLock>,
55    },
56}
57
58/// Strategy for parallel step execution.
59///
60/// Controls how the Axon handles faults during parallel branch execution.
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum ParallelStrategy {
63    /// All parallel steps must succeed; if any faults, return the first fault.
64    AllMustSucceed,
65    /// Continue even if some steps fail; return first successful result.
66    /// If all branches fault, returns the first fault.
67    AnyCanFail,
68}
69
70/// Type alias for async boxed futures used in Axon execution.
71pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
72
73/// Executor type for Axon steps.
74/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
75/// Must be Send + Sync to be reusable across threads and clones.
76pub type Executor<In, Out, E, Res> =
77    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
79/// Manual intervention jump command injected into the Bus.
80#[derive(Debug, Clone)]
81pub struct ManualJump {
82    pub target_node: String,
83    pub payload_override: Option<serde_json::Value>,
84}
85
86/// Start step index for resumption, injected into the Bus.
87#[derive(Debug, Clone, Copy)]
88struct StartStep(u64);
89
90/// Persisted state for resumption, injected into the Bus.
91#[derive(Debug, Clone)]
92struct ResumptionState {
93    payload: Option<serde_json::Value>,
94}
95
96/// Helper to extract a readable type name from a type.
97fn type_name_of<T: ?Sized>() -> String {
98    let full = type_name::<T>();
99    full.split("::").last().unwrap_or(full).to_string()
100}
101
102/// The Axon Builder and Runtime.
103///
104/// `Axon` represents an executable decision tree.
105/// It is reusable and thread-safe.
106///
107/// ## Example
108///
109/// ```rust,ignore
110/// use ranvier_core::prelude::*;
111/// // ...
112/// // Start with an identity Axon (In -> In)
113/// let axon = Axon::<(), (), _>::new("My Axon")
114///     .then(StepA)
115///     .then(StepB);
116///
117/// // Execute multiple times
118/// let res1 = axon.execute((), &mut bus1).await;
119/// let res2 = axon.execute((), &mut bus2).await;
120/// ```
121pub struct Axon<In, Out, E, Res = ()> {
122    /// The static structure (for visualization/analysis)
123    pub schematic: Schematic,
124    /// The runtime executor
125    executor: Executor<In, Out, E, Res>,
126    /// How this Axon is executed across the cluster
127    pub execution_mode: ExecutionMode,
128    /// Optional persistence store for state inspection
129    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
130    /// Optional audit sink for tamper-evident logging of interventions
131    pub audit_sink: Option<Arc<dyn AuditSink>>,
132    /// Optional dead-letter queue sink for storing failed events
133    pub dlq_sink: Option<Arc<dyn DlqSink>>,
134    /// Policy for handling event failures
135    pub dlq_policy: DlqPolicy,
136    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
137    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
138    /// Policy for automated saga compensation
139    pub saga_policy: SagaPolicy,
140    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
141    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
142    /// Registry for Saga compensation handlers
143    pub saga_compensation_registry:
144        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
145    /// Optional IAM handle for identity verification at the Schematic boundary
146    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
147}
148
149/// Schematic export request derived from command-line args/env.
150#[derive(Debug, Clone)]
151pub struct SchematicExportRequest {
152    /// Optional output file path. If omitted, schematic is written to stdout.
153    pub output: Option<PathBuf>,
154}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157    fn clone(&self) -> Self {
158        Self {
159            schematic: self.schematic.clone(),
160            executor: self.executor.clone(),
161            execution_mode: self.execution_mode.clone(),
162            persistence_store: self.persistence_store.clone(),
163            audit_sink: self.audit_sink.clone(),
164            dlq_sink: self.dlq_sink.clone(),
165            dlq_policy: self.dlq_policy.clone(),
166            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167            saga_policy: self.saga_policy.clone(),
168            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169            saga_compensation_registry: self.saga_compensation_registry.clone(),
170            iam_handle: self.iam_handle.clone(),
171        }
172    }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177    In: Send + Sync + Serialize + DeserializeOwned + 'static,
178    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179    Res: ranvier_core::transition::ResourceRequirement,
180{
181    /// Create a new Axon flow with the given label.
182    /// This is the preferred entry point per Flat API guidelines.
183    #[track_caller]
184    pub fn new(label: &str) -> Self {
185        let caller = Location::caller();
186        Self::start_with_source(label, caller)
187    }
188
189    /// Start defining a new Axon flow.
190    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
191    #[track_caller]
192    pub fn start(label: &str) -> Self {
193        let caller = Location::caller();
194        Self::start_with_source(label, caller)
195    }
196
197    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198        let node_id = uuid::Uuid::new_v4().to_string();
199        let node = Node {
200            id: node_id,
201            kind: NodeKind::Ingress,
202            label: label.to_string(),
203            description: None,
204            input_type: "void".to_string(),
205            output_type: type_name_of::<In>(),
206            resource_type: type_name_of::<Res>(),
207            metadata: Default::default(),
208            bus_capability: None,
209            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210            position: None,
211            compensation_node_id: None,
212            input_schema: None,
213            output_schema: None,
214        };
215
216        let mut schematic = Schematic::new(label);
217        schematic.nodes.push(node);
218
219        let executor: Executor<In, In, E, Res> =
220            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222        Self {
223            schematic,
224            executor,
225            execution_mode: ExecutionMode::Local,
226            persistence_store: None,
227            audit_sink: None,
228            dlq_sink: None,
229            dlq_policy: DlqPolicy::default(),
230            dynamic_dlq_policy: None,
231            saga_policy: SagaPolicy::default(),
232            dynamic_saga_policy: None,
233            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234                ranvier_core::saga::SagaCompensationRegistry::new(),
235            )),
236            iam_handle: None,
237        }
238    }
239}
240
241impl Axon<(), (), (), ()> {
242    /// Convenience constructor for simple pipelines with no input state or resources.
243    ///
244    /// Reduces the common `Axon::<(), (), E>::new("label")` turbofish to
245    /// `Axon::simple::<E>("label")`, requiring only the error type parameter.
246    ///
247    /// # Example
248    ///
249    /// ```rust,ignore
250    /// // Before: 3 type parameters, 2 of which are always ()
251    /// let axon = Axon::<(), (), String>::new("pipeline");
252    ///
253    /// // After: only the error type
254    /// let axon = Axon::simple::<String>("pipeline");
255    /// ```
256    #[track_caller]
257    pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
258    where
259        E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
260    {
261        let caller = Location::caller();
262        <Axon<(), (), E, ()>>::start_with_source(label, caller)
263    }
264}
265
266impl<In, Out, E, Res> Axon<In, Out, E, Res>
267where
268    In: Send + Sync + Serialize + DeserializeOwned + 'static,
269    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
270    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
271    Res: ranvier_core::transition::ResourceRequirement,
272{
273    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
274    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
275        self.execution_mode = mode;
276        self
277    }
278
279    /// Set the schematic version for this Axon.
280    pub fn with_version(mut self, version: impl Into<String>) -> Self {
281        self.schematic.schema_version = version.into();
282        self
283    }
284
285    /// Attach a persistence store to enable state inspection via the Inspector.
286    pub fn with_persistence_store<S>(mut self, store: S) -> Self
287    where
288        S: crate::persistence::PersistenceStore + 'static,
289    {
290        self.persistence_store = Some(Arc::new(store));
291        self
292    }
293
294    /// Attach an audit sink for tamper-evident logging.
295    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
296    where
297        S: AuditSink + 'static,
298    {
299        self.audit_sink = Some(Arc::new(sink));
300        self
301    }
302
303    /// Set the Dead Letter Queue sink for this Axon.
304    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
305    where
306        S: DlqSink + 'static,
307    {
308        self.dlq_sink = Some(Arc::new(sink));
309        self
310    }
311
312    /// Set the Dead Letter Queue policy for this Axon.
313    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
314        self.dlq_policy = policy;
315        self
316    }
317
318    /// Set the Saga compensation policy for this Axon.
319    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
320        self.saga_policy = policy;
321        self
322    }
323
324    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
325    /// current value is read at each execution, overriding the static `dlq_policy`.
326    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
327        self.dynamic_dlq_policy = Some(dynamic);
328        self
329    }
330
331    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
332    /// current value is read at each execution, overriding the static `saga_policy`.
333    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
334        self.dynamic_saga_policy = Some(dynamic);
335        self
336    }
337
338    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
339    ///
340    /// When set, `execute()` will:
341    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
342    /// 2. Verify the token using the provided verifier
343    /// 3. Enforce the policy against the verified identity
344    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
345    pub fn with_iam(
346        mut self,
347        policy: ranvier_core::iam::IamPolicy,
348        verifier: impl ranvier_core::iam::IamVerifier + 'static,
349    ) -> Self {
350        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
351            policy,
352            Arc::new(verifier),
353        ));
354        self
355    }
356
357    /// Attach a JSON Schema for the **last node's input type** in the schematic.
358    ///
359    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
360    ///
361    /// ```rust,ignore
362    /// let axon = Axon::new("My Circuit")
363    ///     .then(ProcessStep)
364    ///     .with_input_schema::<CreateUserRequest>()
365    ///     .with_output_schema::<UserResponse>();
366    /// ```
367    #[cfg(feature = "schema")]
368    pub fn with_input_schema<T>(mut self) -> Self
369    where
370        T: schemars::JsonSchema,
371    {
372        if let Some(last_node) = self.schematic.nodes.last_mut() {
373            let schema = schemars::schema_for!(T);
374            last_node.input_schema =
375                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
376        }
377        self
378    }
379
380    /// Attach a JSON Schema for the **last node's output type** in the schematic.
381    ///
382    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
383    #[cfg(feature = "schema")]
384    pub fn with_output_schema<T>(mut self) -> Self
385    where
386        T: schemars::JsonSchema,
387    {
388        if let Some(last_node) = self.schematic.nodes.last_mut() {
389            let schema = schemars::schema_for!(T);
390            last_node.output_schema =
391                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
392        }
393        self
394    }
395
396    /// Attach a raw JSON Schema value for the **last node's input type** in the schematic.
397    ///
398    /// Use this for pre-built schemas without requiring the `schema` feature.
399    pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
400        if let Some(last_node) = self.schematic.nodes.last_mut() {
401            last_node.input_schema = Some(schema);
402        }
403        self
404    }
405
406    /// Attach a raw JSON Schema value for the **last node's output type** in the schematic.
407    pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
408        if let Some(last_node) = self.schematic.nodes.last_mut() {
409            last_node.output_schema = Some(schema);
410        }
411        self
412    }
413}
414
415#[async_trait]
416impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
417where
418    In: Send + Sync + Serialize + DeserializeOwned + 'static,
419    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
420    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
421    Res: ranvier_core::transition::ResourceRequirement,
422{
423    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
424        let store = self.persistence_store.as_ref()?;
425        let trace = store.load(trace_id).await.ok().flatten()?;
426        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
427    }
428
429    async fn force_resume(
430        &self,
431        trace_id: &str,
432        target_node: &str,
433        payload_override: Option<Value>,
434    ) -> Result<(), String> {
435        let store = self
436            .persistence_store
437            .as_ref()
438            .ok_or("No persistence store attached to Axon")?;
439
440        let intervention = crate::persistence::Intervention {
441            target_node: target_node.to_string(),
442            payload_override,
443            timestamp_ms: now_ms(),
444        };
445
446        store
447            .save_intervention(trace_id, intervention)
448            .await
449            .map_err(|e| format!("Failed to save intervention: {}", e))?;
450
451        if let Some(sink) = self.audit_sink.as_ref() {
452            let event = AuditEvent::new(
453                uuid::Uuid::new_v4().to_string(),
454                "Inspector".to_string(),
455                "ForceResume".to_string(),
456                trace_id.to_string(),
457            )
458            .with_metadata("target_node", target_node);
459
460            let _ = sink.append(&event).await;
461        }
462
463        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
464        Ok(())
465    }
466}
467
468impl<In, Out, E, Res> Axon<In, Out, E, Res>
469where
470    In: Send + Sync + Serialize + DeserializeOwned + 'static,
471    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
472    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
473    Res: ranvier_core::transition::ResourceRequirement,
474{
475    /// Chain a transition to this Axon.
476    ///
477    /// Requires the transition to use the SAME resource bundle as the previous steps.
478    #[track_caller]
479    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
480    where
481        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
482        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
483    {
484        let caller = Location::caller();
485        // Decompose self to avoid partial move issues
486        let Axon {
487            mut schematic,
488            executor: prev_executor,
489            execution_mode,
490            persistence_store,
491            audit_sink,
492            dlq_sink,
493            dlq_policy,
494            dynamic_dlq_policy,
495            saga_policy,
496            dynamic_saga_policy,
497            saga_compensation_registry,
498            iam_handle,
499        } = self;
500
501        // Update Schematic
502        let next_node_id = uuid::Uuid::new_v4().to_string();
503        let next_node = Node {
504            id: next_node_id.clone(),
505            kind: NodeKind::Atom,
506            label: transition.label(),
507            description: transition.description(),
508            input_type: type_name_of::<Out>(),
509            output_type: type_name_of::<Next>(),
510            resource_type: type_name_of::<Res>(),
511            metadata: Default::default(),
512            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
513            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
514            position: transition
515                .position()
516                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
517            compensation_node_id: None,
518            input_schema: transition.input_schema(),
519            output_schema: None,
520        };
521
522        let last_node_id = schematic
523            .nodes
524            .last()
525            .map(|n| n.id.clone())
526            .unwrap_or_default();
527
528        schematic.nodes.push(next_node);
529        schematic.edges.push(Edge {
530            from: last_node_id,
531            to: next_node_id.clone(),
532            kind: EdgeType::Linear,
533            label: Some("Next".to_string()),
534        });
535
536        // Compose Executor
537        let node_id_for_exec = next_node_id.clone();
538        let node_label_for_exec = transition.label();
539        let bus_policy_for_exec = transition.bus_access_policy();
540        let bus_policy_clone = bus_policy_for_exec.clone();
541        let current_step_idx = schematic.nodes.len() as u64 - 1;
542        let next_executor: Executor<In, Next, E, Res> = Arc::new(
543            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
544                let prev = prev_executor.clone();
545                let trans = transition.clone();
546                let timeline_node_id = node_id_for_exec.clone();
547                let timeline_node_label = node_label_for_exec.clone();
548                let transition_bus_policy = bus_policy_clone.clone();
549                let step_idx = current_step_idx;
550
551                Box::pin(async move {
552                    // Check for manual intervention jump
553                    if let Some(jump) = bus.read::<ManualJump>()
554                        && (jump.target_node == timeline_node_id
555                            || jump.target_node == timeline_node_label)
556                    {
557                        tracing::info!(
558                            node_id = %timeline_node_id,
559                            node_label = %timeline_node_label,
560                            "Manual jump target reached; skipping previous steps"
561                        );
562
563                        let state = if let Some(ow) = jump.payload_override.clone() {
564                            match serde_json::from_value::<Out>(ow) {
565                                Ok(s) => s,
566                                Err(e) => {
567                                    tracing::error!(
568                                        "Payload override deserialization failed: {}",
569                                        e
570                                    );
571                                    return Outcome::emit(
572                                        "execution.jump.payload_error",
573                                        Some(serde_json::json!({"error": e.to_string()})),
574                                    )
575                                    .map(|_: ()| unreachable!());
576                                }
577                            }
578                        } else {
579                            // Default back to the provided input if this is an identity jump or types match
580                            // For now, treat missing payload on a mid-flow jump as an avoidable error if Possible.
581                            // In a better implementation, we'd try to load the last persisted Out for the previous step.
582                            return Outcome::emit(
583                                "execution.jump.missing_payload",
584                                Some(serde_json::json!({"node_id": timeline_node_id})),
585                            );
586                        };
587
588                        // Skip prev() and continue with trans.run(state, ...)
589                        return run_this_step::<Out, Next, E, Res>(
590                            &trans,
591                            state,
592                            res,
593                            bus,
594                            &timeline_node_id,
595                            &timeline_node_label,
596                            &transition_bus_policy,
597                            step_idx,
598                        )
599                        .await;
600                    }
601
602                    // Handle resumption skip
603                    if let Some(start) = bus.read::<StartStep>()
604                        && step_idx == start.0
605                        && bus.read::<ResumptionState>().is_some()
606                    {
607                        // Prefer fresh input (In → Out via JSON round-trip).
608                        // The caller provides updated state (e.g., corrected data after a fault).
609                        // Falls back to persisted checkpoint state when types are incompatible.
610                        let fresh_state = serde_json::to_value(&input)
611                            .ok()
612                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
613                        let persisted_state = bus
614                            .read::<ResumptionState>()
615                            .and_then(|r| r.payload.clone())
616                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
617
618                        if let Some(s) = fresh_state.or(persisted_state) {
619                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
620                            return run_this_step::<Out, Next, E, Res>(
621                                &trans,
622                                s,
623                                res,
624                                bus,
625                                &timeline_node_id,
626                                &timeline_node_label,
627                                &transition_bus_policy,
628                                step_idx,
629                            )
630                            .await;
631                        }
632
633                        return Outcome::emit(
634                            "execution.resumption.payload_error",
635                            Some(serde_json::json!({"error": "no compatible resumption state"})),
636                        )
637                        .map(|_: ()| unreachable!());
638                    }
639
640                    // Run previous step
641                    let prev_result = prev(input, res, bus).await;
642
643                    // Unpack
644                    let state = match prev_result {
645                        Outcome::Next(t) => t,
646                        other => return other.map(|_| unreachable!()),
647                    };
648
649                    run_this_step::<Out, Next, E, Res>(
650                        &trans,
651                        state,
652                        res,
653                        bus,
654                        &timeline_node_id,
655                        &timeline_node_label,
656                        &transition_bus_policy,
657                        step_idx,
658                    )
659                    .await
660                })
661            },
662        );
663        Axon {
664            schematic,
665            executor: next_executor,
666            execution_mode,
667            persistence_store,
668            audit_sink,
669            dlq_sink,
670            dlq_policy,
671            dynamic_dlq_policy,
672            saga_policy,
673            dynamic_saga_policy,
674            saga_compensation_registry,
675            iam_handle,
676        }
677    }
678
679    /// Chain a transition with a retry policy.
680    ///
681    /// If the transition returns `Outcome::Fault`, it will be retried up to
682    /// `policy.max_retries` times with the configured backoff strategy.
683    /// Timeline events are recorded for each retry attempt.
684    ///
685    /// # Example
686    ///
687    /// ```rust,ignore
688    /// use ranvier_runtime::{Axon, RetryPolicy};
689    /// use std::time::Duration;
690    ///
691    /// let axon = Axon::new("pipeline")
692    ///     .then_with_retry(my_transition, RetryPolicy::fixed(3, Duration::from_millis(100)));
693    /// ```
694    #[track_caller]
695    pub fn then_with_retry<Next, Trans>(
696        self,
697        transition: Trans,
698        policy: crate::retry::RetryPolicy,
699    ) -> Axon<In, Next, E, Res>
700    where
701        Out: Clone,
702        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
703        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
704    {
705        let caller = Location::caller();
706        let Axon {
707            mut schematic,
708            executor: prev_executor,
709            execution_mode,
710            persistence_store,
711            audit_sink,
712            dlq_sink,
713            dlq_policy,
714            dynamic_dlq_policy,
715            saga_policy,
716            dynamic_saga_policy,
717            saga_compensation_registry,
718            iam_handle,
719        } = self;
720
721        let next_node_id = uuid::Uuid::new_v4().to_string();
722        let next_node = Node {
723            id: next_node_id.clone(),
724            kind: NodeKind::Atom,
725            label: transition.label(),
726            description: transition.description(),
727            input_type: type_name_of::<Out>(),
728            output_type: type_name_of::<Next>(),
729            resource_type: type_name_of::<Res>(),
730            metadata: Default::default(),
731            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
732            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
733            position: transition
734                .position()
735                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
736            compensation_node_id: None,
737            input_schema: transition.input_schema(),
738            output_schema: None,
739        };
740
741        let last_node_id = schematic
742            .nodes
743            .last()
744            .map(|n| n.id.clone())
745            .unwrap_or_default();
746
747        schematic.nodes.push(next_node);
748        schematic.edges.push(Edge {
749            from: last_node_id,
750            to: next_node_id.clone(),
751            kind: EdgeType::Linear,
752            label: Some("Next (retryable)".to_string()),
753        });
754
755        let node_id_for_exec = next_node_id.clone();
756        let node_label_for_exec = transition.label();
757        let bus_policy_for_exec = transition.bus_access_policy();
758        let bus_policy_clone = bus_policy_for_exec.clone();
759        let current_step_idx = schematic.nodes.len() as u64 - 1;
760        let next_executor: Executor<In, Next, E, Res> = Arc::new(
761            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
762                let prev = prev_executor.clone();
763                let trans = transition.clone();
764                let timeline_node_id = node_id_for_exec.clone();
765                let timeline_node_label = node_label_for_exec.clone();
766                let transition_bus_policy = bus_policy_clone.clone();
767                let step_idx = current_step_idx;
768                let retry_policy = policy.clone();
769
770                Box::pin(async move {
771                    // Run previous step
772                    let prev_result = prev(input, res, bus).await;
773                    let state = match prev_result {
774                        Outcome::Next(t) => t,
775                        other => return other.map(|_| unreachable!()),
776                    };
777
778                    // Attempt with retries
779                    let mut last_result = None;
780                    for attempt in 0..=retry_policy.max_retries {
781                        let attempt_state = state.clone();
782
783                        let result = run_this_step::<Out, Next, E, Res>(
784                            &trans,
785                            attempt_state,
786                            res,
787                            bus,
788                            &timeline_node_id,
789                            &timeline_node_label,
790                            &transition_bus_policy,
791                            step_idx,
792                        )
793                        .await;
794
795                        match &result {
796                            Outcome::Next(_) => return result,
797                            Outcome::Fault(_) if attempt < retry_policy.max_retries => {
798                                let delay = retry_policy.delay_for_attempt(attempt);
799                                tracing::warn!(
800                                    node_id = %timeline_node_id,
801                                    attempt = attempt + 1,
802                                    max = retry_policy.max_retries,
803                                    delay_ms = delay.as_millis() as u64,
804                                    "Transition failed, retrying"
805                                );
806                                if let Some(timeline) = bus.read_mut::<Timeline>() {
807                                    timeline.push(TimelineEvent::NodeRetry {
808                                        node_id: timeline_node_id.clone(),
809                                        attempt: attempt + 1,
810                                        max_attempts: retry_policy.max_retries,
811                                        backoff_ms: delay.as_millis() as u64,
812                                        timestamp: now_ms(),
813                                    });
814                                }
815                                tokio::time::sleep(delay).await;
816                            }
817                            _ => {
818                                last_result = Some(result);
819                                break;
820                            }
821                        }
822                    }
823
824                    last_result.unwrap_or_else(|| {
825                        Outcome::emit("execution.retry.exhausted", None)
826                    })
827                })
828            },
829        );
830        Axon {
831            schematic,
832            executor: next_executor,
833            execution_mode,
834            persistence_store,
835            audit_sink,
836            dlq_sink,
837            dlq_policy,
838            dynamic_dlq_policy,
839            saga_policy,
840            dynamic_saga_policy,
841            saga_compensation_registry,
842            iam_handle,
843        }
844    }
845
846    /// Chain a transition to this Axon with a Saga compensation step.
847    ///
848    /// If the transition fails, the compensation transition will be executed
849    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
850    #[track_caller]
851    pub fn then_compensated<Next, Trans, Comp>(
852        self,
853        transition: Trans,
854        compensation: Comp,
855    ) -> Axon<In, Next, E, Res>
856    where
857        Out: Clone,
858        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
859        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
860        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
861    {
862        let caller = Location::caller();
863        let Axon {
864            mut schematic,
865            executor: prev_executor,
866            execution_mode,
867            persistence_store,
868            audit_sink,
869            dlq_sink,
870            dlq_policy,
871            dynamic_dlq_policy,
872            saga_policy,
873            dynamic_saga_policy,
874            saga_compensation_registry,
875            iam_handle,
876        } = self;
877
878        // 1. Add Primary Node
879        let next_node_id = uuid::Uuid::new_v4().to_string();
880        let comp_node_id = uuid::Uuid::new_v4().to_string();
881
882        let next_node = Node {
883            id: next_node_id.clone(),
884            kind: NodeKind::Atom,
885            label: transition.label(),
886            description: transition.description(),
887            input_type: type_name_of::<Out>(),
888            output_type: type_name_of::<Next>(),
889            resource_type: type_name_of::<Res>(),
890            metadata: Default::default(),
891            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
892            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
893            position: transition
894                .position()
895                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
896            compensation_node_id: Some(comp_node_id.clone()),
897            input_schema: None,
898            output_schema: None,
899        };
900
901        // 2. Add Compensation Node
902        let comp_node = Node {
903            id: comp_node_id.clone(),
904            kind: NodeKind::Atom,
905            label: format!("Compensate: {}", compensation.label()),
906            description: compensation.description(),
907            input_type: type_name_of::<Out>(),
908            output_type: "void".to_string(),
909            resource_type: type_name_of::<Res>(),
910            metadata: Default::default(),
911            bus_capability: None,
912            source_location: None,
913            position: compensation
914                .position()
915                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
916            compensation_node_id: None,
917            input_schema: None,
918            output_schema: None,
919        };
920
921        let last_node_id = schematic
922            .nodes
923            .last()
924            .map(|n| n.id.clone())
925            .unwrap_or_default();
926
927        schematic.nodes.push(next_node);
928        schematic.nodes.push(comp_node);
929        schematic.edges.push(Edge {
930            from: last_node_id,
931            to: next_node_id.clone(),
932            kind: EdgeType::Linear,
933            label: Some("Next".to_string()),
934        });
935
936        // 3. Compose Executor with Compensation Logic
937        let node_id_for_exec = next_node_id.clone();
938        let comp_id_for_exec = comp_node_id.clone();
939        let node_label_for_exec = transition.label();
940        let bus_policy_for_exec = transition.bus_access_policy();
941        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
942        let comp_for_exec = compensation.clone();
943        let bus_policy_for_executor = bus_policy_for_exec.clone();
944        let bus_policy_for_registry = bus_policy_for_exec.clone();
945        let next_executor: Executor<In, Next, E, Res> = Arc::new(
946            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
947                let prev = prev_executor.clone();
948                let trans = transition.clone();
949                let comp = comp_for_exec.clone();
950                let timeline_node_id = node_id_for_exec.clone();
951                let timeline_comp_id = comp_id_for_exec.clone();
952                let timeline_node_label = node_label_for_exec.clone();
953                let transition_bus_policy = bus_policy_for_executor.clone();
954                let step_idx = step_idx_for_exec;
955
956                Box::pin(async move {
957                    // Check for manual intervention jump
958                    if let Some(jump) = bus.read::<ManualJump>()
959                        && (jump.target_node == timeline_node_id
960                            || jump.target_node == timeline_node_label)
961                    {
962                        tracing::info!(
963                            node_id = %timeline_node_id,
964                            node_label = %timeline_node_label,
965                            "Manual jump target reached (compensated); skipping previous steps"
966                        );
967
968                        let state = if let Some(ow) = jump.payload_override.clone() {
969                            match serde_json::from_value::<Out>(ow) {
970                                Ok(s) => s,
971                                Err(e) => {
972                                    tracing::error!(
973                                        "Payload override deserialization failed: {}",
974                                        e
975                                    );
976                                    return Outcome::emit(
977                                        "execution.jump.payload_error",
978                                        Some(serde_json::json!({"error": e.to_string()})),
979                                    )
980                                    .map(|_: ()| unreachable!());
981                                }
982                            }
983                        } else {
984                            return Outcome::emit(
985                                "execution.jump.missing_payload",
986                                Some(serde_json::json!({"node_id": timeline_node_id})),
987                            )
988                            .map(|_: ()| unreachable!());
989                        };
990
991                        // Skip prev() and continue with trans.run(state, ...)
992                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
993                            &trans,
994                            &comp,
995                            state,
996                            res,
997                            bus,
998                            &timeline_node_id,
999                            &timeline_comp_id,
1000                            &timeline_node_label,
1001                            &transition_bus_policy,
1002                            step_idx,
1003                        )
1004                        .await;
1005                    }
1006
1007                    // Handle resumption skip
1008                    if let Some(start) = bus.read::<StartStep>()
1009                        && step_idx == start.0
1010                        && bus.read::<ResumptionState>().is_some()
1011                    {
1012                        let fresh_state = serde_json::to_value(&input)
1013                            .ok()
1014                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
1015                        let persisted_state = bus
1016                            .read::<ResumptionState>()
1017                            .and_then(|r| r.payload.clone())
1018                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
1019
1020                        if let Some(s) = fresh_state.or(persisted_state) {
1021                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
1022                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1023                                &trans,
1024                                &comp,
1025                                s,
1026                                res,
1027                                bus,
1028                                &timeline_node_id,
1029                                &timeline_comp_id,
1030                                &timeline_node_label,
1031                                &transition_bus_policy,
1032                                step_idx,
1033                            )
1034                            .await;
1035                        }
1036
1037                        return Outcome::emit(
1038                            "execution.resumption.payload_error",
1039                            Some(serde_json::json!({"error": "no compatible resumption state"})),
1040                        )
1041                        .map(|_: ()| unreachable!());
1042                    }
1043
1044                    // Run previous step
1045                    let prev_result = prev(input, res, bus).await;
1046
1047                    // Unpack
1048                    let state = match prev_result {
1049                        Outcome::Next(t) => t,
1050                        other => return other.map(|_| unreachable!()),
1051                    };
1052
1053                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
1054                        &trans,
1055                        &comp,
1056                        state,
1057                        res,
1058                        bus,
1059                        &timeline_node_id,
1060                        &timeline_comp_id,
1061                        &timeline_node_label,
1062                        &transition_bus_policy,
1063                        step_idx,
1064                    )
1065                    .await
1066                })
1067            },
1068        );
1069        // 4. Register Saga Compensation if enabled
1070        {
1071            let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1072            let comp_fn = compensation.clone();
1073            let transition_bus_policy = bus_policy_for_registry.clone();
1074
1075            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1076                Arc::new(move |input_data, res, bus| {
1077                    let comp = comp_fn.clone();
1078                    let bus_policy = transition_bus_policy.clone();
1079                    Box::pin(async move {
1080                        let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1081                        bus.set_access_policy(comp.label(), bus_policy);
1082                        let res = comp.run(input, res, bus).await;
1083                        bus.clear_access_policy();
1084                        res
1085                    })
1086                });
1087            registry.register(next_node_id.clone(), handler);
1088        }
1089
1090        Axon {
1091            schematic,
1092            executor: next_executor,
1093            execution_mode,
1094            persistence_store,
1095            audit_sink,
1096            dlq_sink,
1097            dlq_policy,
1098            dynamic_dlq_policy,
1099            saga_policy,
1100            dynamic_saga_policy,
1101            saga_compensation_registry,
1102            iam_handle,
1103        }
1104    }
1105
1106    /// Attach a compensation transition to the previously added node.
1107    /// This establishes a Schematic-level Saga compensation mapping.
1108    #[track_caller]
1109    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1110    where
1111        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1112    {
1113        // NOTE: This currently only updates the Schematic.
1114        // For runtime compensation behavior, use `then_compensated`.
1115        let caller = Location::caller();
1116        let comp_node_id = uuid::Uuid::new_v4().to_string();
1117
1118        let comp_node = Node {
1119            id: comp_node_id.clone(),
1120            kind: NodeKind::Atom,
1121            label: transition.label(),
1122            description: transition.description(),
1123            input_type: type_name_of::<Out>(),
1124            output_type: "void".to_string(),
1125            resource_type: type_name_of::<Res>(),
1126            metadata: Default::default(),
1127            bus_capability: None,
1128            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1129            position: transition
1130                .position()
1131                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1132            compensation_node_id: None,
1133            input_schema: None,
1134            output_schema: None,
1135        };
1136
1137        if let Some(last_node) = self.schematic.nodes.last_mut() {
1138            last_node.compensation_node_id = Some(comp_node_id.clone());
1139        }
1140
1141        self.schematic.nodes.push(comp_node);
1142        self
1143    }
1144
1145    /// Add a branch point
1146    #[track_caller]
1147    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1148        let caller = Location::caller();
1149        let branch_id_str = branch_id.into();
1150        let last_node_id = self
1151            .schematic
1152            .nodes
1153            .last()
1154            .map(|n| n.id.clone())
1155            .unwrap_or_default();
1156
1157        let branch_node = Node {
1158            id: uuid::Uuid::new_v4().to_string(),
1159            kind: NodeKind::Synapse,
1160            label: label.to_string(),
1161            description: None,
1162            input_type: type_name_of::<Out>(),
1163            output_type: type_name_of::<Out>(),
1164            resource_type: type_name_of::<Res>(),
1165            metadata: Default::default(),
1166            bus_capability: None,
1167            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1168            position: None,
1169            compensation_node_id: None,
1170            input_schema: None,
1171            output_schema: None,
1172        };
1173
1174        self.schematic.nodes.push(branch_node);
1175        self.schematic.edges.push(Edge {
1176            from: last_node_id,
1177            to: branch_id_str.clone(),
1178            kind: EdgeType::Branch(branch_id_str),
1179            label: Some("Branch".to_string()),
1180        });
1181
1182        self
1183    }
1184
1185    /// Run multiple transitions in parallel (fan-out / fan-in).
1186    ///
1187    /// Each transition receives a clone of the current step's output and runs
1188    /// concurrently via [`futures_util::future::join_all`]. The strategy controls
1189    /// how faults are handled:
1190    ///
1191    /// - [`ParallelStrategy::AllMustSucceed`]: All branches must produce `Next`.
1192    ///   If any branch returns `Fault`, the first fault is propagated.
1193    /// - [`ParallelStrategy::AnyCanFail`]: Branches that fault are ignored as
1194    ///   long as at least one succeeds. If all branches fault, the first fault
1195    ///   is returned.
1196    ///
1197    /// The **first successful `Next` value** is forwarded to the next step in
1198    /// the pipeline. A custom merge can be layered on top via a subsequent
1199    /// `.then()` step.
1200    ///
1201    /// Each parallel branch receives its own fresh [`Bus`] instance. Resources
1202    /// should be injected via the shared `Res` bundle rather than the Bus for
1203    /// parallel steps.
1204    ///
1205    /// ## Schematic
1206    ///
1207    /// The method emits a `FanOut` node, one `Atom` node per branch (connected
1208    /// via `Parallel` edges), and a `FanIn` join node.
1209    ///
1210    /// ## Example
1211    ///
1212    /// ```rust,ignore
1213    /// let axon = Axon::new("Pipeline")
1214    ///     .then(ParseInput)
1215    ///     .parallel(
1216    ///         vec![Arc::new(EnrichA), Arc::new(EnrichB)],
1217    ///         ParallelStrategy::AllMustSucceed,
1218    ///     )
1219    ///     .then(MergeResults);
1220    /// ```
1221    #[track_caller]
1222    pub fn parallel(
1223        self,
1224        transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1225        strategy: ParallelStrategy,
1226    ) -> Axon<In, Out, E, Res>
1227    where
1228        Out: Clone,
1229    {
1230        let caller = Location::caller();
1231        let Axon {
1232            mut schematic,
1233            executor: prev_executor,
1234            execution_mode,
1235            persistence_store,
1236            audit_sink,
1237            dlq_sink,
1238            dlq_policy,
1239            dynamic_dlq_policy,
1240            saga_policy,
1241            dynamic_saga_policy,
1242            saga_compensation_registry,
1243            iam_handle,
1244        } = self;
1245
1246        // ── Schematic: FanOut node ─────────────────────────────────
1247        let fanout_id = uuid::Uuid::new_v4().to_string();
1248        let fanin_id = uuid::Uuid::new_v4().to_string();
1249
1250        let last_node_id = schematic
1251            .nodes
1252            .last()
1253            .map(|n| n.id.clone())
1254            .unwrap_or_default();
1255
1256        let fanout_node = Node {
1257            id: fanout_id.clone(),
1258            kind: NodeKind::FanOut,
1259            label: "FanOut".to_string(),
1260            description: Some(format!(
1261                "Parallel split ({} branches, {:?})",
1262                transitions.len(),
1263                strategy
1264            )),
1265            input_type: type_name_of::<Out>(),
1266            output_type: type_name_of::<Out>(),
1267            resource_type: type_name_of::<Res>(),
1268            metadata: Default::default(),
1269            bus_capability: None,
1270            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1271            position: None,
1272            compensation_node_id: None,
1273            input_schema: None,
1274            output_schema: None,
1275        };
1276
1277        schematic.nodes.push(fanout_node);
1278        schematic.edges.push(Edge {
1279            from: last_node_id,
1280            to: fanout_id.clone(),
1281            kind: EdgeType::Linear,
1282            label: Some("Next".to_string()),
1283        });
1284
1285        // ── Schematic: one Atom node per parallel branch ───────────
1286        let mut branch_node_ids = Vec::with_capacity(transitions.len());
1287        for (i, trans) in transitions.iter().enumerate() {
1288            let branch_id = uuid::Uuid::new_v4().to_string();
1289            let branch_node = Node {
1290                id: branch_id.clone(),
1291                kind: NodeKind::Atom,
1292                label: trans.label(),
1293                description: trans.description(),
1294                input_type: type_name_of::<Out>(),
1295                output_type: type_name_of::<Out>(),
1296                resource_type: type_name_of::<Res>(),
1297                metadata: Default::default(),
1298                bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1299                source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1300                position: None,
1301                compensation_node_id: None,
1302                input_schema: trans.input_schema(),
1303                output_schema: None,
1304            };
1305            schematic.nodes.push(branch_node);
1306            schematic.edges.push(Edge {
1307                from: fanout_id.clone(),
1308                to: branch_id.clone(),
1309                kind: EdgeType::Parallel,
1310                label: Some(format!("Branch {}", i)),
1311            });
1312            branch_node_ids.push(branch_id);
1313        }
1314
1315        // ── Schematic: FanIn node ──────────────────────────────────
1316        let fanin_node = Node {
1317            id: fanin_id.clone(),
1318            kind: NodeKind::FanIn,
1319            label: "FanIn".to_string(),
1320            description: Some(format!("Parallel join ({:?})", strategy)),
1321            input_type: type_name_of::<Out>(),
1322            output_type: type_name_of::<Out>(),
1323            resource_type: type_name_of::<Res>(),
1324            metadata: Default::default(),
1325            bus_capability: None,
1326            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1327            position: None,
1328            compensation_node_id: None,
1329            input_schema: None,
1330            output_schema: None,
1331        };
1332
1333        schematic.nodes.push(fanin_node);
1334        for branch_id in &branch_node_ids {
1335            schematic.edges.push(Edge {
1336                from: branch_id.clone(),
1337                to: fanin_id.clone(),
1338                kind: EdgeType::Parallel,
1339                label: Some("Join".to_string()),
1340            });
1341        }
1342
1343        // ── Executor: parallel composition ─────────────────────────
1344        let fanout_node_id = fanout_id.clone();
1345        let fanin_node_id = fanin_id.clone();
1346        let branch_ids_for_exec = branch_node_ids.clone();
1347        let step_idx = schematic.nodes.len() as u64 - 1;
1348
1349        let next_executor: Executor<In, Out, E, Res> = Arc::new(
1350            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1351                let prev = prev_executor.clone();
1352                let branches = transitions.clone();
1353                let fanout_id = fanout_node_id.clone();
1354                let fanin_id = fanin_node_id.clone();
1355                let branch_ids = branch_ids_for_exec.clone();
1356
1357                Box::pin(async move {
1358                    // Run previous steps
1359                    let prev_result = prev(input, res, bus).await;
1360                    let state = match prev_result {
1361                        Outcome::Next(t) => t,
1362                        other => return other.map(|_| unreachable!()),
1363                    };
1364
1365                    // Timeline: FanOut enter
1366                    let fanout_enter_ts = now_ms();
1367                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1368                        timeline.push(TimelineEvent::NodeEnter {
1369                            node_id: fanout_id.clone(),
1370                            node_label: "FanOut".to_string(),
1371                            timestamp: fanout_enter_ts,
1372                        });
1373                    }
1374
1375                    // Build futures for all branches.
1376                    // Each branch gets its own Bus so they can run concurrently
1377                    // without &mut Bus aliasing. Resources are shared via &Res.
1378                    let futs: Vec<_> = branches
1379                        .iter()
1380                        .enumerate()
1381                        .map(|(i, trans)| {
1382                            let branch_state = state.clone();
1383                            let branch_node_id = branch_ids[i].clone();
1384                            let trans = trans.clone();
1385
1386                            async move {
1387                                let mut branch_bus = Bus::new();
1388                                let label = trans.label();
1389                                let bus_policy = trans.bus_access_policy();
1390
1391                                branch_bus.set_access_policy(label.clone(), bus_policy);
1392                                let result = trans.run(branch_state, res, &mut branch_bus).await;
1393                                branch_bus.clear_access_policy();
1394
1395                                (i, branch_node_id, label, result)
1396                            }
1397                        })
1398                        .collect();
1399
1400                    // Run all branches concurrently within the current task
1401                    let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1402                        futures_util::future::join_all(futs).await;
1403
1404                    // Timeline: record each branch's enter/exit
1405                    for (_, branch_node_id, branch_label, outcome) in &results {
1406                        if let Some(timeline) = bus.read_mut::<Timeline>() {
1407                            let ts = now_ms();
1408                            timeline.push(TimelineEvent::NodeEnter {
1409                                node_id: branch_node_id.clone(),
1410                                node_label: branch_label.clone(),
1411                                timestamp: ts,
1412                            });
1413                            timeline.push(TimelineEvent::NodeExit {
1414                                node_id: branch_node_id.clone(),
1415                                outcome_type: outcome_type_name(outcome),
1416                                duration_ms: 0,
1417                                timestamp: ts,
1418                            });
1419                        }
1420                    }
1421
1422                    // Timeline: FanOut exit
1423                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1424                        timeline.push(TimelineEvent::NodeExit {
1425                            node_id: fanout_id.clone(),
1426                            outcome_type: "Next".to_string(),
1427                            duration_ms: 0,
1428                            timestamp: now_ms(),
1429                        });
1430                    }
1431
1432                    // ── Apply strategy ─────────────────────────────
1433                    let combined = match strategy {
1434                        ParallelStrategy::AllMustSucceed => {
1435                            let mut first_fault = None;
1436                            let mut first_success = None;
1437
1438                            for (_, _, _, outcome) in results {
1439                                match outcome {
1440                                    Outcome::Fault(e) => {
1441                                        if first_fault.is_none() {
1442                                            first_fault = Some(Outcome::Fault(e));
1443                                        }
1444                                    }
1445                                    Outcome::Next(val) => {
1446                                        if first_success.is_none() {
1447                                            first_success = Some(Outcome::Next(val));
1448                                        }
1449                                    }
1450                                    other => {
1451                                        // Non-Next/non-Fault outcomes (Branch, Emit, Jump)
1452                                        // treated as non-success in AllMustSucceed
1453                                        if first_fault.is_none() {
1454                                            first_fault = Some(other);
1455                                        }
1456                                    }
1457                                }
1458                            }
1459
1460                            if let Some(fault) = first_fault {
1461                                fault
1462                            } else {
1463                                first_success.unwrap_or_else(|| {
1464                                    Outcome::emit("execution.parallel.no_results", None)
1465                                })
1466                            }
1467                        }
1468                        ParallelStrategy::AnyCanFail => {
1469                            let mut first_success = None;
1470                            let mut first_fault = None;
1471
1472                            for (_, _, _, outcome) in results {
1473                                match outcome {
1474                                    Outcome::Next(val) => {
1475                                        if first_success.is_none() {
1476                                            first_success = Some(Outcome::Next(val));
1477                                        }
1478                                    }
1479                                    Outcome::Fault(e) => {
1480                                        if first_fault.is_none() {
1481                                            first_fault = Some(Outcome::Fault(e));
1482                                        }
1483                                    }
1484                                    _ => {}
1485                                }
1486                            }
1487
1488                            first_success.unwrap_or_else(|| {
1489                                first_fault.unwrap_or_else(|| {
1490                                    Outcome::emit("execution.parallel.no_results", None)
1491                                })
1492                            })
1493                        }
1494                    };
1495
1496                    // Timeline: FanIn
1497                    let fanin_enter_ts = now_ms();
1498                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1499                        timeline.push(TimelineEvent::NodeEnter {
1500                            node_id: fanin_id.clone(),
1501                            node_label: "FanIn".to_string(),
1502                            timestamp: fanin_enter_ts,
1503                        });
1504                        timeline.push(TimelineEvent::NodeExit {
1505                            node_id: fanin_id.clone(),
1506                            outcome_type: outcome_type_name(&combined),
1507                            duration_ms: 0,
1508                            timestamp: fanin_enter_ts,
1509                        });
1510                    }
1511
1512                    // Persistence
1513                    if let Some(handle) = bus.read::<PersistenceHandle>() {
1514                        let trace_id = persistence_trace_id(bus);
1515                        let circuit = bus
1516                            .read::<ranvier_core::schematic::Schematic>()
1517                            .map(|s| s.name.clone())
1518                            .unwrap_or_default();
1519                        let version = bus
1520                            .read::<ranvier_core::schematic::Schematic>()
1521                            .map(|s| s.schema_version.clone())
1522                            .unwrap_or_default();
1523
1524                        persist_execution_event(
1525                            handle,
1526                            &trace_id,
1527                            &circuit,
1528                            &version,
1529                            step_idx,
1530                            Some(fanin_id.clone()),
1531                            outcome_kind_name(&combined),
1532                            Some(combined.to_json_value()),
1533                        )
1534                        .await;
1535                    }
1536
1537                    combined
1538                })
1539            },
1540        );
1541
1542        Axon {
1543            schematic,
1544            executor: next_executor,
1545            execution_mode,
1546            persistence_store,
1547            audit_sink,
1548            dlq_sink,
1549            dlq_policy,
1550            dynamic_dlq_policy,
1551            saga_policy,
1552            dynamic_saga_policy,
1553            saga_compensation_registry,
1554            iam_handle,
1555        }
1556    }
1557
1558    /// Execute the Axon with the given input and resources.
1559    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1560        if let ExecutionMode::Singleton {
1561            lock_key,
1562            ttl_ms,
1563            lock_provider,
1564        } = &self.execution_mode
1565        {
1566            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1567            let _enter = trace_span.enter();
1568            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1569                Ok(true) => {
1570                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1571                }
1572                Ok(false) => {
1573                    tracing::debug!(
1574                        "Singleton lock {} already held, aborting execution.",
1575                        lock_key
1576                    );
1577                    // Emit a specific event indicating skip
1578                    return Outcome::emit(
1579                        "execution.skipped.lock_held",
1580                        Some(serde_json::json!({
1581                            "lock_key": lock_key
1582                        })),
1583                    );
1584                }
1585                Err(e) => {
1586                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1587                    return Outcome::emit(
1588                        "execution.skipped.lock_error",
1589                        Some(serde_json::json!({
1590                            "error": e.to_string()
1591                        })),
1592                    );
1593                }
1594            }
1595        }
1596
1597        // ── IAM Boundary Check ────────────────────────────────────
1598        if let Some(iam) = &self.iam_handle {
1599            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1600
1601            if matches!(iam.policy, IamPolicy::None) {
1602                // No verification required — skip
1603            } else {
1604                let token = bus.read::<IamToken>().map(|t| t.0.clone());
1605
1606                match token {
1607                    Some(raw_token) => {
1608                        match iam.verifier.verify(&raw_token).await {
1609                            Ok(identity) => {
1610                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
1611                                    tracing::warn!(
1612                                        policy = ?iam.policy,
1613                                        subject = %identity.subject,
1614                                        "IAM policy enforcement failed: {}",
1615                                        e
1616                                    );
1617                                    return Outcome::emit(
1618                                        "iam.policy_denied",
1619                                        Some(serde_json::json!({
1620                                            "error": e.to_string(),
1621                                            "subject": identity.subject,
1622                                        })),
1623                                    );
1624                                }
1625                                // Insert verified identity into Bus for downstream access
1626                                bus.insert(identity);
1627                            }
1628                            Err(e) => {
1629                                tracing::warn!("IAM token verification failed: {}", e);
1630                                return Outcome::emit(
1631                                    "iam.verification_failed",
1632                                    Some(serde_json::json!({
1633                                        "error": e.to_string()
1634                                    })),
1635                                );
1636                            }
1637                        }
1638                    }
1639                    None => {
1640                        tracing::warn!("IAM policy requires token but none found in Bus");
1641                        return Outcome::emit("iam.missing_token", None);
1642                    }
1643                }
1644            }
1645        }
1646
1647        let trace_id = persistence_trace_id(bus);
1648        let label = self.schematic.name.clone();
1649
1650        // Inject DLQ config into Bus for step-level reporting
1651        if let Some(sink) = &self.dlq_sink {
1652            bus.insert(sink.clone());
1653        }
1654        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1655        let effective_dlq_policy = self
1656            .dynamic_dlq_policy
1657            .as_ref()
1658            .map(|d| d.current())
1659            .unwrap_or_else(|| self.dlq_policy.clone());
1660        bus.insert(effective_dlq_policy);
1661        bus.insert(self.schematic.clone());
1662        let effective_saga_policy = self
1663            .dynamic_saga_policy
1664            .as_ref()
1665            .map(|d| d.current())
1666            .unwrap_or_else(|| self.saga_policy.clone());
1667        bus.insert(effective_saga_policy.clone());
1668
1669        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1670        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1671            bus.insert(SagaStack::new());
1672        }
1673
1674        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1675        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1676        let compensation_retry_policy = compensation_retry_policy(bus);
1677        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1678        let version = self.schematic.schema_version.clone();
1679        let migration_registry = bus
1680            .read::<ranvier_core::schematic::MigrationRegistry>()
1681            .cloned();
1682
1683        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1684            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1685                load_persistence_version(handle, &trace_id).await;
1686
1687            if let Some(interv) = intervention {
1688                tracing::info!(
1689                    trace_id = %trace_id,
1690                    target_node = %interv.target_node,
1691                    "Applying manual intervention command"
1692                );
1693
1694                // Find the step index for the target node
1695                if let Some(target_idx) = self
1696                    .schematic
1697                    .nodes
1698                    .iter()
1699                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1700                {
1701                    tracing::info!(
1702                        trace_id = %trace_id,
1703                        target_node = %interv.target_node,
1704                        target_step = target_idx,
1705                        "Intervention: Jumping to target node"
1706                    );
1707                    start_step = target_idx as u64;
1708
1709                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1710                    bus.insert(ManualJump {
1711                        target_node: interv.target_node.clone(),
1712                        payload_override: interv.payload_override.clone(),
1713                    });
1714
1715                    // Log audit event for intervention application
1716                    if let Some(sink) = self.audit_sink.as_ref() {
1717                        let event = AuditEvent::new(
1718                            uuid::Uuid::new_v4().to_string(),
1719                            "System".to_string(),
1720                            "ApplyIntervention".to_string(),
1721                            trace_id.to_string(),
1722                        )
1723                        .with_metadata("target_node", interv.target_node.clone())
1724                        .with_metadata("target_step", target_idx);
1725
1726                        let _ = sink.append(&event).await;
1727                    }
1728                } else {
1729                    tracing::warn!(
1730                        trace_id = %trace_id,
1731                        target_node = %interv.target_node,
1732                        "Intervention target node not found in schematic; ignoring jump"
1733                    );
1734                }
1735            }
1736
1737            if let Some(old_version) = trace_version
1738                && old_version != version
1739            {
1740                tracing::info!(
1741                    trace_id = %trace_id,
1742                    old_version = %old_version,
1743                    current_version = %version,
1744                    "Version mismatch detected during resumption"
1745                );
1746
1747                // Try multi-hop migration path first, fall back to direct lookup
1748                let migration_path = migration_registry
1749                    .as_ref()
1750                    .and_then(|r| r.find_migration_path(&old_version, &version));
1751
1752                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1753                    if path.is_empty() {
1754                        (None, last_payload.clone())
1755                    } else {
1756                        // Apply payload mappers along the migration chain
1757                        let mut payload = last_payload.clone();
1758                        for hop in &path {
1759                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1760                            {
1761                                match mapper.map_state(p) {
1762                                    Ok(mapped) => payload = Some(mapped),
1763                                    Err(e) => {
1764                                        tracing::error!(
1765                                            trace_id = %trace_id,
1766                                            from = %hop.from_version,
1767                                            to = %hop.to_version,
1768                                            error = %e,
1769                                            "Payload migration mapper failed"
1770                                        );
1771                                        return Outcome::emit(
1772                                            "execution.resumption.payload_migration_failed",
1773                                            Some(serde_json::json!({
1774                                                "trace_id": trace_id,
1775                                                "from": hop.from_version,
1776                                                "to": hop.to_version,
1777                                                "error": e.to_string()
1778                                            })),
1779                                        );
1780                                    }
1781                                }
1782                            }
1783                        }
1784                        let hops: Vec<String> = path
1785                            .iter()
1786                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
1787                            .collect();
1788                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1789                        (path.last().copied(), payload)
1790                    }
1791                } else {
1792                    (None, last_payload.clone())
1793                };
1794
1795                // Use the final migration in the path to determine strategy
1796                let migration = final_migration.or_else(|| {
1797                    migration_registry
1798                        .as_ref()
1799                        .and_then(|r| r.find_migration(&old_version, &version))
1800                });
1801
1802                // Update last_payload with mapped version
1803                if mapped_payload.is_some() {
1804                    last_payload = mapped_payload;
1805                }
1806
1807                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1808                {
1809                    m.node_mapping
1810                        .get(node_id)
1811                        .cloned()
1812                        .unwrap_or(m.default_strategy.clone())
1813                } else {
1814                    migration
1815                        .map(|m| m.default_strategy.clone())
1816                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1817                };
1818
1819                match strategy {
1820                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1821                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1822                        start_step = 0;
1823                    }
1824                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1825                        new_node_id,
1826                        ..
1827                    } => {
1828                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1829                        if let Some(target_idx) = self
1830                            .schematic
1831                            .nodes
1832                            .iter()
1833                            .position(|n| n.id == new_node_id || n.label == new_node_id)
1834                        {
1835                            start_step = target_idx as u64;
1836                        } else {
1837                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1838                            return Outcome::emit(
1839                                "execution.resumption.migration_target_not_found",
1840                                Some(serde_json::json!({ "node_id": new_node_id })),
1841                            );
1842                        }
1843                    }
1844                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1845                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1846                        if let Some(target_idx) = self
1847                            .schematic
1848                            .nodes
1849                            .iter()
1850                            .position(|n| n.id == node_id || n.label == node_id)
1851                        {
1852                            start_step = target_idx as u64;
1853                        } else {
1854                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1855                            return Outcome::emit(
1856                                "execution.resumption.migration_target_not_found",
1857                                Some(serde_json::json!({ "node_id": node_id })),
1858                            );
1859                        }
1860                    }
1861                    ranvier_core::schematic::MigrationStrategy::Fail => {
1862                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1863                        return Outcome::emit(
1864                            "execution.resumption.version_mismatch_failed",
1865                            Some(serde_json::json!({
1866                                "trace_id": trace_id,
1867                                "old_version": old_version,
1868                                "current_version": version
1869                            })),
1870                        );
1871                    }
1872                    _ => {
1873                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1874                        return Outcome::emit(
1875                            "execution.resumption.unsupported_migration",
1876                            Some(serde_json::json!({
1877                                "trace_id": trace_id,
1878                                "strategy": format!("{:?}", strategy)
1879                            })),
1880                        );
1881                    }
1882                }
1883            }
1884
1885            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1886            persist_execution_event(
1887                handle,
1888                &trace_id,
1889                &label,
1890                &version,
1891                start_step,
1892                ingress_node_id,
1893                "Enter",
1894                None,
1895            )
1896            .await;
1897
1898            bus.insert(StartStep(start_step));
1899            if start_step > 0 {
1900                bus.insert(ResumptionState {
1901                    payload: last_payload,
1902                });
1903            }
1904
1905            Some(start_step)
1906        } else {
1907            None
1908        };
1909
1910        let should_capture = should_attach_timeline(bus);
1911        let inserted_timeline = if should_capture {
1912            ensure_timeline(bus)
1913        } else {
1914            false
1915        };
1916        let ingress_started = std::time::Instant::now();
1917        let ingress_enter_ts = now_ms();
1918        if should_capture
1919            && let (Some(timeline), Some(ingress)) =
1920                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1921        {
1922            timeline.push(TimelineEvent::NodeEnter {
1923                node_id: ingress.id.clone(),
1924                node_label: ingress.label.clone(),
1925                timestamp: ingress_enter_ts,
1926            });
1927        }
1928
1929        let circuit_span = tracing::info_span!(
1930            "Circuit",
1931            ranvier.circuit = %label,
1932            ranvier.outcome_kind = tracing::field::Empty,
1933            ranvier.outcome_target = tracing::field::Empty
1934        );
1935        let outcome = (self.executor)(input, resources, bus)
1936            .instrument(circuit_span.clone())
1937            .await;
1938        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1939        if let Some(target) = outcome_target(&outcome) {
1940            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1941        }
1942
1943        // Automated Saga Rollback (LIFO)
1944        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1945            while let Some(task) = {
1946                let mut stack = bus.read_mut::<SagaStack>();
1947                stack.as_mut().and_then(|s| s.pop())
1948            } {
1949                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1950
1951                let handler = {
1952                    let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
1953                    registry.get(&task.node_id)
1954                };
1955                if let Some(handler) = handler {
1956                    let res = handler(task.input_snapshot, resources, bus).await;
1957                    if let Outcome::Fault(e) = res {
1958                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1959                    }
1960                } else {
1961                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1962                }
1963            }
1964            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1965        }
1966
1967        let ingress_exit_ts = now_ms();
1968        if should_capture
1969            && let (Some(timeline), Some(ingress)) =
1970                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1971        {
1972            timeline.push(TimelineEvent::NodeExit {
1973                node_id: ingress.id.clone(),
1974                outcome_type: outcome_type_name(&outcome),
1975                duration_ms: ingress_started.elapsed().as_millis() as u64,
1976                timestamp: ingress_exit_ts,
1977            });
1978        }
1979
1980        if let Some(handle) = persistence_handle.as_ref() {
1981            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1982            persist_execution_event(
1983                handle,
1984                &trace_id,
1985                &label,
1986                &version,
1987                fault_step,
1988                None, // Outcome-level events might not have a single node_id context here
1989                outcome_kind_name(&outcome),
1990                Some(outcome.to_json_value()),
1991            )
1992            .await;
1993
1994            let mut completion = completion_from_outcome(&outcome);
1995            if matches!(outcome, Outcome::Fault(_))
1996                && let Some(compensation) = compensation_handle.as_ref()
1997                && compensation_auto_trigger(bus)
1998            {
1999                let context = CompensationContext {
2000                    trace_id: trace_id.clone(),
2001                    circuit: label.clone(),
2002                    fault_kind: outcome_kind_name(&outcome).to_string(),
2003                    fault_step,
2004                    timestamp_ms: now_ms(),
2005                };
2006
2007                if run_compensation(
2008                    compensation,
2009                    context,
2010                    compensation_retry_policy,
2011                    compensation_idempotency.clone(),
2012                )
2013                .await
2014                {
2015                    persist_execution_event(
2016                        handle,
2017                        &trace_id,
2018                        &label,
2019                        &version,
2020                        fault_step.saturating_add(1),
2021                        None,
2022                        "Compensated",
2023                        None,
2024                    )
2025                    .await;
2026                    completion = CompletionState::Compensated;
2027                }
2028            }
2029
2030            if persistence_auto_complete(bus) {
2031                persist_completion(handle, &trace_id, completion).await;
2032            }
2033        }
2034
2035        if should_capture {
2036            maybe_export_timeline(bus, &outcome);
2037        }
2038        if inserted_timeline {
2039            let _ = bus.remove::<Timeline>();
2040        }
2041
2042        outcome
2043    }
2044
2045    /// Starts the Ranvier Inspector for this Axon on the specified port.
2046    /// This spawns a background task to serve the Schematic.
2047    pub fn serve_inspector(self, port: u16) -> Self {
2048        if !inspector_dev_mode_from_env() {
2049            tracing::info!("Inspector disabled because RANVIER_MODE is production");
2050            return self;
2051        }
2052        if !inspector_enabled_from_env() {
2053            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2054            return self;
2055        }
2056
2057        let schematic = self.schematic.clone();
2058        let axon_inspector = Arc::new(self.clone());
2059        tokio::spawn(async move {
2060            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2061                .with_projection_files_from_env()
2062                .with_mode_from_env()
2063                .with_auth_policy_from_env()
2064                .with_state_inspector(axon_inspector)
2065                .serve()
2066                .await
2067            {
2068                tracing::error!("Inspector server failed: {}", e);
2069            }
2070        });
2071        self
2072    }
2073
2074    /// Get a reference to the Schematic (structural view).
2075    pub fn schematic(&self) -> &Schematic {
2076        &self.schematic
2077    }
2078
2079    /// Consume and return the Schematic.
2080    pub fn into_schematic(self) -> Schematic {
2081        self.schematic
2082    }
2083
2084    /// Detect schematic export mode from runtime flags.
2085    ///
2086    /// Supported triggers:
2087    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
2088    /// - `--schematic`
2089    ///
2090    /// Optional output path:
2091    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
2092    /// - `--schematic-output <path>` / `--schematic-output=<path>`
2093    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
2094    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2095        schematic_export_request_from_process()
2096    }
2097
2098    /// Export schematic and return `true` when schematic mode is active.
2099    ///
2100    /// Use this once after circuit construction and before server/custom loops:
2101    ///
2102    /// ```rust,ignore
2103    /// let axon = build_axon();
2104    /// if axon.maybe_export_and_exit()? {
2105    ///     return Ok(());
2106    /// }
2107    /// // Normal runtime path...
2108    /// ```
2109    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2110        self.maybe_export_and_exit_with(|_| ())
2111    }
2112
2113    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
2114    ///
2115    /// This is useful when your app has custom loop/bootstrap behavior and you want
2116    /// to skip or cleanup that logic in schematic mode.
2117    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2118    where
2119        F: FnOnce(&SchematicExportRequest),
2120    {
2121        let Some(request) = self.schematic_export_request() else {
2122            return Ok(false);
2123        };
2124        on_before_exit(&request);
2125        self.export_schematic(&request)?;
2126        Ok(true)
2127    }
2128
2129    /// Export schematic according to the provided request.
2130    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2131        let json = serde_json::to_string_pretty(self.schematic())?;
2132        if let Some(path) = &request.output {
2133            if let Some(parent) = path.parent()
2134                && !parent.as_os_str().is_empty()
2135            {
2136                fs::create_dir_all(parent)?;
2137            }
2138            fs::write(path, json.as_bytes())?;
2139            return Ok(());
2140        }
2141        println!("{}", json);
2142        Ok(())
2143    }
2144}
2145
2146fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2147    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2148    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2149    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2150
2151    let mut i = 0;
2152    while i < args.len() {
2153        let arg = args[i].to_string_lossy();
2154
2155        if arg == "--schematic" {
2156            enabled = true;
2157            i += 1;
2158            continue;
2159        }
2160
2161        if arg == "--schematic-output" || arg == "--output" {
2162            if let Some(next) = args.get(i + 1) {
2163                output = Some(PathBuf::from(next));
2164                i += 2;
2165                continue;
2166            }
2167        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2168            output = Some(PathBuf::from(value));
2169        } else if let Some(value) = arg.strip_prefix("--output=") {
2170            output = Some(PathBuf::from(value));
2171        }
2172
2173        i += 1;
2174    }
2175
2176    if enabled {
2177        Some(SchematicExportRequest { output })
2178    } else {
2179        None
2180    }
2181}
2182
2183fn env_flag_is_true(key: &str) -> bool {
2184    match std::env::var(key) {
2185        Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
2186        Err(_) => false,
2187    }
2188}
2189
2190fn inspector_enabled_from_env() -> bool {
2191    let raw = std::env::var("RANVIER_INSPECTOR").ok();
2192    inspector_enabled_from_value(raw.as_deref())
2193}
2194
2195fn inspector_enabled_from_value(value: Option<&str>) -> bool {
2196    match value {
2197        Some(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
2198        None => true,
2199    }
2200}
2201
2202fn inspector_dev_mode_from_env() -> bool {
2203    let raw = std::env::var("RANVIER_MODE").ok();
2204    inspector_dev_mode_from_value(raw.as_deref())
2205}
2206
2207fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
2208    !matches!(
2209        value.map(|v| v.to_ascii_lowercase()),
2210        Some(mode) if mode == "prod" || mode == "production"
2211    )
2212}
2213
2214fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2215    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2216        Ok(v) if !v.trim().is_empty() => v,
2217        _ => return,
2218    };
2219
2220    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2221    let policy = timeline_adaptive_policy();
2222    let forced = should_force_export(outcome, &policy);
2223    let should_export = sampled || forced;
2224    if !should_export {
2225        record_sampling_stats(false, sampled, forced, "none", &policy);
2226        return;
2227    }
2228
2229    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2230    timeline.sort();
2231
2232    let mode = std::env::var("RANVIER_TIMELINE_MODE")
2233        .unwrap_or_else(|_| "overwrite".to_string())
2234        .to_ascii_lowercase();
2235
2236    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2237        tracing::warn!(
2238            "Failed to persist timeline file {} (mode={}): {}",
2239            path,
2240            mode,
2241            err
2242        );
2243        record_sampling_stats(false, sampled, forced, &mode, &policy);
2244    } else {
2245        record_sampling_stats(true, sampled, forced, &mode, &policy);
2246    }
2247}
2248
2249fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2250    match outcome {
2251        Outcome::Next(_) => "Next".to_string(),
2252        Outcome::Branch(id, _) => format!("Branch:{}", id),
2253        Outcome::Jump(id, _) => format!("Jump:{}", id),
2254        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2255        Outcome::Fault(_) => "Fault".to_string(),
2256    }
2257}
2258
2259fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2260    match outcome {
2261        Outcome::Next(_) => "Next",
2262        Outcome::Branch(_, _) => "Branch",
2263        Outcome::Jump(_, _) => "Jump",
2264        Outcome::Emit(_, _) => "Emit",
2265        Outcome::Fault(_) => "Fault",
2266    }
2267}
2268
2269fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2270    match outcome {
2271        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2272        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2273        Outcome::Emit(event_type, _) => Some(event_type.clone()),
2274        Outcome::Next(_) | Outcome::Fault(_) => None,
2275    }
2276}
2277
2278fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2279    match outcome {
2280        Outcome::Fault(_) => CompletionState::Fault,
2281        _ => CompletionState::Success,
2282    }
2283}
2284
2285fn persistence_trace_id(bus: &Bus) -> String {
2286    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2287        explicit.0.clone()
2288    } else {
2289        format!("{}:{}", bus.id, now_ms())
2290    }
2291}
2292
2293fn persistence_auto_complete(bus: &Bus) -> bool {
2294    bus.read::<PersistenceAutoComplete>()
2295        .map(|v| v.0)
2296        .unwrap_or(true)
2297}
2298
2299fn compensation_auto_trigger(bus: &Bus) -> bool {
2300    bus.read::<CompensationAutoTrigger>()
2301        .map(|v| v.0)
2302        .unwrap_or(true)
2303}
2304
2305fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2306    bus.read::<CompensationRetryPolicy>()
2307        .copied()
2308        .unwrap_or_default()
2309}
2310
2311/// Unwrap the Outcome enum layer from a persisted event payload.
2312///
2313/// Events are stored with `outcome.to_json_value()` which serializes the full
2314/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
2315/// handler expects the raw inner value, so we extract it here.
2316fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2317    payload.map(|p| {
2318        p.get("Next")
2319            .or_else(|| p.get("Branch"))
2320            .or_else(|| p.get("Jump"))
2321            .cloned()
2322            .unwrap_or_else(|| p.clone())
2323    })
2324}
2325
2326async fn load_persistence_version(
2327    handle: &PersistenceHandle,
2328    trace_id: &str,
2329) -> (
2330    u64,
2331    Option<String>,
2332    Option<crate::persistence::Intervention>,
2333    Option<String>,
2334    Option<serde_json::Value>,
2335) {
2336    let store = handle.store();
2337    match store.load(trace_id).await {
2338        Ok(Some(trace)) => {
2339            let (next_step, last_node_id, last_payload) =
2340                if let Some(resume_from_step) = trace.resumed_from_step {
2341                    let anchor_event = trace
2342                        .events
2343                        .iter()
2344                        .rev()
2345                        .find(|event| {
2346                            event.step <= resume_from_step
2347                                && event.outcome_kind == "Next"
2348                                && event.payload.is_some()
2349                        })
2350                        .or_else(|| {
2351                            trace.events.iter().rev().find(|event| {
2352                                event.step <= resume_from_step
2353                                    && event.outcome_kind != "Fault"
2354                                    && event.payload.is_some()
2355                            })
2356                        })
2357                        .or_else(|| {
2358                            trace.events.iter().rev().find(|event| {
2359                                event.step <= resume_from_step && event.payload.is_some()
2360                            })
2361                        })
2362                        .or_else(|| trace.events.last());
2363
2364                    (
2365                        resume_from_step.saturating_add(1),
2366                        anchor_event.and_then(|event| event.node_id.clone()),
2367                        anchor_event.and_then(|event| {
2368                            unwrap_outcome_payload(event.payload.as_ref())
2369                        }),
2370                    )
2371                } else {
2372                    let last_event = trace.events.last();
2373                    (
2374                        last_event
2375                            .map(|event| event.step.saturating_add(1))
2376                            .unwrap_or(0),
2377                        last_event.and_then(|event| event.node_id.clone()),
2378                        last_event.and_then(|event| {
2379                            unwrap_outcome_payload(event.payload.as_ref())
2380                        }),
2381                    )
2382                };
2383
2384            (
2385                next_step,
2386                Some(trace.schematic_version),
2387                trace.interventions.last().cloned(),
2388                last_node_id,
2389                last_payload,
2390            )
2391        }
2392        Ok(None) => (0, None, None, None, None),
2393        Err(err) => {
2394            tracing::warn!(
2395                trace_id = %trace_id,
2396                error = %err,
2397                "Failed to load persistence trace; falling back to step=0"
2398            );
2399            (0, None, None, None, None)
2400        }
2401    }
2402}
2403
2404#[allow(clippy::too_many_arguments)]
2405async fn run_this_step<In, Out, E, Res>(
2406    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
2407    state: In,
2408    res: &Res,
2409    bus: &mut Bus,
2410    node_id: &str,
2411    node_label: &str,
2412    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2413    step_idx: u64,
2414) -> Outcome<Out, E>
2415where
2416    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2417    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2418    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2419    Res: ranvier_core::transition::ResourceRequirement,
2420{
2421    let label = trans.label();
2422    let res_type = std::any::type_name::<Res>()
2423        .split("::")
2424        .last()
2425        .unwrap_or("unknown");
2426
2427    // Debug pausing
2428    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2429        debug.should_pause(node_id)
2430    } else {
2431        false
2432    };
2433
2434    if should_pause {
2435        let trace_id = persistence_trace_id(bus);
2436        tracing::event!(
2437            target: "ranvier.debugger",
2438            tracing::Level::INFO,
2439            trace_id = %trace_id,
2440            node_id = %node_id,
2441            "Node paused"
2442        );
2443
2444        if let Some(timeline) = bus.read_mut::<Timeline>() {
2445            timeline.push(TimelineEvent::NodePaused {
2446                node_id: node_id.to_string(),
2447                timestamp: now_ms(),
2448            });
2449        }
2450        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2451            debug.wait().await;
2452        }
2453    }
2454
2455    let enter_ts = now_ms();
2456    if let Some(timeline) = bus.read_mut::<Timeline>() {
2457        timeline.push(TimelineEvent::NodeEnter {
2458            node_id: node_id.to_string(),
2459            node_label: node_label.to_string(),
2460            timestamp: enter_ts,
2461        });
2462    }
2463
2464    // Check DLQ retry policy and pre-serialize state for potential retries
2465    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2466        if let DlqPolicy::RetryThenDlq {
2467            max_attempts,
2468            backoff_ms,
2469        } = p
2470        {
2471            Some((*max_attempts, *backoff_ms))
2472        } else {
2473            None
2474        }
2475    });
2476    let retry_state_snapshot = if dlq_retry_config.is_some() {
2477        serde_json::to_value(&state).ok()
2478    } else {
2479        None
2480    };
2481
2482    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2483    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2484        Some(serde_json::to_vec(&state).unwrap_or_default())
2485    } else {
2486        None
2487    };
2488
2489    let node_span = tracing::info_span!(
2490        "Node",
2491        ranvier.node = %label,
2492        ranvier.resource_type = %res_type,
2493        ranvier.outcome_kind = tracing::field::Empty,
2494        ranvier.outcome_target = tracing::field::Empty
2495    );
2496    let started = std::time::Instant::now();
2497    bus.set_access_policy(label.clone(), bus_policy.clone());
2498    let result = trans
2499        .run(state, res, bus)
2500        .instrument(node_span.clone())
2501        .await;
2502    bus.clear_access_policy();
2503
2504    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
2505    // retry with exponential backoff before giving up.
2506    let result = if let Outcome::Fault(_) = &result {
2507        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2508            (dlq_retry_config, &retry_state_snapshot)
2509        {
2510            let mut final_result = result;
2511            // attempt 1 already done; retry from 2..=max_attempts
2512            for attempt in 2..=max_attempts {
2513                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2514
2515                tracing::info!(
2516                    ranvier.node = %label,
2517                    attempt = attempt,
2518                    max_attempts = max_attempts,
2519                    backoff_ms = delay,
2520                    "Retrying faulted node"
2521                );
2522
2523                if let Some(timeline) = bus.read_mut::<Timeline>() {
2524                    timeline.push(TimelineEvent::NodeRetry {
2525                        node_id: node_id.to_string(),
2526                        attempt,
2527                        max_attempts,
2528                        backoff_ms: delay,
2529                        timestamp: now_ms(),
2530                    });
2531                }
2532
2533                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2534
2535                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2536                    bus.set_access_policy(label.clone(), bus_policy.clone());
2537                    let retry_result = trans
2538                        .run(retry_state, res, bus)
2539                        .instrument(tracing::info_span!(
2540                            "NodeRetry",
2541                            ranvier.node = %label,
2542                            attempt = attempt
2543                        ))
2544                        .await;
2545                    bus.clear_access_policy();
2546
2547                    match &retry_result {
2548                        Outcome::Fault(_) => {
2549                            final_result = retry_result;
2550                        }
2551                        _ => {
2552                            final_result = retry_result;
2553                            break;
2554                        }
2555                    }
2556                }
2557            }
2558            final_result
2559        } else {
2560            result
2561        }
2562    } else {
2563        result
2564    };
2565
2566    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2567    if let Some(target) = outcome_target(&result) {
2568        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2569    }
2570    let duration_ms = started.elapsed().as_millis() as u64;
2571    let exit_ts = now_ms();
2572
2573    if let Some(timeline) = bus.read_mut::<Timeline>() {
2574        timeline.push(TimelineEvent::NodeExit {
2575            node_id: node_id.to_string(),
2576            outcome_type: outcome_type_name(&result),
2577            duration_ms,
2578            timestamp: exit_ts,
2579        });
2580
2581        if let Outcome::Branch(branch_id, _) = &result {
2582            timeline.push(TimelineEvent::Branchtaken {
2583                branch_id: branch_id.clone(),
2584                timestamp: exit_ts,
2585            });
2586        }
2587    }
2588
2589    // Push to Saga Stack if Next outcome and snapshot taken
2590    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2591        && let Some(stack) = bus.read_mut::<SagaStack>()
2592    {
2593        stack.push(node_id.to_string(), label.clone(), snapshot);
2594    }
2595
2596    if let Some(handle) = bus.read::<PersistenceHandle>() {
2597        let trace_id = persistence_trace_id(bus);
2598        let circuit = bus
2599            .read::<ranvier_core::schematic::Schematic>()
2600            .map(|s| s.name.clone())
2601            .unwrap_or_default();
2602        let version = bus
2603            .read::<ranvier_core::schematic::Schematic>()
2604            .map(|s| s.schema_version.clone())
2605            .unwrap_or_default();
2606
2607        persist_execution_event(
2608            handle,
2609            &trace_id,
2610            &circuit,
2611            &version,
2612            step_idx,
2613            Some(node_id.to_string()),
2614            outcome_kind_name(&result),
2615            Some(result.to_json_value()),
2616        )
2617        .await;
2618    }
2619
2620    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
2621    // or immediately (SendToDlq). Drop policy skips entirely.
2622    if let Outcome::Fault(f) = &result {
2623        // Read policy and sink, then drop the borrows before mutable timeline access
2624        let dlq_action = {
2625            let policy = bus.read::<DlqPolicy>();
2626            let sink = bus.read::<Arc<dyn DlqSink>>();
2627            match (sink, policy) {
2628                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2629                _ => None,
2630            }
2631        };
2632
2633        if let Some(sink) = dlq_action {
2634            if let Some((max_attempts, _)) = dlq_retry_config
2635                && let Some(timeline) = bus.read_mut::<Timeline>()
2636            {
2637                timeline.push(TimelineEvent::DlqExhausted {
2638                    node_id: node_id.to_string(),
2639                    total_attempts: max_attempts,
2640                    timestamp: now_ms(),
2641                });
2642            }
2643
2644            let trace_id = persistence_trace_id(bus);
2645            let circuit = bus
2646                .read::<ranvier_core::schematic::Schematic>()
2647                .map(|s| s.name.clone())
2648                .unwrap_or_default();
2649            let _ = sink
2650                .store_dead_letter(
2651                    &trace_id,
2652                    &circuit,
2653                    node_id,
2654                    &format!("{:?}", f),
2655                    &serde_json::to_vec(&f).unwrap_or_default(),
2656                )
2657                .await;
2658        }
2659    }
2660
2661    result
2662}
2663
2664#[allow(clippy::too_many_arguments)]
2665async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2666    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2667    comp: &Comp,
2668    state: Out,
2669    res: &Res,
2670    bus: &mut Bus,
2671    node_id: &str,
2672    comp_node_id: &str,
2673    node_label: &str,
2674    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2675    step_idx: u64,
2676) -> Outcome<Next, E>
2677where
2678    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2679    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2680    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2681    Res: ranvier_core::transition::ResourceRequirement,
2682    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2683{
2684    let label = trans.label();
2685
2686    // Debug pausing
2687    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2688        debug.should_pause(node_id)
2689    } else {
2690        false
2691    };
2692
2693    if should_pause {
2694        let trace_id = persistence_trace_id(bus);
2695        tracing::event!(
2696            target: "ranvier.debugger",
2697            tracing::Level::INFO,
2698            trace_id = %trace_id,
2699            node_id = %node_id,
2700            "Node paused (compensated)"
2701        );
2702
2703        if let Some(timeline) = bus.read_mut::<Timeline>() {
2704            timeline.push(TimelineEvent::NodePaused {
2705                node_id: node_id.to_string(),
2706                timestamp: now_ms(),
2707            });
2708        }
2709        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2710            debug.wait().await;
2711        }
2712    }
2713
2714    let enter_ts = now_ms();
2715    if let Some(timeline) = bus.read_mut::<Timeline>() {
2716        timeline.push(TimelineEvent::NodeEnter {
2717            node_id: node_id.to_string(),
2718            node_label: node_label.to_string(),
2719            timestamp: enter_ts,
2720        });
2721    }
2722
2723    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2724    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2725        Some(serde_json::to_vec(&state).unwrap_or_default())
2726    } else {
2727        None
2728    };
2729
2730    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2731    bus.set_access_policy(label.clone(), bus_policy.clone());
2732    let result = trans
2733        .run(state.clone(), res, bus)
2734        .instrument(node_span)
2735        .await;
2736    bus.clear_access_policy();
2737
2738    let duration_ms = 0; // Simplified
2739    let exit_ts = now_ms();
2740
2741    if let Some(timeline) = bus.read_mut::<Timeline>() {
2742        timeline.push(TimelineEvent::NodeExit {
2743            node_id: node_id.to_string(),
2744            outcome_type: outcome_type_name(&result),
2745            duration_ms,
2746            timestamp: exit_ts,
2747        });
2748    }
2749
2750    // Automated Compensation Trigger
2751    if let Outcome::Fault(ref err) = result {
2752        if compensation_auto_trigger(bus) {
2753            tracing::info!(
2754                ranvier.node = %label,
2755                ranvier.compensation.trigger = "saga",
2756                error = ?err,
2757                "Saga compensation triggered"
2758            );
2759
2760            if let Some(timeline) = bus.read_mut::<Timeline>() {
2761                timeline.push(TimelineEvent::NodeEnter {
2762                    node_id: comp_node_id.to_string(),
2763                    node_label: format!("Compensate: {}", comp.label()),
2764                    timestamp: exit_ts,
2765                });
2766            }
2767
2768            // Run compensation
2769            let _ = comp.run(state, res, bus).await;
2770
2771            if let Some(timeline) = bus.read_mut::<Timeline>() {
2772                timeline.push(TimelineEvent::NodeExit {
2773                    node_id: comp_node_id.to_string(),
2774                    outcome_type: "Compensated".to_string(),
2775                    duration_ms: 0,
2776                    timestamp: now_ms(),
2777                });
2778            }
2779
2780            if let Some(handle) = bus.read::<PersistenceHandle>() {
2781                let trace_id = persistence_trace_id(bus);
2782                let circuit = bus
2783                    .read::<ranvier_core::schematic::Schematic>()
2784                    .map(|s| s.name.clone())
2785                    .unwrap_or_default();
2786                let version = bus
2787                    .read::<ranvier_core::schematic::Schematic>()
2788                    .map(|s| s.schema_version.clone())
2789                    .unwrap_or_default();
2790
2791                persist_execution_event(
2792                    handle,
2793                    &trace_id,
2794                    &circuit,
2795                    &version,
2796                    step_idx + 1, // Compensation node index
2797                    Some(comp_node_id.to_string()),
2798                    "Compensated",
2799                    None,
2800                )
2801                .await;
2802            }
2803        }
2804    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2805        // Push to Saga Stack if Next outcome and snapshot taken
2806        if let Some(stack) = bus.read_mut::<SagaStack>() {
2807            stack.push(node_id.to_string(), label.clone(), snapshot);
2808        }
2809
2810        if let Some(handle) = bus.read::<PersistenceHandle>() {
2811            let trace_id = persistence_trace_id(bus);
2812            let circuit = bus
2813                .read::<ranvier_core::schematic::Schematic>()
2814                .map(|s| s.name.clone())
2815                .unwrap_or_default();
2816            let version = bus
2817                .read::<ranvier_core::schematic::Schematic>()
2818                .map(|s| s.schema_version.clone())
2819                .unwrap_or_default();
2820
2821            persist_execution_event(
2822                handle,
2823                &trace_id,
2824                &circuit,
2825                &version,
2826                step_idx,
2827                Some(node_id.to_string()),
2828                outcome_kind_name(&result),
2829                Some(result.to_json_value()),
2830            )
2831            .await;
2832        }
2833    }
2834
2835    // DLQ reporting for compensated steps
2836    if let Outcome::Fault(f) = &result
2837        && let (Some(sink), Some(policy)) =
2838            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2839    {
2840        let should_dlq = !matches!(policy, DlqPolicy::Drop);
2841        if should_dlq {
2842            let trace_id = persistence_trace_id(bus);
2843            let circuit = bus
2844                .read::<ranvier_core::schematic::Schematic>()
2845                .map(|s| s.name.clone())
2846                .unwrap_or_default();
2847            let _ = sink
2848                .store_dead_letter(
2849                    &trace_id,
2850                    &circuit,
2851                    node_id,
2852                    &format!("{:?}", f),
2853                    &serde_json::to_vec(&f).unwrap_or_default(),
2854                )
2855                .await;
2856        }
2857    }
2858
2859    result
2860}
2861
2862#[allow(clippy::too_many_arguments)]
2863pub async fn persist_execution_event(
2864    handle: &PersistenceHandle,
2865    trace_id: &str,
2866    circuit: &str,
2867    schematic_version: &str,
2868    step: u64,
2869    node_id: Option<String>,
2870    outcome_kind: &str,
2871    payload: Option<serde_json::Value>,
2872) {
2873    let store = handle.store();
2874    let envelope = PersistenceEnvelope {
2875        trace_id: trace_id.to_string(),
2876        circuit: circuit.to_string(),
2877        schematic_version: schematic_version.to_string(),
2878        step,
2879        node_id,
2880        outcome_kind: outcome_kind.to_string(),
2881        timestamp_ms: now_ms(),
2882        payload_hash: None,
2883        payload,
2884    };
2885
2886    if let Err(err) = store.append(envelope).await {
2887        tracing::warn!(
2888            trace_id = %trace_id,
2889            circuit = %circuit,
2890            step,
2891            outcome_kind = %outcome_kind,
2892            error = %err,
2893            "Failed to append persistence envelope"
2894        );
2895    }
2896}
2897
2898async fn persist_completion(
2899    handle: &PersistenceHandle,
2900    trace_id: &str,
2901    completion: CompletionState,
2902) {
2903    let store = handle.store();
2904    if let Err(err) = store.complete(trace_id, completion).await {
2905        tracing::warn!(
2906            trace_id = %trace_id,
2907            error = %err,
2908            "Failed to complete persistence trace"
2909        );
2910    }
2911}
2912
2913fn compensation_idempotency_key(context: &CompensationContext) -> String {
2914    format!(
2915        "{}:{}:{}",
2916        context.trace_id, context.circuit, context.fault_kind
2917    )
2918}
2919
2920async fn run_compensation(
2921    handle: &CompensationHandle,
2922    context: CompensationContext,
2923    retry_policy: CompensationRetryPolicy,
2924    idempotency: Option<CompensationIdempotencyHandle>,
2925) -> bool {
2926    let hook = handle.hook();
2927    let key = compensation_idempotency_key(&context);
2928
2929    if let Some(store_handle) = idempotency.as_ref() {
2930        let store = store_handle.store();
2931        match store.was_compensated(&key).await {
2932            Ok(true) => {
2933                tracing::info!(
2934                    trace_id = %context.trace_id,
2935                    circuit = %context.circuit,
2936                    key = %key,
2937                    "Compensation already recorded; skipping duplicate hook execution"
2938                );
2939                return true;
2940            }
2941            Ok(false) => {}
2942            Err(err) => {
2943                tracing::warn!(
2944                    trace_id = %context.trace_id,
2945                    key = %key,
2946                    error = %err,
2947                    "Failed to check compensation idempotency state"
2948                );
2949            }
2950        }
2951    }
2952
2953    let max_attempts = retry_policy.max_attempts.max(1);
2954    for attempt in 1..=max_attempts {
2955        match hook.compensate(context.clone()).await {
2956            Ok(()) => {
2957                if let Some(store_handle) = idempotency.as_ref() {
2958                    let store = store_handle.store();
2959                    if let Err(err) = store.mark_compensated(&key).await {
2960                        tracing::warn!(
2961                            trace_id = %context.trace_id,
2962                            key = %key,
2963                            error = %err,
2964                            "Failed to mark compensation idempotency state"
2965                        );
2966                    }
2967                }
2968                return true;
2969            }
2970            Err(err) => {
2971                let is_last = attempt == max_attempts;
2972                tracing::warn!(
2973                    trace_id = %context.trace_id,
2974                    circuit = %context.circuit,
2975                    fault_kind = %context.fault_kind,
2976                    fault_step = context.fault_step,
2977                    attempt,
2978                    max_attempts,
2979                    error = %err,
2980                    "Compensation hook attempt failed"
2981                );
2982                if !is_last && retry_policy.backoff_ms > 0 {
2983                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2984                        .await;
2985                }
2986            }
2987        }
2988    }
2989    false
2990}
2991
2992fn ensure_timeline(bus: &mut Bus) -> bool {
2993    if bus.has::<Timeline>() {
2994        false
2995    } else {
2996        bus.insert(Timeline::new());
2997        true
2998    }
2999}
3000
3001fn should_attach_timeline(bus: &Bus) -> bool {
3002    // Respect explicitly provided timeline collector from caller.
3003    if bus.has::<Timeline>() {
3004        return true;
3005    }
3006
3007    // Attach timeline when runtime export path exists.
3008    has_timeline_output_path()
3009}
3010
3011fn has_timeline_output_path() -> bool {
3012    std::env::var("RANVIER_TIMELINE_OUTPUT")
3013        .ok()
3014        .map(|v| !v.trim().is_empty())
3015        .unwrap_or(false)
3016}
3017
3018fn timeline_sample_rate() -> f64 {
3019    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
3020        .ok()
3021        .and_then(|v| v.parse::<f64>().ok())
3022        .map(|v| v.clamp(0.0, 1.0))
3023        .unwrap_or(1.0)
3024}
3025
3026fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3027    if rate <= 0.0 {
3028        return false;
3029    }
3030    if rate >= 1.0 {
3031        return true;
3032    }
3033    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3034    bucket < rate
3035}
3036
3037fn timeline_adaptive_policy() -> String {
3038    std::env::var("RANVIER_TIMELINE_ADAPTIVE")
3039        .unwrap_or_else(|_| "fault_branch".to_string())
3040        .to_ascii_lowercase()
3041}
3042
3043fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3044    match policy {
3045        "off" => false,
3046        "fault_only" => matches!(outcome, Outcome::Fault(_)),
3047        "fault_branch_emit" => {
3048            matches!(
3049                outcome,
3050                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3051            )
3052        }
3053        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3054    }
3055}
3056
3057#[derive(Default, Clone)]
3058struct SamplingStats {
3059    total_decisions: u64,
3060    exported: u64,
3061    skipped: u64,
3062    sampled_exports: u64,
3063    forced_exports: u64,
3064    last_mode: String,
3065    last_policy: String,
3066    last_updated_ms: u64,
3067}
3068
3069static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3070
3071fn stats_cell() -> &'static Mutex<SamplingStats> {
3072    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3073}
3074
3075fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3076    let snapshot = {
3077        let mut stats = match stats_cell().lock() {
3078            Ok(guard) => guard,
3079            Err(_) => return,
3080        };
3081
3082        stats.total_decisions += 1;
3083        if exported {
3084            stats.exported += 1;
3085        } else {
3086            stats.skipped += 1;
3087        }
3088        if sampled && exported {
3089            stats.sampled_exports += 1;
3090        }
3091        if forced && exported {
3092            stats.forced_exports += 1;
3093        }
3094        stats.last_mode = mode.to_string();
3095        stats.last_policy = policy.to_string();
3096        stats.last_updated_ms = now_ms();
3097        stats.clone()
3098    };
3099
3100    tracing::debug!(
3101        ranvier.timeline.total_decisions = snapshot.total_decisions,
3102        ranvier.timeline.exported = snapshot.exported,
3103        ranvier.timeline.skipped = snapshot.skipped,
3104        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3105        ranvier.timeline.forced_exports = snapshot.forced_exports,
3106        ranvier.timeline.mode = %snapshot.last_mode,
3107        ranvier.timeline.policy = %snapshot.last_policy,
3108        "Timeline sampling stats updated"
3109    );
3110
3111    if let Some(path) = timeline_stats_output_path() {
3112        let payload = serde_json::json!({
3113            "total_decisions": snapshot.total_decisions,
3114            "exported": snapshot.exported,
3115            "skipped": snapshot.skipped,
3116            "sampled_exports": snapshot.sampled_exports,
3117            "forced_exports": snapshot.forced_exports,
3118            "last_mode": snapshot.last_mode,
3119            "last_policy": snapshot.last_policy,
3120            "last_updated_ms": snapshot.last_updated_ms
3121        });
3122        if let Some(parent) = Path::new(&path).parent() {
3123            let _ = fs::create_dir_all(parent);
3124        }
3125        if let Err(err) = fs::write(&path, payload.to_string()) {
3126            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3127        }
3128    }
3129}
3130
3131fn timeline_stats_output_path() -> Option<String> {
3132    std::env::var("RANVIER_TIMELINE_STATS_OUTPUT")
3133        .ok()
3134        .filter(|v| !v.trim().is_empty())
3135}
3136
3137fn write_timeline_with_policy(
3138    path: &str,
3139    mode: &str,
3140    mut timeline: Timeline,
3141) -> Result<(), String> {
3142    match mode {
3143        "append" => {
3144            if Path::new(path).exists() {
3145                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3146                match serde_json::from_str::<Timeline>(&content) {
3147                    Ok(mut existing) => {
3148                        existing.events.append(&mut timeline.events);
3149                        existing.sort();
3150                        if let Some(max_events) = max_events_limit() {
3151                            truncate_timeline_events(&mut existing, max_events);
3152                        }
3153                        write_timeline_json(path, &existing)
3154                    }
3155                    Err(_) => {
3156                        // Fallback: if existing is invalid, replace with current timeline
3157                        if let Some(max_events) = max_events_limit() {
3158                            truncate_timeline_events(&mut timeline, max_events);
3159                        }
3160                        write_timeline_json(path, &timeline)
3161                    }
3162                }
3163            } else {
3164                if let Some(max_events) = max_events_limit() {
3165                    truncate_timeline_events(&mut timeline, max_events);
3166                }
3167                write_timeline_json(path, &timeline)
3168            }
3169        }
3170        "rotate" => {
3171            let rotated_path = rotated_path(path, now_ms());
3172            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3173            if let Some(keep) = rotate_keep_limit() {
3174                cleanup_rotated_files(path, keep)?;
3175            }
3176            Ok(())
3177        }
3178        _ => write_timeline_json(path, &timeline),
3179    }
3180}
3181
3182fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3183    if let Some(parent) = Path::new(path).parent()
3184        && !parent.as_os_str().is_empty()
3185    {
3186        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3187    }
3188    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3189    fs::write(path, json).map_err(|e| e.to_string())
3190}
3191
3192fn rotated_path(path: &str, suffix: u64) -> PathBuf {
3193    let p = Path::new(path);
3194    let parent = p.parent().unwrap_or_else(|| Path::new(""));
3195    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
3196    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
3197    parent.join(format!("{}_{}.{}", stem, suffix, ext))
3198}
3199
3200fn max_events_limit() -> Option<usize> {
3201    std::env::var("RANVIER_TIMELINE_MAX_EVENTS")
3202        .ok()
3203        .and_then(|v| v.parse::<usize>().ok())
3204        .filter(|v| *v > 0)
3205}
3206
3207fn rotate_keep_limit() -> Option<usize> {
3208    std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
3209        .ok()
3210        .and_then(|v| v.parse::<usize>().ok())
3211        .filter(|v| *v > 0)
3212}
3213
3214fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3215    let len = timeline.events.len();
3216    if len > max_events {
3217        let keep_from = len - max_events;
3218        timeline.events = timeline.events.split_off(keep_from);
3219    }
3220}
3221
3222fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
3223    let p = Path::new(base_path);
3224    let parent = p.parent().unwrap_or_else(|| Path::new("."));
3225    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
3226    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
3227    let prefix = format!("{}_", stem);
3228    let suffix = format!(".{}", ext);
3229
3230    let mut files = fs::read_dir(parent)
3231        .map_err(|e| e.to_string())?
3232        .filter_map(|entry| entry.ok())
3233        .filter(|entry| {
3234            let name = entry.file_name();
3235            let name = name.to_string_lossy();
3236            name.starts_with(&prefix) && name.ends_with(&suffix)
3237        })
3238        .map(|entry| {
3239            let modified = entry
3240                .metadata()
3241                .ok()
3242                .and_then(|m| m.modified().ok())
3243                .unwrap_or(SystemTime::UNIX_EPOCH);
3244            (entry.path(), modified)
3245        })
3246        .collect::<Vec<_>>();
3247
3248    files.sort_by(|a, b| b.1.cmp(&a.1));
3249    for (path, _) in files.into_iter().skip(keep) {
3250        let _ = fs::remove_file(path);
3251    }
3252    Ok(())
3253}
3254
3255fn bus_capability_schema_from_policy(
3256    policy: Option<ranvier_core::bus::BusAccessPolicy>,
3257) -> Option<BusCapabilitySchema> {
3258    let policy = policy?;
3259
3260    let mut allow = policy
3261        .allow
3262        .unwrap_or_default()
3263        .into_iter()
3264        .map(|entry| entry.type_name.to_string())
3265        .collect::<Vec<_>>();
3266    let mut deny = policy
3267        .deny
3268        .into_iter()
3269        .map(|entry| entry.type_name.to_string())
3270        .collect::<Vec<_>>();
3271    allow.sort();
3272    deny.sort();
3273
3274    if allow.is_empty() && deny.is_empty() {
3275        return None;
3276    }
3277
3278    Some(BusCapabilitySchema { allow, deny })
3279}
3280
3281fn now_ms() -> u64 {
3282    SystemTime::now()
3283        .duration_since(UNIX_EPOCH)
3284        .map(|d| d.as_millis() as u64)
3285        .unwrap_or(0)
3286}
3287
3288#[cfg(test)]
3289mod tests {
3290    use super::{
3291        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3292        should_force_export,
3293    };
3294    use crate::persistence::{
3295        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3296        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3297        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3298        PersistenceHandle, PersistenceStore, PersistenceTraceId,
3299    };
3300    use anyhow::Result;
3301    use async_trait::async_trait;
3302    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3303    use ranvier_core::event::{DlqPolicy, DlqSink};
3304    use ranvier_core::saga::SagaStack;
3305    use ranvier_core::timeline::{Timeline, TimelineEvent};
3306    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3307    use serde::{Deserialize, Serialize};
3308    use std::sync::Arc;
3309    use tokio::sync::Mutex;
3310    use uuid::Uuid;
3311
3312    struct MockAuditSink {
3313        events: Arc<Mutex<Vec<AuditEvent>>>,
3314    }
3315
3316    #[async_trait]
3317    impl AuditSink for MockAuditSink {
3318        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3319            self.events.lock().await.push(event.clone());
3320            Ok(())
3321        }
3322    }
3323
3324    #[tokio::test]
3325    async fn execute_logs_audit_events_for_intervention() {
3326        use ranvier_inspector::StateInspector;
3327
3328        let trace_id = "test-audit-trace";
3329        let store_impl = InMemoryPersistenceStore::new();
3330        let events = Arc::new(Mutex::new(Vec::new()));
3331        let sink = MockAuditSink {
3332            events: events.clone(),
3333        };
3334
3335        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3336            .then(AddOne)
3337            .with_persistence_store(store_impl.clone())
3338            .with_audit_sink(sink);
3339
3340        let mut bus = Bus::new();
3341        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3342        bus.insert(PersistenceTraceId::new(trace_id));
3343        let target_node_id = axon.schematic.nodes[0].id.clone();
3344
3345        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
3346        store_impl
3347            .append(crate::persistence::PersistenceEnvelope {
3348                trace_id: trace_id.to_string(),
3349                circuit: "AuditTest".to_string(),
3350                schematic_version: "v1.0".to_string(),
3351                step: 0,
3352                node_id: None,
3353                outcome_kind: "Next".to_string(),
3354                timestamp_ms: 0,
3355                payload_hash: None,
3356                payload: None,
3357            })
3358            .await
3359            .unwrap();
3360
3361        // 1. Trigger force_resume (should log ForceResume)
3362        axon.force_resume(trace_id, &target_node_id, None)
3363            .await
3364            .unwrap();
3365
3366        // 2. Execute (should log ApplyIntervention)
3367        axon.execute(10, &(), &mut bus).await;
3368
3369        let recorded = events.lock().await;
3370        assert_eq!(
3371            recorded.len(),
3372            2,
3373            "Should have 2 audit events: ForceResume and ApplyIntervention"
3374        );
3375        assert_eq!(recorded[0].action, "ForceResume");
3376        assert_eq!(recorded[0].target, trace_id);
3377        assert_eq!(recorded[1].action, "ApplyIntervention");
3378        assert_eq!(recorded[1].target, trace_id);
3379    }
3380
3381    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3382    pub enum TestInfallible {}
3383
3384    #[test]
3385    fn inspector_enabled_flag_matrix() {
3386        assert!(inspector_enabled_from_value(None));
3387        assert!(inspector_enabled_from_value(Some("1")));
3388        assert!(inspector_enabled_from_value(Some("true")));
3389        assert!(inspector_enabled_from_value(Some("on")));
3390        assert!(!inspector_enabled_from_value(Some("0")));
3391        assert!(!inspector_enabled_from_value(Some("false")));
3392    }
3393
3394    #[test]
3395    fn inspector_dev_mode_matrix() {
3396        assert!(inspector_dev_mode_from_value(None));
3397        assert!(inspector_dev_mode_from_value(Some("dev")));
3398        assert!(inspector_dev_mode_from_value(Some("staging")));
3399        assert!(!inspector_dev_mode_from_value(Some("prod")));
3400        assert!(!inspector_dev_mode_from_value(Some("production")));
3401    }
3402
3403    #[test]
3404    fn adaptive_policy_force_export_matrix() {
3405        let next = Outcome::<(), &'static str>::Next(());
3406        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3407        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3408        let fault = Outcome::<(), &'static str>::Fault("boom");
3409
3410        assert!(!should_force_export(&next, "off"));
3411        assert!(!should_force_export(&fault, "off"));
3412
3413        assert!(!should_force_export(&branch, "fault_only"));
3414        assert!(should_force_export(&fault, "fault_only"));
3415
3416        assert!(should_force_export(&branch, "fault_branch"));
3417        assert!(!should_force_export(&emit, "fault_branch"));
3418        assert!(should_force_export(&fault, "fault_branch"));
3419
3420        assert!(should_force_export(&branch, "fault_branch_emit"));
3421        assert!(should_force_export(&emit, "fault_branch_emit"));
3422        assert!(should_force_export(&fault, "fault_branch_emit"));
3423    }
3424
3425    #[test]
3426    fn sampling_and_adaptive_combination_decisions() {
3427        let bus_id = Uuid::nil();
3428        let next = Outcome::<(), &'static str>::Next(());
3429        let fault = Outcome::<(), &'static str>::Fault("boom");
3430
3431        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3432        assert!(!sampled_never);
3433        assert!(!(sampled_never || should_force_export(&next, "off")));
3434        assert!(sampled_never || should_force_export(&fault, "fault_only"));
3435
3436        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3437        assert!(sampled_always);
3438        assert!(sampled_always || should_force_export(&next, "off"));
3439        assert!(sampled_always || should_force_export(&fault, "off"));
3440    }
3441
3442    #[derive(Clone)]
3443    struct AddOne;
3444
3445    #[async_trait]
3446    impl Transition<i32, i32> for AddOne {
3447        type Error = TestInfallible;
3448        type Resources = ();
3449
3450        async fn run(
3451            &self,
3452            state: i32,
3453            _resources: &Self::Resources,
3454            _bus: &mut Bus,
3455        ) -> Outcome<i32, Self::Error> {
3456            Outcome::Next(state + 1)
3457        }
3458    }
3459
3460    #[derive(Clone)]
3461    struct AlwaysFault;
3462
3463    #[async_trait]
3464    impl Transition<i32, i32> for AlwaysFault {
3465        type Error = String;
3466        type Resources = ();
3467
3468        async fn run(
3469            &self,
3470            _state: i32,
3471            _resources: &Self::Resources,
3472            _bus: &mut Bus,
3473        ) -> Outcome<i32, Self::Error> {
3474            Outcome::Fault("boom".to_string())
3475        }
3476    }
3477
3478    #[derive(Clone)]
3479    struct CapabilityGuarded;
3480
3481    #[async_trait]
3482    impl Transition<(), ()> for CapabilityGuarded {
3483        type Error = String;
3484        type Resources = ();
3485
3486        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3487            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3488        }
3489
3490        async fn run(
3491            &self,
3492            _state: (),
3493            _resources: &Self::Resources,
3494            bus: &mut Bus,
3495        ) -> Outcome<(), Self::Error> {
3496            match bus.get::<String>() {
3497                Ok(_) => Outcome::Next(()),
3498                Err(err) => Outcome::Fault(err.to_string()),
3499            }
3500        }
3501    }
3502
3503    #[derive(Clone)]
3504    struct RecordingCompensationHook {
3505        calls: Arc<Mutex<Vec<CompensationContext>>>,
3506        should_fail: bool,
3507    }
3508
3509    #[async_trait]
3510    impl CompensationHook for RecordingCompensationHook {
3511        async fn compensate(&self, context: CompensationContext) -> Result<()> {
3512            self.calls.lock().await.push(context);
3513            if self.should_fail {
3514                return Err(anyhow::anyhow!("compensation failed"));
3515            }
3516            Ok(())
3517        }
3518    }
3519
3520    #[derive(Clone)]
3521    struct FlakyCompensationHook {
3522        calls: Arc<Mutex<u32>>,
3523        failures_remaining: Arc<Mutex<u32>>,
3524    }
3525
3526    #[async_trait]
3527    impl CompensationHook for FlakyCompensationHook {
3528        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3529            {
3530                let mut calls = self.calls.lock().await;
3531                *calls += 1;
3532            }
3533            let mut failures_remaining = self.failures_remaining.lock().await;
3534            if *failures_remaining > 0 {
3535                *failures_remaining -= 1;
3536                return Err(anyhow::anyhow!("transient compensation failure"));
3537            }
3538            Ok(())
3539        }
3540    }
3541
3542    #[derive(Clone)]
3543    struct FailingCompensationIdempotencyStore {
3544        read_calls: Arc<Mutex<u32>>,
3545        write_calls: Arc<Mutex<u32>>,
3546    }
3547
3548    #[async_trait]
3549    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3550        async fn was_compensated(&self, _key: &str) -> Result<bool> {
3551            let mut read_calls = self.read_calls.lock().await;
3552            *read_calls += 1;
3553            Err(anyhow::anyhow!("forced idempotency read failure"))
3554        }
3555
3556        async fn mark_compensated(&self, _key: &str) -> Result<()> {
3557            let mut write_calls = self.write_calls.lock().await;
3558            *write_calls += 1;
3559            Err(anyhow::anyhow!("forced idempotency write failure"))
3560        }
3561    }
3562
3563    #[tokio::test]
3564    async fn execute_persists_success_trace_when_handle_exists() {
3565        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3566        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3567
3568        let mut bus = Bus::new();
3569        bus.insert(PersistenceHandle::from_arc(store_dyn));
3570        bus.insert(PersistenceTraceId::new("trace-success"));
3571
3572        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3573        let outcome = axon.execute(41, &(), &mut bus).await;
3574        assert!(matches!(outcome, Outcome::Next(42)));
3575
3576        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3577        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3578        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3579        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
3580        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
3581        assert_eq!(persisted.completion, Some(CompletionState::Success));
3582    }
3583
3584    #[tokio::test]
3585    async fn execute_persists_fault_completion_state() {
3586        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3587        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3588
3589        let mut bus = Bus::new();
3590        bus.insert(PersistenceHandle::from_arc(store_dyn));
3591        bus.insert(PersistenceTraceId::new("trace-fault"));
3592
3593        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3594        let outcome = axon.execute(41, &(), &mut bus).await;
3595        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3596
3597        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3598        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3599        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3600        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3601        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3602    }
3603
3604    #[tokio::test]
3605    async fn execute_respects_persistence_auto_complete_off() {
3606        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3607        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3608
3609        let mut bus = Bus::new();
3610        bus.insert(PersistenceHandle::from_arc(store_dyn));
3611        bus.insert(PersistenceTraceId::new("trace-no-complete"));
3612        bus.insert(PersistenceAutoComplete(false));
3613
3614        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3615        let outcome = axon.execute(1, &(), &mut bus).await;
3616        assert!(matches!(outcome, Outcome::Next(2)));
3617
3618        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3619        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3620        assert_eq!(persisted.completion, None);
3621    }
3622
3623    #[tokio::test]
3624    async fn fault_triggers_compensation_and_marks_compensated() {
3625        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3626        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3627        let calls = Arc::new(Mutex::new(Vec::new()));
3628        let compensation = RecordingCompensationHook {
3629            calls: calls.clone(),
3630            should_fail: false,
3631        };
3632
3633        let mut bus = Bus::new();
3634        bus.insert(PersistenceHandle::from_arc(store_dyn));
3635        bus.insert(PersistenceTraceId::new("trace-compensated"));
3636        bus.insert(CompensationHandle::from_hook(compensation));
3637
3638        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3639        let outcome = axon.execute(7, &(), &mut bus).await;
3640        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3641
3642        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3643        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
3644        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3645        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3646        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3647        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3648        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3649
3650        let recorded = calls.lock().await;
3651        assert_eq!(recorded.len(), 1);
3652        assert_eq!(recorded[0].trace_id, "trace-compensated");
3653        assert_eq!(recorded[0].fault_kind, "Fault");
3654    }
3655
3656    #[tokio::test]
3657    async fn failed_compensation_keeps_fault_completion() {
3658        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3659        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3660        let calls = Arc::new(Mutex::new(Vec::new()));
3661        let compensation = RecordingCompensationHook {
3662            calls: calls.clone(),
3663            should_fail: true,
3664        };
3665
3666        let mut bus = Bus::new();
3667        bus.insert(PersistenceHandle::from_arc(store_dyn));
3668        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3669        bus.insert(CompensationHandle::from_hook(compensation));
3670
3671        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3672        let outcome = axon.execute(7, &(), &mut bus).await;
3673        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3674
3675        let persisted = store_impl
3676            .load("trace-compensation-failed")
3677            .await
3678            .unwrap()
3679            .unwrap();
3680        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3681        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3682        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3683
3684        let recorded = calls.lock().await;
3685        assert_eq!(recorded.len(), 1);
3686    }
3687
3688    #[tokio::test]
3689    async fn compensation_retry_policy_succeeds_after_retries() {
3690        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3691        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3692        let calls = Arc::new(Mutex::new(0u32));
3693        let failures_remaining = Arc::new(Mutex::new(2u32));
3694        let compensation = FlakyCompensationHook {
3695            calls: calls.clone(),
3696            failures_remaining,
3697        };
3698
3699        let mut bus = Bus::new();
3700        bus.insert(PersistenceHandle::from_arc(store_dyn));
3701        bus.insert(PersistenceTraceId::new("trace-retry-success"));
3702        bus.insert(CompensationHandle::from_hook(compensation));
3703        bus.insert(CompensationRetryPolicy {
3704            max_attempts: 3,
3705            backoff_ms: 0,
3706        });
3707
3708        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3709        let outcome = axon.execute(7, &(), &mut bus).await;
3710        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3711
3712        let persisted = store_impl
3713            .load("trace-retry-success")
3714            .await
3715            .unwrap()
3716            .unwrap();
3717        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3718        assert_eq!(
3719            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3720            Some("Compensated")
3721        );
3722
3723        let attempt_count = calls.lock().await;
3724        assert_eq!(*attempt_count, 3);
3725    }
3726
3727    #[tokio::test]
3728    async fn compensation_idempotency_skips_duplicate_hook_execution() {
3729        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3730        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3731        let calls = Arc::new(Mutex::new(Vec::new()));
3732        let compensation = RecordingCompensationHook {
3733            calls: calls.clone(),
3734            should_fail: false,
3735        };
3736        let idempotency = InMemoryCompensationIdempotencyStore::new();
3737
3738        let mut bus = Bus::new();
3739        bus.insert(PersistenceHandle::from_arc(store_dyn));
3740        bus.insert(PersistenceTraceId::new("trace-idempotent"));
3741        bus.insert(PersistenceAutoComplete(false));
3742        bus.insert(CompensationHandle::from_hook(compensation));
3743        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3744
3745        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3746
3747        let outcome1 = axon.execute(7, &(), &mut bus).await;
3748        let outcome2 = axon.execute(8, &(), &mut bus).await;
3749        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3750        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3751
3752        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3753        assert_eq!(persisted.completion, None);
3754        // Verify that "Compensated" events are present for both executions
3755        let compensated_count = persisted
3756            .events
3757            .iter()
3758            .filter(|e| e.outcome_kind == "Compensated")
3759            .count();
3760        assert_eq!(
3761            compensated_count, 2,
3762            "Should have 2 Compensated events (one per execution)"
3763        );
3764
3765        let recorded = calls.lock().await;
3766        assert_eq!(recorded.len(), 1);
3767    }
3768
3769    #[tokio::test]
3770    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3771        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3772        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3773        let calls = Arc::new(Mutex::new(Vec::new()));
3774        let read_calls = Arc::new(Mutex::new(0u32));
3775        let write_calls = Arc::new(Mutex::new(0u32));
3776        let compensation = RecordingCompensationHook {
3777            calls: calls.clone(),
3778            should_fail: false,
3779        };
3780        let idempotency = FailingCompensationIdempotencyStore {
3781            read_calls: read_calls.clone(),
3782            write_calls: write_calls.clone(),
3783        };
3784
3785        let mut bus = Bus::new();
3786        bus.insert(PersistenceHandle::from_arc(store_dyn));
3787        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3788        bus.insert(CompensationHandle::from_hook(compensation));
3789        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3790
3791        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3792        let outcome = axon.execute(9, &(), &mut bus).await;
3793        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3794
3795        let persisted = store_impl
3796            .load("trace-idempotency-store-failure")
3797            .await
3798            .unwrap()
3799            .unwrap();
3800        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3801        assert_eq!(
3802            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3803            Some("Compensated")
3804        );
3805
3806        let recorded = calls.lock().await;
3807        assert_eq!(recorded.len(), 1);
3808        assert_eq!(*read_calls.lock().await, 1);
3809        assert_eq!(*write_calls.lock().await, 1);
3810    }
3811
3812    #[tokio::test]
3813    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3814        let mut bus = Bus::new();
3815        bus.insert(1_i32);
3816        bus.insert("secret".to_string());
3817
3818        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3819        let outcome = axon.execute((), &(), &mut bus).await;
3820
3821        match outcome {
3822            Outcome::Fault(msg) => {
3823                assert!(msg.contains("Bus access denied"), "{msg}");
3824                assert!(msg.contains("CapabilityGuarded"), "{msg}");
3825                assert!(msg.contains("alloc::string::String"), "{msg}");
3826            }
3827            other => panic!("expected fault, got {other:?}"),
3828        }
3829    }
3830
3831    #[tokio::test]
3832    async fn execute_fails_on_version_mismatch_without_migration() {
3833        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3834        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3835
3836        let trace_id = "v-mismatch";
3837        // Create an existing trace with an older version
3838        let old_envelope = crate::persistence::PersistenceEnvelope {
3839            trace_id: trace_id.to_string(),
3840            circuit: "TestCircuit".to_string(),
3841            schematic_version: "0.9".to_string(),
3842            step: 0,
3843            node_id: None,
3844            outcome_kind: "Enter".to_string(),
3845            timestamp_ms: 0,
3846            payload_hash: None,
3847            payload: None,
3848        };
3849        store_impl.append(old_envelope).await.unwrap();
3850
3851        let mut bus = Bus::new();
3852        bus.insert(PersistenceHandle::from_arc(store_dyn));
3853        bus.insert(PersistenceTraceId::new(trace_id));
3854
3855        // Current axon is version 1.0
3856        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3857        let outcome = axon.execute(10, &(), &mut bus).await;
3858
3859        if let Outcome::Emit(kind, _) = outcome {
3860            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3861        } else {
3862            panic!("Expected version mismatch emission, got {:?}", outcome);
3863        }
3864    }
3865
3866    #[tokio::test]
3867    async fn execute_resumes_from_start_on_migration_strategy() {
3868        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3869        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3870
3871        let trace_id = "v-migration";
3872        // Create an existing trace with an older version at step 5
3873        let old_envelope = crate::persistence::PersistenceEnvelope {
3874            trace_id: trace_id.to_string(),
3875            circuit: "TestCircuit".to_string(),
3876            schematic_version: "0.9".to_string(),
3877            step: 5,
3878            node_id: None,
3879            outcome_kind: "Next".to_string(),
3880            timestamp_ms: 0,
3881            payload_hash: None,
3882            payload: None,
3883        };
3884        store_impl.append(old_envelope).await.unwrap();
3885
3886        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3887        registry.register(ranvier_core::schematic::SnapshotMigration {
3888            name: Some("v0.9 to v1.0".to_string()),
3889            from_version: "0.9".to_string(),
3890            to_version: "1.0".to_string(),
3891            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3892            node_mapping: std::collections::HashMap::new(),
3893            payload_mapper: None,
3894        });
3895
3896        let mut bus = Bus::new();
3897        bus.insert(PersistenceHandle::from_arc(store_dyn));
3898        bus.insert(PersistenceTraceId::new(trace_id));
3899        bus.insert(registry);
3900
3901        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3902        let outcome = axon.execute(10, &(), &mut bus).await;
3903
3904        // Should have resumed from start (step 0), resulting in 11
3905        assert!(matches!(outcome, Outcome::Next(11)));
3906
3907        // Verify new event has current version
3908        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3909        assert_eq!(persisted.schematic_version, "1.0");
3910    }
3911
3912    #[tokio::test]
3913    async fn execute_applies_manual_intervention_jump_and_payload() {
3914        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3915        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3916
3917        let trace_id = "intervention-test";
3918        // 1. Run a normal trace part-way
3919        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3920            .then(AddOne)
3921            .then(AddOne);
3922
3923        let mut bus = Bus::new();
3924        bus.insert(PersistenceHandle::from_arc(store_dyn));
3925        bus.insert(PersistenceTraceId::new(trace_id));
3926
3927        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
3928        // with a payload override of 100.
3929        // The first node is 'AddOne', the second is ALSO 'AddOne'.
3930        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
3931        let _target_node_label = "AddOne";
3932        // To be precise, let's find the ID of the second node.
3933        let target_node_id = axon.schematic.nodes[2].id.clone();
3934
3935        // Pre-seed an initial trace entry so save_intervention doesn't fail
3936        store_impl
3937            .append(crate::persistence::PersistenceEnvelope {
3938                trace_id: trace_id.to_string(),
3939                circuit: "TestCircuit".to_string(),
3940                schematic_version: "1.0".to_string(),
3941                step: 0,
3942                node_id: None,
3943                outcome_kind: "Enter".to_string(),
3944                timestamp_ms: 0,
3945                payload_hash: None,
3946                payload: None,
3947            })
3948            .await
3949            .unwrap();
3950
3951        store_impl
3952            .save_intervention(
3953                trace_id,
3954                crate::persistence::Intervention {
3955                    target_node: target_node_id.clone(),
3956                    payload_override: Some(serde_json::json!(100)),
3957                    timestamp_ms: 0,
3958                },
3959            )
3960            .await
3961            .unwrap();
3962
3963        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
3964        // Result should be 100 + 1 = 101.
3965        let outcome = axon.execute(10, &(), &mut bus).await;
3966
3967        match outcome {
3968            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3969            other => panic!("Expected Outcome::Next(101), got {:?}", other),
3970        }
3971
3972        // Verify the jump was logged in trace
3973        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3974        // The last event should be from the jump target's execution.
3975        assert_eq!(persisted.interventions.len(), 1);
3976        assert_eq!(persisted.interventions[0].target_node, target_node_id);
3977    }
3978
3979    // ── DLQ Retry Tests ──────────────────────────────────────────────
3980
3981    /// A transition that fails a configurable number of times before succeeding.
3982    #[derive(Clone)]
3983    struct FailNThenSucceed {
3984        remaining: Arc<tokio::sync::Mutex<u32>>,
3985    }
3986
3987    #[async_trait]
3988    impl Transition<i32, i32> for FailNThenSucceed {
3989        type Error = String;
3990        type Resources = ();
3991
3992        async fn run(
3993            &self,
3994            state: i32,
3995            _resources: &Self::Resources,
3996            _bus: &mut Bus,
3997        ) -> Outcome<i32, Self::Error> {
3998            let mut rem = self.remaining.lock().await;
3999            if *rem > 0 {
4000                *rem -= 1;
4001                Outcome::Fault("transient failure".to_string())
4002            } else {
4003                Outcome::Next(state + 1)
4004            }
4005        }
4006    }
4007
4008    /// A mock DLQ sink that records all dead letters.
4009    #[derive(Clone)]
4010    struct MockDlqSink {
4011        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
4012    }
4013
4014    #[async_trait]
4015    impl DlqSink for MockDlqSink {
4016        async fn store_dead_letter(
4017            &self,
4018            workflow_id: &str,
4019            _circuit_label: &str,
4020            node_id: &str,
4021            error_msg: &str,
4022            _payload: &[u8],
4023        ) -> Result<(), String> {
4024            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4025            self.letters.lock().await.push(entry);
4026            Ok(())
4027        }
4028    }
4029
4030    #[tokio::test]
4031    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4032        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
4033        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4034        let trans = FailNThenSucceed { remaining };
4035
4036        let dlq_sink = MockDlqSink {
4037            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4038        };
4039
4040        let mut bus = Bus::new();
4041        bus.insert(Timeline::new());
4042
4043        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4044            .then(trans)
4045            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4046                max_attempts: 5,
4047                backoff_ms: 1,
4048            })
4049            .with_dlq_sink(dlq_sink.clone());
4050        let outcome = axon.execute(10, &(), &mut bus).await;
4051
4052        // Should succeed (10 + 1 = 11)
4053        assert!(
4054            matches!(outcome, Outcome::Next(11)),
4055            "Expected Next(11), got {:?}",
4056            outcome
4057        );
4058
4059        // No dead letters since it recovered
4060        let letters = dlq_sink.letters.lock().await;
4061        assert!(
4062            letters.is_empty(),
4063            "Should have 0 dead letters, got {}",
4064            letters.len()
4065        );
4066
4067        // Timeline should contain NodeRetry events
4068        let timeline = bus.read::<Timeline>().unwrap();
4069        let retry_count = timeline
4070            .events
4071            .iter()
4072            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4073            .count();
4074        assert_eq!(retry_count, 2, "Should have 2 retry events");
4075    }
4076
4077    #[tokio::test]
4078    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4079        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
4080        let mut bus = Bus::new();
4081        bus.insert(Timeline::new());
4082
4083        let dlq_sink = MockDlqSink {
4084            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4085        };
4086
4087        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4088            .then(AlwaysFault)
4089            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4090                max_attempts: 3,
4091                backoff_ms: 1,
4092            })
4093            .with_dlq_sink(dlq_sink.clone());
4094        let outcome = axon.execute(42, &(), &mut bus).await;
4095
4096        assert!(
4097            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4098            "Expected Fault(boom), got {:?}",
4099            outcome
4100        );
4101
4102        // Should have exactly 1 dead letter
4103        let letters = dlq_sink.letters.lock().await;
4104        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4105
4106        // Timeline should have 2 retry events and 1 DlqExhausted event
4107        let timeline = bus.read::<Timeline>().unwrap();
4108        let retry_count = timeline
4109            .events
4110            .iter()
4111            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4112            .count();
4113        let dlq_count = timeline
4114            .events
4115            .iter()
4116            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4117            .count();
4118        assert_eq!(
4119            retry_count, 2,
4120            "Should have 2 retry events (attempts 2 and 3)"
4121        );
4122        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4123    }
4124
4125    #[tokio::test]
4126    async fn send_to_dlq_policy_sends_immediately_without_retry() {
4127        let mut bus = Bus::new();
4128        bus.insert(Timeline::new());
4129
4130        let dlq_sink = MockDlqSink {
4131            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4132        };
4133
4134        let axon = Axon::<i32, i32, String>::start("SendDlq")
4135            .then(AlwaysFault)
4136            .with_dlq_policy(DlqPolicy::SendToDlq)
4137            .with_dlq_sink(dlq_sink.clone());
4138        let outcome = axon.execute(1, &(), &mut bus).await;
4139
4140        assert!(matches!(outcome, Outcome::Fault(_)));
4141
4142        // Should have exactly 1 dead letter (immediate, no retries)
4143        let letters = dlq_sink.letters.lock().await;
4144        assert_eq!(letters.len(), 1);
4145
4146        // No retry or DlqExhausted events
4147        let timeline = bus.read::<Timeline>().unwrap();
4148        let retry_count = timeline
4149            .events
4150            .iter()
4151            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4152            .count();
4153        assert_eq!(retry_count, 0);
4154    }
4155
4156    #[tokio::test]
4157    async fn drop_policy_does_not_send_to_dlq() {
4158        let mut bus = Bus::new();
4159
4160        let dlq_sink = MockDlqSink {
4161            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4162        };
4163
4164        let axon = Axon::<i32, i32, String>::start("DropDlq")
4165            .then(AlwaysFault)
4166            .with_dlq_policy(DlqPolicy::Drop)
4167            .with_dlq_sink(dlq_sink.clone());
4168        let outcome = axon.execute(1, &(), &mut bus).await;
4169
4170        assert!(matches!(outcome, Outcome::Fault(_)));
4171
4172        // No dead letters
4173        let letters = dlq_sink.letters.lock().await;
4174        assert!(letters.is_empty());
4175    }
4176
4177    #[tokio::test]
4178    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4179        use ranvier_core::policy::DynamicPolicy;
4180
4181        // Start with Drop policy (no DLQ)
4182        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4183        let dlq_sink = MockDlqSink {
4184            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4185        };
4186
4187        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4188            .then(AlwaysFault)
4189            .with_dynamic_dlq_policy(dynamic)
4190            .with_dlq_sink(dlq_sink.clone());
4191
4192        // First execution: Drop policy → no dead letters
4193        let mut bus = Bus::new();
4194        let outcome = axon.execute(1, &(), &mut bus).await;
4195        assert!(matches!(outcome, Outcome::Fault(_)));
4196        assert!(
4197            dlq_sink.letters.lock().await.is_empty(),
4198            "Drop policy should produce no DLQ entries"
4199        );
4200
4201        // Hot-reload: switch to SendToDlq
4202        tx.send(DlqPolicy::SendToDlq).unwrap();
4203
4204        // Second execution: SendToDlq policy → dead letter captured
4205        let mut bus2 = Bus::new();
4206        let outcome2 = axon.execute(2, &(), &mut bus2).await;
4207        assert!(matches!(outcome2, Outcome::Fault(_)));
4208        assert_eq!(
4209            dlq_sink.letters.lock().await.len(),
4210            1,
4211            "SendToDlq policy should produce 1 DLQ entry"
4212        );
4213    }
4214
4215    #[tokio::test]
4216    async fn dynamic_saga_policy_hot_reload() {
4217        use ranvier_core::policy::DynamicPolicy;
4218        use ranvier_core::saga::SagaPolicy;
4219
4220        // Start with Disabled saga
4221        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4222
4223        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4224            .then(AddOne)
4225            .with_dynamic_saga_policy(dynamic);
4226
4227        // First execution: Disabled → no SagaStack in bus
4228        let mut bus = Bus::new();
4229        let _outcome = axon.execute(1, &(), &mut bus).await;
4230        assert!(
4231            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4232            "SagaStack should be absent or empty when disabled"
4233        );
4234
4235        // Hot-reload: enable saga
4236        tx.send(SagaPolicy::Enabled).unwrap();
4237
4238        // Second execution: Enabled → SagaStack populated
4239        let mut bus2 = Bus::new();
4240        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4241        assert!(
4242            bus2.read::<SagaStack>().is_some(),
4243            "SagaStack should exist when saga is enabled"
4244        );
4245    }
4246
4247    // ── IAM Boundary Tests ──────────────────────────────────────
4248
4249    mod iam_tests {
4250        use super::*;
4251        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4252
4253        /// Mock IamVerifier that returns a fixed identity.
4254        #[derive(Clone)]
4255        struct MockVerifier {
4256            identity: IamIdentity,
4257            should_fail: bool,
4258        }
4259
4260        #[async_trait]
4261        impl IamVerifier for MockVerifier {
4262            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4263                if self.should_fail {
4264                    Err(IamError::InvalidToken("mock verification failure".into()))
4265                } else {
4266                    Ok(self.identity.clone())
4267                }
4268            }
4269        }
4270
4271        #[tokio::test]
4272        async fn iam_require_identity_passes_with_valid_token() {
4273            let verifier = MockVerifier {
4274                identity: IamIdentity::new("alice").with_role("user"),
4275                should_fail: false,
4276            };
4277
4278            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4279                .with_iam(IamPolicy::RequireIdentity, verifier)
4280                .then(AddOne);
4281
4282            let mut bus = Bus::new();
4283            bus.insert(IamToken("valid-token".to_string()));
4284            let outcome = axon.execute(10, &(), &mut bus).await;
4285
4286            assert!(matches!(outcome, Outcome::Next(11)));
4287            // Verify IamIdentity was injected into Bus
4288            let identity = bus
4289                .read::<IamIdentity>()
4290                .expect("IamIdentity should be in Bus");
4291            assert_eq!(identity.subject, "alice");
4292        }
4293
4294        #[tokio::test]
4295        async fn iam_require_identity_rejects_missing_token() {
4296            let verifier = MockVerifier {
4297                identity: IamIdentity::new("ignored"),
4298                should_fail: false,
4299            };
4300
4301            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4302                .with_iam(IamPolicy::RequireIdentity, verifier)
4303                .then(AddOne);
4304
4305            let mut bus = Bus::new();
4306            // No IamToken inserted
4307            let outcome = axon.execute(10, &(), &mut bus).await;
4308
4309            // Should emit missing_token event
4310            match &outcome {
4311                Outcome::Emit(label, _) => {
4312                    assert_eq!(label, "iam.missing_token");
4313                }
4314                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4315            }
4316        }
4317
4318        #[tokio::test]
4319        async fn iam_rejects_failed_verification() {
4320            let verifier = MockVerifier {
4321                identity: IamIdentity::new("ignored"),
4322                should_fail: true,
4323            };
4324
4325            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4326                .with_iam(IamPolicy::RequireIdentity, verifier)
4327                .then(AddOne);
4328
4329            let mut bus = Bus::new();
4330            bus.insert(IamToken("bad-token".to_string()));
4331            let outcome = axon.execute(10, &(), &mut bus).await;
4332
4333            match &outcome {
4334                Outcome::Emit(label, _) => {
4335                    assert_eq!(label, "iam.verification_failed");
4336                }
4337                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4338            }
4339        }
4340
4341        #[tokio::test]
4342        async fn iam_require_role_passes_with_matching_role() {
4343            let verifier = MockVerifier {
4344                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4345                should_fail: false,
4346            };
4347
4348            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4349                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4350                .then(AddOne);
4351
4352            let mut bus = Bus::new();
4353            bus.insert(IamToken("token".to_string()));
4354            let outcome = axon.execute(5, &(), &mut bus).await;
4355
4356            assert!(matches!(outcome, Outcome::Next(6)));
4357        }
4358
4359        #[tokio::test]
4360        async fn iam_require_role_denies_without_role() {
4361            let verifier = MockVerifier {
4362                identity: IamIdentity::new("carol").with_role("user"),
4363                should_fail: false,
4364            };
4365
4366            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4367                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4368                .then(AddOne);
4369
4370            let mut bus = Bus::new();
4371            bus.insert(IamToken("token".to_string()));
4372            let outcome = axon.execute(5, &(), &mut bus).await;
4373
4374            match &outcome {
4375                Outcome::Emit(label, _) => {
4376                    assert_eq!(label, "iam.policy_denied");
4377                }
4378                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4379            }
4380        }
4381
4382        #[tokio::test]
4383        async fn iam_policy_none_skips_verification() {
4384            let verifier = MockVerifier {
4385                identity: IamIdentity::new("ignored"),
4386                should_fail: true, // would fail if actually called
4387            };
4388
4389            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4390                .with_iam(IamPolicy::None, verifier)
4391                .then(AddOne);
4392
4393            let mut bus = Bus::new();
4394            // No token needed when policy is None
4395            let outcome = axon.execute(10, &(), &mut bus).await;
4396
4397            assert!(matches!(outcome, Outcome::Next(11)));
4398        }
4399    }
4400
4401    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
4402
4403    #[derive(Clone)]
4404    struct SchemaTransition;
4405
4406    #[async_trait]
4407    impl Transition<String, String> for SchemaTransition {
4408        type Error = String;
4409        type Resources = ();
4410
4411        fn input_schema(&self) -> Option<serde_json::Value> {
4412            Some(serde_json::json!({
4413                "type": "object",
4414                "required": ["name"],
4415                "properties": {
4416                    "name": { "type": "string" }
4417                }
4418            }))
4419        }
4420
4421        async fn run(
4422            &self,
4423            state: String,
4424            _resources: &Self::Resources,
4425            _bus: &mut Bus,
4426        ) -> Outcome<String, Self::Error> {
4427            Outcome::Next(state)
4428        }
4429    }
4430
4431    #[test]
4432    fn then_auto_populates_input_schema_from_transition() {
4433        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4434
4435        // The last node (added by .then()) should have input_schema
4436        let last_node = axon.schematic.nodes.last().unwrap();
4437        assert!(last_node.input_schema.is_some());
4438        let schema = last_node.input_schema.as_ref().unwrap();
4439        assert_eq!(schema["type"], "object");
4440        assert_eq!(schema["required"][0], "name");
4441    }
4442
4443    #[test]
4444    fn then_leaves_input_schema_none_when_not_provided() {
4445        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4446
4447        let last_node = axon.schematic.nodes.last().unwrap();
4448        assert!(last_node.input_schema.is_none());
4449    }
4450
4451    #[test]
4452    fn with_input_schema_value_sets_on_last_node() {
4453        let schema = serde_json::json!({"type": "integer"});
4454        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4455            .then(AddOne)
4456            .with_input_schema_value(schema.clone());
4457
4458        let last_node = axon.schematic.nodes.last().unwrap();
4459        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4460    }
4461
4462    #[test]
4463    fn with_output_schema_value_sets_on_last_node() {
4464        let schema = serde_json::json!({"type": "integer"});
4465        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4466            .then(AddOne)
4467            .with_output_schema_value(schema.clone());
4468
4469        let last_node = axon.schematic.nodes.last().unwrap();
4470        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4471    }
4472
4473    #[test]
4474    fn schematic_export_includes_schema_fields() {
4475        let axon = Axon::<String, String, String>::new("ExportTest")
4476            .then(SchemaTransition)
4477            .with_output_schema_value(serde_json::json!({"type": "string"}));
4478
4479        let json = serde_json::to_value(&axon.schematic).unwrap();
4480        let nodes = json["nodes"].as_array().unwrap();
4481        // Find the SchemaTransition node (last one)
4482        let last = nodes.last().unwrap();
4483        assert!(last.get("input_schema").is_some());
4484        assert_eq!(last["input_schema"]["type"], "object");
4485        assert_eq!(last["output_schema"]["type"], "string");
4486    }
4487
4488    #[test]
4489    fn schematic_export_omits_schema_fields_when_none() {
4490        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4491
4492        let json = serde_json::to_value(&axon.schematic).unwrap();
4493        let nodes = json["nodes"].as_array().unwrap();
4494        let last = nodes.last().unwrap();
4495        let obj = last.as_object().unwrap();
4496        assert!(!obj.contains_key("input_schema"));
4497        assert!(!obj.contains_key("output_schema"));
4498    }
4499
4500    #[test]
4501    fn schematic_json_roundtrip_preserves_schemas() {
4502        let axon = Axon::<String, String, String>::new("Roundtrip")
4503            .then(SchemaTransition)
4504            .with_output_schema_value(serde_json::json!({"type": "string"}));
4505
4506        let json_str = serde_json::to_string(&axon.schematic).unwrap();
4507        let deserialized: ranvier_core::schematic::Schematic =
4508            serde_json::from_str(&json_str).unwrap();
4509
4510        let last = deserialized.nodes.last().unwrap();
4511        assert!(last.input_schema.is_some());
4512        assert!(last.output_schema.is_some());
4513        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4514        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4515    }
4516
4517    // Test transitions for new unit tests
4518    #[derive(Clone)]
4519    struct MultiplyByTwo;
4520
4521    #[async_trait]
4522    impl Transition<i32, i32> for MultiplyByTwo {
4523        type Error = TestInfallible;
4524        type Resources = ();
4525
4526        async fn run(
4527            &self,
4528            state: i32,
4529            _resources: &Self::Resources,
4530            _bus: &mut Bus,
4531        ) -> Outcome<i32, Self::Error> {
4532            Outcome::Next(state * 2)
4533        }
4534    }
4535
4536    #[derive(Clone)]
4537    struct AddTen;
4538
4539    #[async_trait]
4540    impl Transition<i32, i32> for AddTen {
4541        type Error = TestInfallible;
4542        type Resources = ();
4543
4544        async fn run(
4545            &self,
4546            state: i32,
4547            _resources: &Self::Resources,
4548            _bus: &mut Bus,
4549        ) -> Outcome<i32, Self::Error> {
4550            Outcome::Next(state + 10)
4551        }
4552    }
4553
4554    #[derive(Clone)]
4555    struct AddOneString;
4556
4557    #[async_trait]
4558    impl Transition<i32, i32> for AddOneString {
4559        type Error = String;
4560        type Resources = ();
4561
4562        async fn run(
4563            &self,
4564            state: i32,
4565            _resources: &Self::Resources,
4566            _bus: &mut Bus,
4567        ) -> Outcome<i32, Self::Error> {
4568            Outcome::Next(state + 1)
4569        }
4570    }
4571
4572    #[derive(Clone)]
4573    struct AddTenString;
4574
4575    #[async_trait]
4576    impl Transition<i32, i32> for AddTenString {
4577        type Error = String;
4578        type Resources = ();
4579
4580        async fn run(
4581            &self,
4582            state: i32,
4583            _resources: &Self::Resources,
4584            _bus: &mut Bus,
4585        ) -> Outcome<i32, Self::Error> {
4586            Outcome::Next(state + 10)
4587        }
4588    }
4589
4590    #[tokio::test]
4591    async fn axon_single_step_chain_executes_and_returns_next() {
4592        let mut bus = Bus::new();
4593        let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4594
4595        let outcome = axon.execute(5, &(), &mut bus).await;
4596        assert!(matches!(outcome, Outcome::Next(6)));
4597    }
4598
4599    #[tokio::test]
4600    async fn axon_three_step_chain_executes_in_order() {
4601        let mut bus = Bus::new();
4602        let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4603            .then(AddOne)
4604            .then(MultiplyByTwo)
4605            .then(AddTen);
4606
4607        // Starting with 5: AddOne -> 6, MultiplyByTwo -> 12, AddTen -> 22
4608        let outcome = axon.execute(5, &(), &mut bus).await;
4609        assert!(matches!(outcome, Outcome::Next(22)));
4610    }
4611
4612    #[tokio::test]
4613    async fn axon_with_fault_in_middle_step_propagates_error() {
4614        let mut bus = Bus::new();
4615
4616        // Create a 3-step chain where the middle step faults
4617        // Step 1: AddOneString (5 -> 6)
4618        // Step 2: AlwaysFault (should fault here)
4619        // Step 3: AddTenString (never reached)
4620        let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4621            .then(AddOneString)
4622            .then(AlwaysFault)
4623            .then(AddTenString);
4624
4625        let outcome = axon.execute(5, &(), &mut bus).await;
4626        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4627    }
4628
4629    #[test]
4630    fn axon_schematic_has_correct_node_count_after_chaining() {
4631        let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4632            .then(AddOne)
4633            .then(MultiplyByTwo)
4634            .then(AddTen);
4635
4636        // Should have 4 nodes: ingress + 3 transitions
4637        assert_eq!(axon.schematic.nodes.len(), 4);
4638        assert_eq!(axon.schematic.name, "NodeCount");
4639    }
4640
4641    #[tokio::test]
4642    async fn axon_execution_records_timeline_events() {
4643        let mut bus = Bus::new();
4644        bus.insert(Timeline::new());
4645
4646        let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4647            .then(AddOne)
4648            .then(MultiplyByTwo);
4649
4650        let outcome = axon.execute(3, &(), &mut bus).await;
4651        assert!(matches!(outcome, Outcome::Next(8))); // (3 + 1) * 2 = 8
4652
4653        let timeline = bus.read::<Timeline>().unwrap();
4654
4655        // Should have NodeEnter and NodeExit events
4656        let enter_count = timeline
4657            .events
4658            .iter()
4659            .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4660            .count();
4661        let exit_count = timeline
4662            .events
4663            .iter()
4664            .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4665            .count();
4666
4667        // Expect at least 1 enter and 1 exit (ingress node)
4668        assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4669        assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4670    }
4671
4672    // ── Parallel Step Tests (M231) ───────────────────────────────
4673
4674    #[tokio::test]
4675    async fn parallel_all_succeed_returns_first_next() {
4676        use super::ParallelStrategy;
4677
4678        let mut bus = Bus::new();
4679        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4680            .parallel(
4681                vec![
4682                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4683                    Arc::new(MultiplyByTwo),
4684                ],
4685                ParallelStrategy::AllMustSucceed,
4686            );
4687
4688        // Input 5: AddOne -> 6, MultiplyByTwo -> 10.
4689        // AllMustSucceed returns the first Next (AddOne = 6).
4690        let outcome = axon.execute(5, &(), &mut bus).await;
4691        assert!(matches!(outcome, Outcome::Next(6)));
4692    }
4693
4694    #[tokio::test]
4695    async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4696        use super::ParallelStrategy;
4697
4698        let mut bus = Bus::new();
4699        let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4700            .parallel(
4701                vec![
4702                    Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4703                    Arc::new(AlwaysFault),
4704                ],
4705                ParallelStrategy::AllMustSucceed,
4706            );
4707
4708        let outcome = axon.execute(5, &(), &mut bus).await;
4709        assert!(
4710            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4711            "Expected Fault(boom), got {:?}",
4712            outcome
4713        );
4714    }
4715
4716    #[tokio::test]
4717    async fn parallel_any_can_fail_returns_success_despite_fault() {
4718        use super::ParallelStrategy;
4719
4720        let mut bus = Bus::new();
4721        let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4722            .parallel(
4723                vec![
4724                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4725                    Arc::new(AddOneString),
4726                ],
4727                ParallelStrategy::AnyCanFail,
4728            );
4729
4730        // AlwaysFault faults, but AddOneString succeeds (5 + 1 = 6).
4731        let outcome = axon.execute(5, &(), &mut bus).await;
4732        assert!(
4733            matches!(outcome, Outcome::Next(6)),
4734            "Expected Next(6), got {:?}",
4735            outcome
4736        );
4737    }
4738
4739    #[tokio::test]
4740    async fn parallel_any_can_fail_all_fault_returns_first_fault() {
4741        use super::ParallelStrategy;
4742
4743        #[derive(Clone)]
4744        struct AlwaysFault2;
4745        #[async_trait]
4746        impl Transition<i32, i32> for AlwaysFault2 {
4747            type Error = String;
4748            type Resources = ();
4749            async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
4750                Outcome::Fault("boom2".to_string())
4751            }
4752        }
4753
4754        let mut bus = Bus::new();
4755        let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
4756            .parallel(
4757                vec![
4758                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4759                    Arc::new(AlwaysFault2),
4760                ],
4761                ParallelStrategy::AnyCanFail,
4762            );
4763
4764        let outcome = axon.execute(5, &(), &mut bus).await;
4765        // Should return the first fault
4766        assert!(
4767            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4768            "Expected Fault(boom), got {:?}",
4769            outcome
4770        );
4771    }
4772
4773    #[test]
4774    fn parallel_schematic_has_fanout_fanin_nodes() {
4775        use super::ParallelStrategy;
4776        use ranvier_core::schematic::{EdgeType, NodeKind};
4777
4778        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
4779            .parallel(
4780                vec![
4781                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4782                    Arc::new(MultiplyByTwo),
4783                    Arc::new(AddTen),
4784                ],
4785                ParallelStrategy::AllMustSucceed,
4786            );
4787
4788        // Should have: Ingress + FanOut + 3 branch Atoms + FanIn = 6 nodes
4789        assert_eq!(axon.schematic.nodes.len(), 6);
4790        assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
4791        assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
4792        assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
4793        assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
4794        assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
4795
4796        // Check FanOut description
4797        assert!(axon.schematic.nodes[1]
4798            .description
4799            .as_ref()
4800            .unwrap()
4801            .contains("3 branches"));
4802
4803        // Check parallel edges from FanOut to branches
4804        let parallel_edges: Vec<_> = axon
4805            .schematic
4806            .edges
4807            .iter()
4808            .filter(|e| matches!(e.kind, EdgeType::Parallel))
4809            .collect();
4810        // 3 from FanOut->branches + 3 from branches->FanIn = 6
4811        assert_eq!(parallel_edges.len(), 6);
4812    }
4813
4814    #[tokio::test]
4815    async fn parallel_then_chain_composes_correctly() {
4816        use super::ParallelStrategy;
4817
4818        let mut bus = Bus::new();
4819        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
4820            .then(AddOne)
4821            .parallel(
4822                vec![
4823                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4824                    Arc::new(MultiplyByTwo),
4825                ],
4826                ParallelStrategy::AllMustSucceed,
4827            )
4828            .then(AddTen);
4829
4830        // 5 -> AddOne -> 6 -> Parallel(AddOne=7, MultiplyByTwo=12) -> first=7 -> AddTen -> 17
4831        let outcome = axon.execute(5, &(), &mut bus).await;
4832        assert!(
4833            matches!(outcome, Outcome::Next(17)),
4834            "Expected Next(17), got {:?}",
4835            outcome
4836        );
4837    }
4838
4839    #[tokio::test]
4840    async fn parallel_records_timeline_events() {
4841        use super::ParallelStrategy;
4842        use ranvier_core::timeline::TimelineEvent;
4843
4844        let mut bus = Bus::new();
4845        bus.insert(Timeline::new());
4846
4847        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
4848            .parallel(
4849                vec![
4850                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4851                    Arc::new(MultiplyByTwo),
4852                ],
4853                ParallelStrategy::AllMustSucceed,
4854            );
4855
4856        let outcome = axon.execute(3, &(), &mut bus).await;
4857        assert!(matches!(outcome, Outcome::Next(4)));
4858
4859        let timeline = bus.read::<Timeline>().unwrap();
4860
4861        // Check for FanOut enter/exit and FanIn enter/exit
4862        let fanout_enters = timeline
4863            .events
4864            .iter()
4865            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
4866            .count();
4867        let fanin_enters = timeline
4868            .events
4869            .iter()
4870            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
4871            .count();
4872
4873        assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
4874        assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
4875    }
4876
4877    // ── Axon::simple() convenience constructor ───────────────────────────────
4878
4879    #[derive(Clone)]
4880    struct Greet;
4881
4882    #[async_trait]
4883    impl Transition<(), String> for Greet {
4884        type Error = String;
4885        type Resources = ();
4886
4887        async fn run(
4888            &self,
4889            _state: (),
4890            _resources: &Self::Resources,
4891            _bus: &mut Bus,
4892        ) -> Outcome<String, Self::Error> {
4893            Outcome::Next("Hello from simple!".to_string())
4894        }
4895    }
4896
4897    #[tokio::test]
4898    async fn axon_simple_creates_pipeline() {
4899        let axon = Axon::simple::<String>("SimpleTest").then(Greet);
4900
4901        let mut bus = Bus::new();
4902        let result = axon.execute((), &(), &mut bus).await;
4903
4904        match result {
4905            Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
4906            other => panic!("Expected Outcome::Next, got {:?}", other),
4907        }
4908    }
4909
4910    #[tokio::test]
4911    async fn axon_simple_equivalent_to_explicit() {
4912        // Axon::simple::<E>("label") should behave identically to Axon::<(), (), E>::new("label")
4913        let simple = Axon::simple::<String>("Equiv").then(Greet);
4914        let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
4915
4916        let mut bus1 = Bus::new();
4917        let mut bus2 = Bus::new();
4918
4919        let r1 = simple.execute((), &(), &mut bus1).await;
4920        let r2 = explicit.execute((), &(), &mut bus2).await;
4921
4922        match (r1, r2) {
4923            (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
4924            _ => panic!("Both should produce Outcome::Next"),
4925        }
4926    }
4927}