Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
4//! It functions as a reusable Typed Decision flow (Axon<In, Out, E>).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45/// Configuration for Execution Mode.
46#[derive(Clone)]
47pub enum ExecutionMode {
48    /// Normal, unpartitioned local execution.
49    Local,
50    /// Singleton execution, ensures only one instance runs across the entire cluster.
51    Singleton {
52        lock_key: String,
53        ttl_ms: u64,
54        lock_provider: Arc<dyn DistributedLock>,
55    },
56}
57
/// Fault-handling strategy used when an Axon runs branches in parallel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelStrategy {
    /// Every branch has to succeed; the first fault encountered is returned.
    AllMustSucceed,
    /// Individual branch failures are tolerated and the first successful result wins.
    /// When no branch succeeds, the first fault is returned.
    AnyCanFail,
}
69
/// Heap-allocated, pinned, `Send` future produced by Axon step executors.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
72
73/// Executor type for Axon steps.
74/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
75/// Must be Send + Sync to be reusable across threads and clones.
76pub type Executor<In, Out, E, Res> =
77    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
79/// Manual intervention jump command injected into the Bus.
80#[derive(Debug, Clone)]
81pub struct ManualJump {
82    pub target_node: String,
83    pub payload_override: Option<serde_json::Value>,
84}
85
/// Bus marker carrying the schematic node index at which a resumed run restarts.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
89
90/// Persisted state for resumption, injected into the Bus.
91#[derive(Debug, Clone)]
92struct ResumptionState {
93    payload: Option<serde_json::Value>,
94}
95
/// Helper to extract a readable type name from a type.
///
/// Strips the module path from every path segment of [`std::any::type_name`]'s
/// output while keeping generic arguments, tuples, arrays and references intact.
/// For example, `core::option::Option<alloc::string::String>` becomes
/// `Option<String>`. Keeping only the text after the final `::` would instead
/// yield the broken `String>`.
///
/// The exact output of `type_name` is not guaranteed by the standard library, so
/// the result is meant for display and diagnostics only.
fn type_name_of<T: ?Sized>() -> String {
    // Drops the module path from one path segment: `a::b::C` -> `C`.
    fn last_segment(path: &str) -> &str {
        path.rsplit("::").next().unwrap_or(path)
    }

    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    let mut segment_start = 0;
    for (idx, ch) in full.char_indices() {
        // Punctuation that separates path segments inside composite type names.
        if matches!(
            ch,
            '<' | '>' | ',' | ' ' | '(' | ')' | '[' | ']' | ';' | '&' | '*'
        ) {
            short.push_str(last_segment(&full[segment_start..idx]));
            short.push(ch);
            segment_start = idx + ch.len_utf8();
        }
    }
    short.push_str(last_segment(&full[segment_start..]));
    short
}
101
102/// The Axon Builder and Runtime.
103///
104/// `Axon` represents an executable decision tree.
105/// It is reusable and thread-safe.
106///
107/// ## Example
108///
109/// ```rust,ignore
110/// use ranvier_core::prelude::*;
111/// // ...
112/// // Start with an identity Axon (In -> In)
113/// let axon = Axon::<(), (), _>::new("My Axon")
114///     .then(StepA)
115///     .then(StepB);
116///
117/// // Execute multiple times
118/// let res1 = axon.execute((), &mut bus1).await;
119/// let res2 = axon.execute((), &mut bus2).await;
120/// ```
121pub struct Axon<In, Out, E, Res = ()> {
122    /// The static structure (for visualization/analysis)
123    pub schematic: Schematic,
124    /// The runtime executor
125    executor: Executor<In, Out, E, Res>,
126    /// How this Axon is executed across the cluster
127    pub execution_mode: ExecutionMode,
128    /// Optional persistence store for state inspection
129    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
130    /// Optional audit sink for tamper-evident logging of interventions
131    pub audit_sink: Option<Arc<dyn AuditSink>>,
132    /// Optional dead-letter queue sink for storing failed events
133    pub dlq_sink: Option<Arc<dyn DlqSink>>,
134    /// Policy for handling event failures
135    pub dlq_policy: DlqPolicy,
136    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
137    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
138    /// Policy for automated saga compensation
139    pub saga_policy: SagaPolicy,
140    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
141    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
142    /// Registry for Saga compensation handlers
143    pub saga_compensation_registry:
144        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
145    /// Optional IAM handle for identity verification at the Schematic boundary
146    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
147}
148
/// Request to export a schematic, built from command-line arguments or environment.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Destination file for the schematic; `None` means write it to stdout.
    pub output: Option<PathBuf>,
}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157    fn clone(&self) -> Self {
158        Self {
159            schematic: self.schematic.clone(),
160            executor: self.executor.clone(),
161            execution_mode: self.execution_mode.clone(),
162            persistence_store: self.persistence_store.clone(),
163            audit_sink: self.audit_sink.clone(),
164            dlq_sink: self.dlq_sink.clone(),
165            dlq_policy: self.dlq_policy.clone(),
166            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167            saga_policy: self.saga_policy.clone(),
168            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169            saga_compensation_registry: self.saga_compensation_registry.clone(),
170            iam_handle: self.iam_handle.clone(),
171        }
172    }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177    In: Send + Sync + Serialize + DeserializeOwned + 'static,
178    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179    Res: ranvier_core::transition::ResourceRequirement,
180{
181    /// Create a new Axon flow with the given label.
182    /// This is the preferred entry point per Flat API guidelines.
183    #[track_caller]
184    pub fn new(label: &str) -> Self {
185        let caller = Location::caller();
186        Self::start_with_source(label, caller)
187    }
188
189    /// Start defining a new Axon flow.
190    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
191    #[track_caller]
192    pub fn start(label: &str) -> Self {
193        let caller = Location::caller();
194        Self::start_with_source(label, caller)
195    }
196
197    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198        let node_id = uuid::Uuid::new_v4().to_string();
199        let node = Node {
200            id: node_id,
201            kind: NodeKind::Ingress,
202            label: label.to_string(),
203            description: None,
204            input_type: "void".to_string(),
205            output_type: type_name_of::<In>(),
206            resource_type: type_name_of::<Res>(),
207            metadata: Default::default(),
208            bus_capability: None,
209            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210            position: None,
211            compensation_node_id: None,
212            input_schema: None,
213            output_schema: None,
214        };
215
216        let mut schematic = Schematic::new(label);
217        schematic.nodes.push(node);
218
219        let executor: Executor<In, In, E, Res> =
220            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222        Self {
223            schematic,
224            executor,
225            execution_mode: ExecutionMode::Local,
226            persistence_store: None,
227            audit_sink: None,
228            dlq_sink: None,
229            dlq_policy: DlqPolicy::default(),
230            dynamic_dlq_policy: None,
231            saga_policy: SagaPolicy::default(),
232            dynamic_saga_policy: None,
233            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234                ranvier_core::saga::SagaCompensationRegistry::new(),
235            )),
236            iam_handle: None,
237        }
238    }
239}
240
241impl Axon<(), (), (), ()> {
242    /// Convenience constructor for simple pipelines with no input state or resources.
243    ///
244    /// Reduces the common `Axon::<(), (), E>::new("label")` turbofish to
245    /// `Axon::simple::<E>("label")`, requiring only the error type parameter.
246    ///
247    /// # Example
248    ///
249    /// ```rust,ignore
250    /// // Before: 3 type parameters, 2 of which are always ()
251    /// let axon = Axon::<(), (), String>::new("pipeline");
252    ///
253    /// // After: only the error type
254    /// let axon = Axon::simple::<String>("pipeline");
255    /// ```
256    #[track_caller]
257    pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
258    where
259        E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
260    {
261        let caller = Location::caller();
262        <Axon<(), (), E, ()>>::start_with_source(label, caller)
263    }
264}
265
266impl<In, Out, E, Res> Axon<In, Out, E, Res>
267where
268    In: Send + Sync + Serialize + DeserializeOwned + 'static,
269    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
270    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
271    Res: ranvier_core::transition::ResourceRequirement,
272{
273    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
274    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
275        self.execution_mode = mode;
276        self
277    }
278
279    /// Set the schematic version for this Axon.
280    pub fn with_version(mut self, version: impl Into<String>) -> Self {
281        self.schematic.schema_version = version.into();
282        self
283    }
284
285    /// Attach a persistence store to enable state inspection via the Inspector.
286    pub fn with_persistence_store<S>(mut self, store: S) -> Self
287    where
288        S: crate::persistence::PersistenceStore + 'static,
289    {
290        self.persistence_store = Some(Arc::new(store));
291        self
292    }
293
294    /// Attach an audit sink for tamper-evident logging.
295    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
296    where
297        S: AuditSink + 'static,
298    {
299        self.audit_sink = Some(Arc::new(sink));
300        self
301    }
302
303    /// Set the Dead Letter Queue sink for this Axon.
304    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
305    where
306        S: DlqSink + 'static,
307    {
308        self.dlq_sink = Some(Arc::new(sink));
309        self
310    }
311
312    /// Set the Dead Letter Queue policy for this Axon.
313    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
314        self.dlq_policy = policy;
315        self
316    }
317
318    /// Set the Saga compensation policy for this Axon.
319    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
320        self.saga_policy = policy;
321        self
322    }
323
324    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
325    /// current value is read at each execution, overriding the static `dlq_policy`.
326    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
327        self.dynamic_dlq_policy = Some(dynamic);
328        self
329    }
330
331    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
332    /// current value is read at each execution, overriding the static `saga_policy`.
333    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
334        self.dynamic_saga_policy = Some(dynamic);
335        self
336    }
337
338    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
339    ///
340    /// When set, `execute()` will:
341    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
342    /// 2. Verify the token using the provided verifier
343    /// 3. Enforce the policy against the verified identity
344    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
345    pub fn with_iam(
346        mut self,
347        policy: ranvier_core::iam::IamPolicy,
348        verifier: impl ranvier_core::iam::IamVerifier + 'static,
349    ) -> Self {
350        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
351            policy,
352            Arc::new(verifier),
353        ));
354        self
355    }
356
357    /// Attach a JSON Schema for the **last node's input type** in the schematic.
358    ///
359    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
360    ///
361    /// ```rust,ignore
362    /// let axon = Axon::new("My Circuit")
363    ///     .then(ProcessStep)
364    ///     .with_input_schema::<CreateUserRequest>()
365    ///     .with_output_schema::<UserResponse>();
366    /// ```
367    #[cfg(feature = "schema")]
368    pub fn with_input_schema<T>(mut self) -> Self
369    where
370        T: schemars::JsonSchema,
371    {
372        if let Some(last_node) = self.schematic.nodes.last_mut() {
373            let schema = schemars::schema_for!(T);
374            last_node.input_schema =
375                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
376        }
377        self
378    }
379
380    /// Attach a JSON Schema for the **last node's output type** in the schematic.
381    ///
382    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
383    #[cfg(feature = "schema")]
384    pub fn with_output_schema<T>(mut self) -> Self
385    where
386        T: schemars::JsonSchema,
387    {
388        if let Some(last_node) = self.schematic.nodes.last_mut() {
389            let schema = schemars::schema_for!(T);
390            last_node.output_schema =
391                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
392        }
393        self
394    }
395
396    /// Attach a raw JSON Schema value for the **last node's input type** in the schematic.
397    ///
398    /// Use this for pre-built schemas without requiring the `schema` feature.
399    pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
400        if let Some(last_node) = self.schematic.nodes.last_mut() {
401            last_node.input_schema = Some(schema);
402        }
403        self
404    }
405
406    /// Attach a raw JSON Schema value for the **last node's output type** in the schematic.
407    pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
408        if let Some(last_node) = self.schematic.nodes.last_mut() {
409            last_node.output_schema = Some(schema);
410        }
411        self
412    }
413}
414
415#[async_trait]
416impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
417where
418    In: Send + Sync + Serialize + DeserializeOwned + 'static,
419    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
420    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
421    Res: ranvier_core::transition::ResourceRequirement,
422{
423    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
424        let store = self.persistence_store.as_ref()?;
425        let trace = store.load(trace_id).await.ok().flatten()?;
426        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
427    }
428
429    async fn force_resume(
430        &self,
431        trace_id: &str,
432        target_node: &str,
433        payload_override: Option<Value>,
434    ) -> Result<(), String> {
435        let store = self
436            .persistence_store
437            .as_ref()
438            .ok_or("No persistence store attached to Axon")?;
439
440        let intervention = crate::persistence::Intervention {
441            target_node: target_node.to_string(),
442            payload_override,
443            timestamp_ms: now_ms(),
444        };
445
446        store
447            .save_intervention(trace_id, intervention)
448            .await
449            .map_err(|e| format!("Failed to save intervention: {}", e))?;
450
451        if let Some(sink) = self.audit_sink.as_ref() {
452            let event = AuditEvent::new(
453                uuid::Uuid::new_v4().to_string(),
454                "Inspector".to_string(),
455                "ForceResume".to_string(),
456                trace_id.to_string(),
457            )
458            .with_metadata("target_node", target_node);
459
460            let _ = sink.append(&event).await;
461        }
462
463        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
464        Ok(())
465    }
466}
467
468impl<In, Out, E, Res> Axon<In, Out, E, Res>
469where
470    In: Send + Sync + Serialize + DeserializeOwned + 'static,
471    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
472    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
473    Res: ranvier_core::transition::ResourceRequirement,
474{
    /// Chain a transition to this Axon.
    ///
    /// Appends `transition` to the schematic as a new [`NodeKind::Atom`] node. The new
    /// node is linked from the previously last node by a `Linear` edge labelled `"Next"`.
    /// The returned Axon's executor wraps the previous executor.
    ///
    /// Requires the transition to use the SAME resource bundle as the previous steps.
    ///
    /// # Runtime behavior of the composed step
    ///
    /// On each execution the new step checks, in order:
    /// 1. **Manual jump**: if the Bus holds a [`ManualJump`] whose `target_node` equals
    ///    this node's id or label, the previous steps are skipped. This step then runs on
    ///    `payload_override` deserialized as `Out`. A missing payload emits
    ///    `execution.jump.missing_payload`; an undeserializable one emits
    ///    `execution.jump.payload_error`.
    /// 2. **Resumption**: if the Bus holds a `StartStep` equal to this node's index and
    ///    a `ResumptionState`, the previous steps are skipped. This step runs on the
    ///    current input re-encoded as `Out`, falling back to the persisted payload. If
    ///    neither fits, `execution.resumption.payload_error` is emitted.
    /// 3. **Normal path**: the previous executor runs first. Any outcome other than
    ///    `Outcome::Next` is propagated (re-typed via `map`); otherwise this step runs
    ///    on the `Next` value.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        // Decompose self to avoid partial move issues
        let Axon {
            mut schematic,
            executor: prev_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        } = self;

        // Update Schematic
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
            position: transition
                .position()
                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
            compensation_node_id: None,
            input_schema: transition.input_schema(),
            output_schema: None,
        };

        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Compose Executor
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let bus_policy_clone = bus_policy_for_exec.clone();
        // Index of the node pushed above; compared against `StartStep` on resumption.
        let current_step_idx = schematic.nodes.len() as u64 - 1;
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                // Clone the captured state on every call so the returned future owns its
                // copy while this closure remains a reusable `Fn`.
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_clone.clone();
                let step_idx = current_step_idx;

                Box::pin(async move {
                    // Check for manual intervention jump
                    if let Some(jump) = bus.read::<ManualJump>()
                        && (jump.target_node == timeline_node_id
                            || jump.target_node == timeline_node_label)
                    {
                        tracing::info!(
                            node_id = %timeline_node_id,
                            node_label = %timeline_node_label,
                            "Manual jump target reached; skipping previous steps"
                        );

                        let state = if let Some(ow) = jump.payload_override.clone() {
                            match serde_json::from_value::<Out>(ow) {
                                Ok(s) => s,
                                Err(e) => {
                                    tracing::error!(
                                        "Payload override deserialization failed: {}",
                                        e
                                    );
                                    // `emit` is generic over the success type. Pin it to
                                    // `()` so `map` can re-type it as `Next`; the closure
                                    // is presumably never invoked for an emitted outcome.
                                    return Outcome::emit(
                                        "execution.jump.payload_error",
                                        Some(serde_json::json!({"error": e.to_string()})),
                                    )
                                    .map(|_: ()| unreachable!());
                                }
                            }
                        } else {
                            // No payload override was supplied. `Out` cannot be derived
                            // from `input` in general, since `In` and `Out` may differ,
                            // so the jump is reported as an error event instead.
                            // A better implementation could load the last persisted
                            // `Out` of the previous step.
                            return Outcome::emit(
                                "execution.jump.missing_payload",
                                Some(serde_json::json!({"node_id": timeline_node_id})),
                            );
                        };

                        // Skip prev() and continue with trans.run(state, ...)
                        return run_this_step::<Out, Next, E, Res>(
                            &trans,
                            state,
                            res,
                            bus,
                            &timeline_node_id,
                            &timeline_node_label,
                            &transition_bus_policy,
                            step_idx,
                        )
                        .await;
                    }

                    // Handle resumption skip
                    if let Some(start) = bus.read::<StartStep>()
                        && step_idx == start.0
                        && bus.read::<ResumptionState>().is_some()
                    {
                        // Prefer fresh input (In → Out via JSON round-trip).
                        // The caller provides updated state (e.g., corrected data after a fault).
                        // Falls back to persisted checkpoint state when types are incompatible.
                        // NOTE(review): any `In` whose JSON also parses as `Out` wins over
                        // the checkpoint (e.g. `()` serializes to `null`, which parses
                        // into an `Option<_>` `Out` as `None`). Confirm this is intended.
                        let fresh_state = serde_json::to_value(&input)
                            .ok()
                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
                        let persisted_state = bus
                            .read::<ResumptionState>()
                            .and_then(|r| r.payload.clone())
                            .and_then(|p| serde_json::from_value::<Out>(p).ok());

                        if let Some(s) = fresh_state.or(persisted_state) {
                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
                            return run_this_step::<Out, Next, E, Res>(
                                &trans,
                                s,
                                res,
                                bus,
                                &timeline_node_id,
                                &timeline_node_label,
                                &transition_bus_policy,
                                step_idx,
                            )
                            .await;
                        }

                        return Outcome::emit(
                            "execution.resumption.payload_error",
                            Some(serde_json::json!({"error": "no compatible resumption state"})),
                        )
                        .map(|_: ()| unreachable!());
                    }

                    // Run previous step
                    let prev_result = prev(input, res, bus).await;

                    // Unpack: only `Next` continues. Any other outcome is re-typed to
                    // `Next` via `map` (whose closure `Next` alone would trigger) and
                    // returned unchanged.
                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        other => return other.map(|_| unreachable!()),
                    };

                    run_this_step::<Out, Next, E, Res>(
                        &trans,
                        state,
                        res,
                        bus,
                        &timeline_node_id,
                        &timeline_node_label,
                        &transition_bus_policy,
                        step_idx,
                    )
                    .await
                })
            },
        );
        Axon {
            schematic,
            executor: next_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        }
    }
678
679    /// Chain a transition with a retry policy.
680    ///
681    /// If the transition returns `Outcome::Fault`, it will be retried up to
682    /// `policy.max_retries` times with the configured backoff strategy.
683    /// Timeline events are recorded for each retry attempt.
684    ///
685    /// # Example
686    ///
687    /// ```rust,ignore
688    /// use ranvier_runtime::{Axon, RetryPolicy};
689    /// use std::time::Duration;
690    ///
691    /// let axon = Axon::new("pipeline")
692    ///     .then_with_retry(my_transition, RetryPolicy::fixed(3, Duration::from_millis(100)));
693    /// ```
694    #[track_caller]
695    pub fn then_with_retry<Next, Trans>(
696        self,
697        transition: Trans,
698        policy: crate::retry::RetryPolicy,
699    ) -> Axon<In, Next, E, Res>
700    where
701        Out: Clone,
702        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
703        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
704    {
705        let caller = Location::caller();
706        let Axon {
707            mut schematic,
708            executor: prev_executor,
709            execution_mode,
710            persistence_store,
711            audit_sink,
712            dlq_sink,
713            dlq_policy,
714            dynamic_dlq_policy,
715            saga_policy,
716            dynamic_saga_policy,
717            saga_compensation_registry,
718            iam_handle,
719        } = self;
720
721        let next_node_id = uuid::Uuid::new_v4().to_string();
722        let next_node = Node {
723            id: next_node_id.clone(),
724            kind: NodeKind::Atom,
725            label: transition.label(),
726            description: transition.description(),
727            input_type: type_name_of::<Out>(),
728            output_type: type_name_of::<Next>(),
729            resource_type: type_name_of::<Res>(),
730            metadata: Default::default(),
731            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
732            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
733            position: transition
734                .position()
735                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
736            compensation_node_id: None,
737            input_schema: transition.input_schema(),
738            output_schema: None,
739        };
740
741        let last_node_id = schematic
742            .nodes
743            .last()
744            .map(|n| n.id.clone())
745            .unwrap_or_default();
746
747        schematic.nodes.push(next_node);
748        schematic.edges.push(Edge {
749            from: last_node_id,
750            to: next_node_id.clone(),
751            kind: EdgeType::Linear,
752            label: Some("Next (retryable)".to_string()),
753        });
754
755        let node_id_for_exec = next_node_id.clone();
756        let node_label_for_exec = transition.label();
757        let bus_policy_for_exec = transition.bus_access_policy();
758        let bus_policy_clone = bus_policy_for_exec.clone();
759        let current_step_idx = schematic.nodes.len() as u64 - 1;
760        let next_executor: Executor<In, Next, E, Res> = Arc::new(
761            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
762                let prev = prev_executor.clone();
763                let trans = transition.clone();
764                let timeline_node_id = node_id_for_exec.clone();
765                let timeline_node_label = node_label_for_exec.clone();
766                let transition_bus_policy = bus_policy_clone.clone();
767                let step_idx = current_step_idx;
768                let retry_policy = policy.clone();
769
770                Box::pin(async move {
771                    // Run previous step
772                    let prev_result = prev(input, res, bus).await;
773                    let state = match prev_result {
774                        Outcome::Next(t) => t,
775                        other => return other.map(|_| unreachable!()),
776                    };
777
778                    // Attempt with retries
779                    let mut last_result = None;
780                    for attempt in 0..=retry_policy.max_retries {
781                        let attempt_state = state.clone();
782
783                        let result = run_this_step::<Out, Next, E, Res>(
784                            &trans,
785                            attempt_state,
786                            res,
787                            bus,
788                            &timeline_node_id,
789                            &timeline_node_label,
790                            &transition_bus_policy,
791                            step_idx,
792                        )
793                        .await;
794
795                        match &result {
796                            Outcome::Next(_) => return result,
797                            Outcome::Fault(_) if attempt < retry_policy.max_retries => {
798                                let delay = retry_policy.delay_for_attempt(attempt);
799                                tracing::warn!(
800                                    node_id = %timeline_node_id,
801                                    attempt = attempt + 1,
802                                    max = retry_policy.max_retries,
803                                    delay_ms = delay.as_millis() as u64,
804                                    "Transition failed, retrying"
805                                );
806                                if let Some(timeline) = bus.read_mut::<Timeline>() {
807                                    timeline.push(TimelineEvent::NodeRetry {
808                                        node_id: timeline_node_id.clone(),
809                                        attempt: attempt + 1,
810                                        max_attempts: retry_policy.max_retries,
811                                        backoff_ms: delay.as_millis() as u64,
812                                        timestamp: now_ms(),
813                                    });
814                                }
815                                tokio::time::sleep(delay).await;
816                            }
817                            _ => {
818                                last_result = Some(result);
819                                break;
820                            }
821                        }
822                    }
823
824                    last_result.unwrap_or_else(|| {
825                        Outcome::emit("execution.retry.exhausted", None)
826                    })
827                })
828            },
829        );
830        Axon {
831            schematic,
832            executor: next_executor,
833            execution_mode,
834            persistence_store,
835            audit_sink,
836            dlq_sink,
837            dlq_policy,
838            dynamic_dlq_policy,
839            saga_policy,
840            dynamic_saga_policy,
841            saga_compensation_registry,
842            iam_handle,
843        }
844    }
845
846    /// Chain a transition to this Axon with a timeout guard.
847    ///
848    /// If the transition does not complete within the specified duration,
849    /// the execution is cancelled and a `Fault` is returned using the
850    /// provided error factory.
851    ///
852    /// # Example
853    ///
854    /// ```rust,ignore
855    /// use std::time::Duration;
856    ///
857    /// let pipeline = Axon::simple::<String>("API")
858    ///     .then_with_timeout(
859    ///         slow_handler,
860    ///         Duration::from_secs(5),
861    ///         || "Request timed out after 5 seconds".to_string(),
862    ///     );
863    /// ```
864    #[track_caller]
865    pub fn then_with_timeout<Next, Trans, F>(
866        self,
867        transition: Trans,
868        duration: std::time::Duration,
869        make_timeout_error: F,
870    ) -> Axon<In, Next, E, Res>
871    where
872        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
873        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
874        F: Fn() -> E + Clone + Send + Sync + 'static,
875    {
876        let caller = Location::caller();
877        let Axon {
878            mut schematic,
879            executor: prev_executor,
880            execution_mode,
881            persistence_store,
882            audit_sink,
883            dlq_sink,
884            dlq_policy,
885            dynamic_dlq_policy,
886            saga_policy,
887            dynamic_saga_policy,
888            saga_compensation_registry,
889            iam_handle,
890        } = self;
891
892        let next_node_id = uuid::Uuid::new_v4().to_string();
893        let next_node = Node {
894            id: next_node_id.clone(),
895            kind: NodeKind::Atom,
896            label: transition.label(),
897            description: transition.description(),
898            input_type: type_name_of::<Out>(),
899            output_type: type_name_of::<Next>(),
900            resource_type: type_name_of::<Res>(),
901            metadata: Default::default(),
902            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
903            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
904            position: transition
905                .position()
906                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
907            compensation_node_id: None,
908            input_schema: transition.input_schema(),
909            output_schema: None,
910        };
911
912        let last_node_id = schematic
913            .nodes
914            .last()
915            .map(|n| n.id.clone())
916            .unwrap_or_default();
917
918        schematic.nodes.push(next_node);
919        schematic.edges.push(Edge {
920            from: last_node_id,
921            to: next_node_id.clone(),
922            kind: EdgeType::Linear,
923            label: Some("Next (timeout-guarded)".to_string()),
924        });
925
926        let node_id_for_exec = next_node_id.clone();
927        let node_label_for_exec = transition.label();
928        let bus_policy_for_exec = transition.bus_access_policy();
929        let bus_policy_clone = bus_policy_for_exec.clone();
930        let current_step_idx = schematic.nodes.len() as u64 - 1;
931        let next_executor: Executor<In, Next, E, Res> = Arc::new(
932            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
933                let prev = prev_executor.clone();
934                let trans = transition.clone();
935                let timeline_node_id = node_id_for_exec.clone();
936                let timeline_node_label = node_label_for_exec.clone();
937                let transition_bus_policy = bus_policy_clone.clone();
938                let step_idx = current_step_idx;
939                let timeout_duration = duration;
940                let error_factory = make_timeout_error.clone();
941
942                Box::pin(async move {
943                    // Run previous step
944                    let prev_result = prev(input, res, bus).await;
945                    let state = match prev_result {
946                        Outcome::Next(t) => t,
947                        other => return other.map(|_| unreachable!()),
948                    };
949
950                    // Execute with timeout
951                    match tokio::time::timeout(
952                        timeout_duration,
953                        run_this_step::<Out, Next, E, Res>(
954                            &trans,
955                            state,
956                            res,
957                            bus,
958                            &timeline_node_id,
959                            &timeline_node_label,
960                            &transition_bus_policy,
961                            step_idx,
962                        ),
963                    )
964                    .await
965                    {
966                        Ok(result) => result,
967                        Err(_elapsed) => {
968                            tracing::warn!(
969                                node_id = %timeline_node_id,
970                                timeout_ms = timeout_duration.as_millis() as u64,
971                                "Transition timed out"
972                            );
973                            if let Some(timeline) = bus.read_mut::<Timeline>() {
974                                timeline.push(TimelineEvent::NodeTimeout {
975                                    node_id: timeline_node_id.clone(),
976                                    timeout_ms: timeout_duration.as_millis() as u64,
977                                    timestamp: now_ms(),
978                                });
979                            }
980                            Outcome::Fault(error_factory())
981                        }
982                    }
983                })
984            },
985        );
986        Axon {
987            schematic,
988            executor: next_executor,
989            execution_mode,
990            persistence_store,
991            audit_sink,
992            dlq_sink,
993            dlq_policy,
994            dynamic_dlq_policy,
995            saga_policy,
996            dynamic_saga_policy,
997            saga_compensation_registry,
998            iam_handle,
999        }
1000    }
1001
1002    /// Chain a transition to this Axon with a Saga compensation step.
1003    ///
1004    /// If the transition fails, the compensation transition will be executed
1005    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
1006    #[track_caller]
1007    pub fn then_compensated<Next, Trans, Comp>(
1008        self,
1009        transition: Trans,
1010        compensation: Comp,
1011    ) -> Axon<In, Next, E, Res>
1012    where
1013        Out: Clone,
1014        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
1015        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1016        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1017    {
1018        let caller = Location::caller();
1019        let Axon {
1020            mut schematic,
1021            executor: prev_executor,
1022            execution_mode,
1023            persistence_store,
1024            audit_sink,
1025            dlq_sink,
1026            dlq_policy,
1027            dynamic_dlq_policy,
1028            saga_policy,
1029            dynamic_saga_policy,
1030            saga_compensation_registry,
1031            iam_handle,
1032        } = self;
1033
1034        // 1. Add Primary Node
1035        let next_node_id = uuid::Uuid::new_v4().to_string();
1036        let comp_node_id = uuid::Uuid::new_v4().to_string();
1037
1038        let next_node = Node {
1039            id: next_node_id.clone(),
1040            kind: NodeKind::Atom,
1041            label: transition.label(),
1042            description: transition.description(),
1043            input_type: type_name_of::<Out>(),
1044            output_type: type_name_of::<Next>(),
1045            resource_type: type_name_of::<Res>(),
1046            metadata: Default::default(),
1047            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
1048            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1049            position: transition
1050                .position()
1051                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1052            compensation_node_id: Some(comp_node_id.clone()),
1053            input_schema: None,
1054            output_schema: None,
1055        };
1056
1057        // 2. Add Compensation Node
1058        let comp_node = Node {
1059            id: comp_node_id.clone(),
1060            kind: NodeKind::Atom,
1061            label: format!("Compensate: {}", compensation.label()),
1062            description: compensation.description(),
1063            input_type: type_name_of::<Out>(),
1064            output_type: "void".to_string(),
1065            resource_type: type_name_of::<Res>(),
1066            metadata: Default::default(),
1067            bus_capability: None,
1068            source_location: None,
1069            position: compensation
1070                .position()
1071                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1072            compensation_node_id: None,
1073            input_schema: None,
1074            output_schema: None,
1075        };
1076
1077        let last_node_id = schematic
1078            .nodes
1079            .last()
1080            .map(|n| n.id.clone())
1081            .unwrap_or_default();
1082
1083        schematic.nodes.push(next_node);
1084        schematic.nodes.push(comp_node);
1085        schematic.edges.push(Edge {
1086            from: last_node_id,
1087            to: next_node_id.clone(),
1088            kind: EdgeType::Linear,
1089            label: Some("Next".to_string()),
1090        });
1091
1092        // 3. Compose Executor with Compensation Logic
1093        let node_id_for_exec = next_node_id.clone();
1094        let comp_id_for_exec = comp_node_id.clone();
1095        let node_label_for_exec = transition.label();
1096        let bus_policy_for_exec = transition.bus_access_policy();
1097        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
1098        let comp_for_exec = compensation.clone();
1099        let bus_policy_for_executor = bus_policy_for_exec.clone();
1100        let bus_policy_for_registry = bus_policy_for_exec.clone();
1101        let next_executor: Executor<In, Next, E, Res> = Arc::new(
1102            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
1103                let prev = prev_executor.clone();
1104                let trans = transition.clone();
1105                let comp = comp_for_exec.clone();
1106                let timeline_node_id = node_id_for_exec.clone();
1107                let timeline_comp_id = comp_id_for_exec.clone();
1108                let timeline_node_label = node_label_for_exec.clone();
1109                let transition_bus_policy = bus_policy_for_executor.clone();
1110                let step_idx = step_idx_for_exec;
1111
1112                Box::pin(async move {
1113                    // Check for manual intervention jump
1114                    if let Some(jump) = bus.read::<ManualJump>()
1115                        && (jump.target_node == timeline_node_id
1116                            || jump.target_node == timeline_node_label)
1117                    {
1118                        tracing::info!(
1119                            node_id = %timeline_node_id,
1120                            node_label = %timeline_node_label,
1121                            "Manual jump target reached (compensated); skipping previous steps"
1122                        );
1123
1124                        let state = if let Some(ow) = jump.payload_override.clone() {
1125                            match serde_json::from_value::<Out>(ow) {
1126                                Ok(s) => s,
1127                                Err(e) => {
1128                                    tracing::error!(
1129                                        "Payload override deserialization failed: {}",
1130                                        e
1131                                    );
1132                                    return Outcome::emit(
1133                                        "execution.jump.payload_error",
1134                                        Some(serde_json::json!({"error": e.to_string()})),
1135                                    )
1136                                    .map(|_: ()| unreachable!());
1137                                }
1138                            }
1139                        } else {
1140                            return Outcome::emit(
1141                                "execution.jump.missing_payload",
1142                                Some(serde_json::json!({"node_id": timeline_node_id})),
1143                            )
1144                            .map(|_: ()| unreachable!());
1145                        };
1146
1147                        // Skip prev() and continue with trans.run(state, ...)
1148                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1149                            &trans,
1150                            &comp,
1151                            state,
1152                            res,
1153                            bus,
1154                            &timeline_node_id,
1155                            &timeline_comp_id,
1156                            &timeline_node_label,
1157                            &transition_bus_policy,
1158                            step_idx,
1159                        )
1160                        .await;
1161                    }
1162
1163                    // Handle resumption skip
1164                    if let Some(start) = bus.read::<StartStep>()
1165                        && step_idx == start.0
1166                        && bus.read::<ResumptionState>().is_some()
1167                    {
1168                        let fresh_state = serde_json::to_value(&input)
1169                            .ok()
1170                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
1171                        let persisted_state = bus
1172                            .read::<ResumptionState>()
1173                            .and_then(|r| r.payload.clone())
1174                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
1175
1176                        if let Some(s) = fresh_state.or(persisted_state) {
1177                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
1178                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1179                                &trans,
1180                                &comp,
1181                                s,
1182                                res,
1183                                bus,
1184                                &timeline_node_id,
1185                                &timeline_comp_id,
1186                                &timeline_node_label,
1187                                &transition_bus_policy,
1188                                step_idx,
1189                            )
1190                            .await;
1191                        }
1192
1193                        return Outcome::emit(
1194                            "execution.resumption.payload_error",
1195                            Some(serde_json::json!({"error": "no compatible resumption state"})),
1196                        )
1197                        .map(|_: ()| unreachable!());
1198                    }
1199
1200                    // Run previous step
1201                    let prev_result = prev(input, res, bus).await;
1202
1203                    // Unpack
1204                    let state = match prev_result {
1205                        Outcome::Next(t) => t,
1206                        other => return other.map(|_| unreachable!()),
1207                    };
1208
1209                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
1210                        &trans,
1211                        &comp,
1212                        state,
1213                        res,
1214                        bus,
1215                        &timeline_node_id,
1216                        &timeline_comp_id,
1217                        &timeline_node_label,
1218                        &transition_bus_policy,
1219                        step_idx,
1220                    )
1221                    .await
1222                })
1223            },
1224        );
1225        // 4. Register Saga Compensation if enabled
1226        {
1227            let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1228            let comp_fn = compensation.clone();
1229            let transition_bus_policy = bus_policy_for_registry.clone();
1230
1231            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1232                Arc::new(move |input_data, res, bus| {
1233                    let comp = comp_fn.clone();
1234                    let bus_policy = transition_bus_policy.clone();
1235                    Box::pin(async move {
1236                        let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1237                        bus.set_access_policy(comp.label(), bus_policy);
1238                        let res = comp.run(input, res, bus).await;
1239                        bus.clear_access_policy();
1240                        res
1241                    })
1242                });
1243            registry.register(next_node_id.clone(), handler);
1244        }
1245
1246        Axon {
1247            schematic,
1248            executor: next_executor,
1249            execution_mode,
1250            persistence_store,
1251            audit_sink,
1252            dlq_sink,
1253            dlq_policy,
1254            dynamic_dlq_policy,
1255            saga_policy,
1256            dynamic_saga_policy,
1257            saga_compensation_registry,
1258            iam_handle,
1259        }
1260    }
1261
1262    /// Attach a compensation transition to the previously added node.
1263    /// This establishes a Schematic-level Saga compensation mapping.
1264    #[track_caller]
1265    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1266    where
1267        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1268    {
1269        // NOTE: This currently only updates the Schematic.
1270        // For runtime compensation behavior, use `then_compensated`.
1271        let caller = Location::caller();
1272        let comp_node_id = uuid::Uuid::new_v4().to_string();
1273
1274        let comp_node = Node {
1275            id: comp_node_id.clone(),
1276            kind: NodeKind::Atom,
1277            label: transition.label(),
1278            description: transition.description(),
1279            input_type: type_name_of::<Out>(),
1280            output_type: "void".to_string(),
1281            resource_type: type_name_of::<Res>(),
1282            metadata: Default::default(),
1283            bus_capability: None,
1284            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1285            position: transition
1286                .position()
1287                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1288            compensation_node_id: None,
1289            input_schema: None,
1290            output_schema: None,
1291        };
1292
1293        if let Some(last_node) = self.schematic.nodes.last_mut() {
1294            last_node.compensation_node_id = Some(comp_node_id.clone());
1295        }
1296
1297        self.schematic.nodes.push(comp_node);
1298        self
1299    }
1300
1301    /// Add a branch point
1302    #[track_caller]
1303    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1304        let caller = Location::caller();
1305        let branch_id_str = branch_id.into();
1306        let last_node_id = self
1307            .schematic
1308            .nodes
1309            .last()
1310            .map(|n| n.id.clone())
1311            .unwrap_or_default();
1312
1313        let branch_node = Node {
1314            id: uuid::Uuid::new_v4().to_string(),
1315            kind: NodeKind::Synapse,
1316            label: label.to_string(),
1317            description: None,
1318            input_type: type_name_of::<Out>(),
1319            output_type: type_name_of::<Out>(),
1320            resource_type: type_name_of::<Res>(),
1321            metadata: Default::default(),
1322            bus_capability: None,
1323            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1324            position: None,
1325            compensation_node_id: None,
1326            input_schema: None,
1327            output_schema: None,
1328        };
1329
1330        self.schematic.nodes.push(branch_node);
1331        self.schematic.edges.push(Edge {
1332            from: last_node_id,
1333            to: branch_id_str.clone(),
1334            kind: EdgeType::Branch(branch_id_str),
1335            label: Some("Branch".to_string()),
1336        });
1337
1338        self
1339    }
1340
1341    /// Run multiple transitions in parallel (fan-out / fan-in).
1342    ///
1343    /// Each transition receives a clone of the current step's output and runs
1344    /// concurrently via [`futures_util::future::join_all`]. The strategy controls
1345    /// how faults are handled:
1346    ///
1347    /// - [`ParallelStrategy::AllMustSucceed`]: All branches must produce `Next`.
1348    ///   If any branch returns `Fault`, the first fault is propagated.
1349    /// - [`ParallelStrategy::AnyCanFail`]: Branches that fault are ignored as
1350    ///   long as at least one succeeds. If all branches fault, the first fault
1351    ///   is returned.
1352    ///
1353    /// The **first successful `Next` value** is forwarded to the next step in
1354    /// the pipeline. A custom merge can be layered on top via a subsequent
1355    /// `.then()` step.
1356    ///
1357    /// Each parallel branch receives its own fresh [`Bus`] instance. Resources
1358    /// should be injected via the shared `Res` bundle rather than the Bus for
1359    /// parallel steps.
1360    ///
1361    /// ## Schematic
1362    ///
1363    /// The method emits a `FanOut` node, one `Atom` node per branch (connected
1364    /// via `Parallel` edges), and a `FanIn` join node.
1365    ///
1366    /// ## Example
1367    ///
1368    /// ```rust,ignore
1369    /// let axon = Axon::new("Pipeline")
1370    ///     .then(ParseInput)
1371    ///     .parallel(
1372    ///         vec![Arc::new(EnrichA), Arc::new(EnrichB)],
1373    ///         ParallelStrategy::AllMustSucceed,
1374    ///     )
1375    ///     .then(MergeResults);
1376    /// ```
1377    #[track_caller]
1378    pub fn parallel(
1379        self,
1380        transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1381        strategy: ParallelStrategy,
1382    ) -> Axon<In, Out, E, Res>
1383    where
1384        Out: Clone,
1385    {
1386        let caller = Location::caller();
1387        let Axon {
1388            mut schematic,
1389            executor: prev_executor,
1390            execution_mode,
1391            persistence_store,
1392            audit_sink,
1393            dlq_sink,
1394            dlq_policy,
1395            dynamic_dlq_policy,
1396            saga_policy,
1397            dynamic_saga_policy,
1398            saga_compensation_registry,
1399            iam_handle,
1400        } = self;
1401
1402        // ── Schematic: FanOut node ─────────────────────────────────
1403        let fanout_id = uuid::Uuid::new_v4().to_string();
1404        let fanin_id = uuid::Uuid::new_v4().to_string();
1405
1406        let last_node_id = schematic
1407            .nodes
1408            .last()
1409            .map(|n| n.id.clone())
1410            .unwrap_or_default();
1411
1412        let fanout_node = Node {
1413            id: fanout_id.clone(),
1414            kind: NodeKind::FanOut,
1415            label: "FanOut".to_string(),
1416            description: Some(format!(
1417                "Parallel split ({} branches, {:?})",
1418                transitions.len(),
1419                strategy
1420            )),
1421            input_type: type_name_of::<Out>(),
1422            output_type: type_name_of::<Out>(),
1423            resource_type: type_name_of::<Res>(),
1424            metadata: Default::default(),
1425            bus_capability: None,
1426            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1427            position: None,
1428            compensation_node_id: None,
1429            input_schema: None,
1430            output_schema: None,
1431        };
1432
1433        schematic.nodes.push(fanout_node);
1434        schematic.edges.push(Edge {
1435            from: last_node_id,
1436            to: fanout_id.clone(),
1437            kind: EdgeType::Linear,
1438            label: Some("Next".to_string()),
1439        });
1440
1441        // ── Schematic: one Atom node per parallel branch ───────────
1442        let mut branch_node_ids = Vec::with_capacity(transitions.len());
1443        for (i, trans) in transitions.iter().enumerate() {
1444            let branch_id = uuid::Uuid::new_v4().to_string();
1445            let branch_node = Node {
1446                id: branch_id.clone(),
1447                kind: NodeKind::Atom,
1448                label: trans.label(),
1449                description: trans.description(),
1450                input_type: type_name_of::<Out>(),
1451                output_type: type_name_of::<Out>(),
1452                resource_type: type_name_of::<Res>(),
1453                metadata: Default::default(),
1454                bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1455                source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1456                position: None,
1457                compensation_node_id: None,
1458                input_schema: trans.input_schema(),
1459                output_schema: None,
1460            };
1461            schematic.nodes.push(branch_node);
1462            schematic.edges.push(Edge {
1463                from: fanout_id.clone(),
1464                to: branch_id.clone(),
1465                kind: EdgeType::Parallel,
1466                label: Some(format!("Branch {}", i)),
1467            });
1468            branch_node_ids.push(branch_id);
1469        }
1470
1471        // ── Schematic: FanIn node ──────────────────────────────────
1472        let fanin_node = Node {
1473            id: fanin_id.clone(),
1474            kind: NodeKind::FanIn,
1475            label: "FanIn".to_string(),
1476            description: Some(format!("Parallel join ({:?})", strategy)),
1477            input_type: type_name_of::<Out>(),
1478            output_type: type_name_of::<Out>(),
1479            resource_type: type_name_of::<Res>(),
1480            metadata: Default::default(),
1481            bus_capability: None,
1482            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1483            position: None,
1484            compensation_node_id: None,
1485            input_schema: None,
1486            output_schema: None,
1487        };
1488
1489        schematic.nodes.push(fanin_node);
1490        for branch_id in &branch_node_ids {
1491            schematic.edges.push(Edge {
1492                from: branch_id.clone(),
1493                to: fanin_id.clone(),
1494                kind: EdgeType::Parallel,
1495                label: Some("Join".to_string()),
1496            });
1497        }
1498
1499        // ── Executor: parallel composition ─────────────────────────
1500        let fanout_node_id = fanout_id.clone();
1501        let fanin_node_id = fanin_id.clone();
1502        let branch_ids_for_exec = branch_node_ids.clone();
1503        let step_idx = schematic.nodes.len() as u64 - 1;
1504
1505        let next_executor: Executor<In, Out, E, Res> = Arc::new(
1506            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1507                let prev = prev_executor.clone();
1508                let branches = transitions.clone();
1509                let fanout_id = fanout_node_id.clone();
1510                let fanin_id = fanin_node_id.clone();
1511                let branch_ids = branch_ids_for_exec.clone();
1512
1513                Box::pin(async move {
1514                    // Run previous steps
1515                    let prev_result = prev(input, res, bus).await;
1516                    let state = match prev_result {
1517                        Outcome::Next(t) => t,
1518                        other => return other.map(|_| unreachable!()),
1519                    };
1520
1521                    // Timeline: FanOut enter
1522                    let fanout_enter_ts = now_ms();
1523                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1524                        timeline.push(TimelineEvent::NodeEnter {
1525                            node_id: fanout_id.clone(),
1526                            node_label: "FanOut".to_string(),
1527                            timestamp: fanout_enter_ts,
1528                        });
1529                    }
1530
1531                    // Build futures for all branches.
1532                    // Each branch gets its own Bus so they can run concurrently
1533                    // without &mut Bus aliasing. Resources are shared via &Res.
1534                    let futs: Vec<_> = branches
1535                        .iter()
1536                        .enumerate()
1537                        .map(|(i, trans)| {
1538                            let branch_state = state.clone();
1539                            let branch_node_id = branch_ids[i].clone();
1540                            let trans = trans.clone();
1541
1542                            async move {
1543                                let mut branch_bus = Bus::new();
1544                                let label = trans.label();
1545                                let bus_policy = trans.bus_access_policy();
1546
1547                                branch_bus.set_access_policy(label.clone(), bus_policy);
1548                                let result = trans.run(branch_state, res, &mut branch_bus).await;
1549                                branch_bus.clear_access_policy();
1550
1551                                (i, branch_node_id, label, result)
1552                            }
1553                        })
1554                        .collect();
1555
1556                    // Run all branches concurrently within the current task
1557                    let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1558                        futures_util::future::join_all(futs).await;
1559
1560                    // Timeline: record each branch's enter/exit
1561                    for (_, branch_node_id, branch_label, outcome) in &results {
1562                        if let Some(timeline) = bus.read_mut::<Timeline>() {
1563                            let ts = now_ms();
1564                            timeline.push(TimelineEvent::NodeEnter {
1565                                node_id: branch_node_id.clone(),
1566                                node_label: branch_label.clone(),
1567                                timestamp: ts,
1568                            });
1569                            timeline.push(TimelineEvent::NodeExit {
1570                                node_id: branch_node_id.clone(),
1571                                outcome_type: outcome_type_name(outcome),
1572                                duration_ms: 0,
1573                                timestamp: ts,
1574                            });
1575                        }
1576                    }
1577
1578                    // Timeline: FanOut exit
1579                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1580                        timeline.push(TimelineEvent::NodeExit {
1581                            node_id: fanout_id.clone(),
1582                            outcome_type: "Next".to_string(),
1583                            duration_ms: 0,
1584                            timestamp: now_ms(),
1585                        });
1586                    }
1587
1588                    // ── Apply strategy ─────────────────────────────
1589                    let combined = match strategy {
1590                        ParallelStrategy::AllMustSucceed => {
1591                            let mut first_fault = None;
1592                            let mut first_success = None;
1593
1594                            for (_, _, _, outcome) in results {
1595                                match outcome {
1596                                    Outcome::Fault(e) => {
1597                                        if first_fault.is_none() {
1598                                            first_fault = Some(Outcome::Fault(e));
1599                                        }
1600                                    }
1601                                    Outcome::Next(val) => {
1602                                        if first_success.is_none() {
1603                                            first_success = Some(Outcome::Next(val));
1604                                        }
1605                                    }
1606                                    other => {
1607                                        // Non-Next/non-Fault outcomes (Branch, Emit, Jump)
1608                                        // treated as non-success in AllMustSucceed
1609                                        if first_fault.is_none() {
1610                                            first_fault = Some(other);
1611                                        }
1612                                    }
1613                                }
1614                            }
1615
1616                            if let Some(fault) = first_fault {
1617                                fault
1618                            } else {
1619                                first_success.unwrap_or_else(|| {
1620                                    Outcome::emit("execution.parallel.no_results", None)
1621                                })
1622                            }
1623                        }
1624                        ParallelStrategy::AnyCanFail => {
1625                            let mut first_success = None;
1626                            let mut first_fault = None;
1627
1628                            for (_, _, _, outcome) in results {
1629                                match outcome {
1630                                    Outcome::Next(val) => {
1631                                        if first_success.is_none() {
1632                                            first_success = Some(Outcome::Next(val));
1633                                        }
1634                                    }
1635                                    Outcome::Fault(e) => {
1636                                        if first_fault.is_none() {
1637                                            first_fault = Some(Outcome::Fault(e));
1638                                        }
1639                                    }
1640                                    _ => {}
1641                                }
1642                            }
1643
1644                            first_success.unwrap_or_else(|| {
1645                                first_fault.unwrap_or_else(|| {
1646                                    Outcome::emit("execution.parallel.no_results", None)
1647                                })
1648                            })
1649                        }
1650                    };
1651
1652                    // Timeline: FanIn
1653                    let fanin_enter_ts = now_ms();
1654                    if let Some(timeline) = bus.read_mut::<Timeline>() {
1655                        timeline.push(TimelineEvent::NodeEnter {
1656                            node_id: fanin_id.clone(),
1657                            node_label: "FanIn".to_string(),
1658                            timestamp: fanin_enter_ts,
1659                        });
1660                        timeline.push(TimelineEvent::NodeExit {
1661                            node_id: fanin_id.clone(),
1662                            outcome_type: outcome_type_name(&combined),
1663                            duration_ms: 0,
1664                            timestamp: fanin_enter_ts,
1665                        });
1666                    }
1667
1668                    // Persistence
1669                    if let Some(handle) = bus.read::<PersistenceHandle>() {
1670                        let trace_id = persistence_trace_id(bus);
1671                        let circuit = bus
1672                            .read::<ranvier_core::schematic::Schematic>()
1673                            .map(|s| s.name.clone())
1674                            .unwrap_or_default();
1675                        let version = bus
1676                            .read::<ranvier_core::schematic::Schematic>()
1677                            .map(|s| s.schema_version.clone())
1678                            .unwrap_or_default();
1679
1680                        persist_execution_event(
1681                            handle,
1682                            &trace_id,
1683                            &circuit,
1684                            &version,
1685                            step_idx,
1686                            Some(fanin_id.clone()),
1687                            outcome_kind_name(&combined),
1688                            Some(combined.to_json_value()),
1689                        )
1690                        .await;
1691                    }
1692
1693                    combined
1694                })
1695            },
1696        );
1697
1698        Axon {
1699            schematic,
1700            executor: next_executor,
1701            execution_mode,
1702            persistence_store,
1703            audit_sink,
1704            dlq_sink,
1705            dlq_policy,
1706            dynamic_dlq_policy,
1707            saga_policy,
1708            dynamic_saga_policy,
1709            saga_compensation_registry,
1710            iam_handle,
1711        }
1712    }
1713
1714    /// Execute the Axon with the given input and resources.
1715    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1716        if let ExecutionMode::Singleton {
1717            lock_key,
1718            ttl_ms,
1719            lock_provider,
1720        } = &self.execution_mode
1721        {
1722            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1723            let _enter = trace_span.enter();
1724            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1725                Ok(true) => {
1726                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1727                }
1728                Ok(false) => {
1729                    tracing::debug!(
1730                        "Singleton lock {} already held, aborting execution.",
1731                        lock_key
1732                    );
1733                    // Emit a specific event indicating skip
1734                    return Outcome::emit(
1735                        "execution.skipped.lock_held",
1736                        Some(serde_json::json!({
1737                            "lock_key": lock_key
1738                        })),
1739                    );
1740                }
1741                Err(e) => {
1742                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1743                    return Outcome::emit(
1744                        "execution.skipped.lock_error",
1745                        Some(serde_json::json!({
1746                            "error": e.to_string()
1747                        })),
1748                    );
1749                }
1750            }
1751        }
1752
1753        // ── IAM Boundary Check ────────────────────────────────────
1754        if let Some(iam) = &self.iam_handle {
1755            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1756
1757            if matches!(iam.policy, IamPolicy::None) {
1758                // No verification required — skip
1759            } else {
1760                let token = bus.read::<IamToken>().map(|t| t.0.clone());
1761
1762                match token {
1763                    Some(raw_token) => {
1764                        match iam.verifier.verify(&raw_token).await {
1765                            Ok(identity) => {
1766                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
1767                                    tracing::warn!(
1768                                        policy = ?iam.policy,
1769                                        subject = %identity.subject,
1770                                        "IAM policy enforcement failed: {}",
1771                                        e
1772                                    );
1773                                    return Outcome::emit(
1774                                        "iam.policy_denied",
1775                                        Some(serde_json::json!({
1776                                            "error": e.to_string(),
1777                                            "subject": identity.subject,
1778                                        })),
1779                                    );
1780                                }
1781                                // Insert verified identity into Bus for downstream access
1782                                bus.insert(identity);
1783                            }
1784                            Err(e) => {
1785                                tracing::warn!("IAM token verification failed: {}", e);
1786                                return Outcome::emit(
1787                                    "iam.verification_failed",
1788                                    Some(serde_json::json!({
1789                                        "error": e.to_string()
1790                                    })),
1791                                );
1792                            }
1793                        }
1794                    }
1795                    None => {
1796                        tracing::warn!("IAM policy requires token but none found in Bus");
1797                        return Outcome::emit("iam.missing_token", None);
1798                    }
1799                }
1800            }
1801        }
1802
1803        let trace_id = persistence_trace_id(bus);
1804        let label = self.schematic.name.clone();
1805
1806        // Inject DLQ config into Bus for step-level reporting
1807        if let Some(sink) = &self.dlq_sink {
1808            bus.insert(sink.clone());
1809        }
1810        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1811        let effective_dlq_policy = self
1812            .dynamic_dlq_policy
1813            .as_ref()
1814            .map(|d| d.current())
1815            .unwrap_or_else(|| self.dlq_policy.clone());
1816        bus.insert(effective_dlq_policy);
1817        bus.insert(self.schematic.clone());
1818        let effective_saga_policy = self
1819            .dynamic_saga_policy
1820            .as_ref()
1821            .map(|d| d.current())
1822            .unwrap_or_else(|| self.saga_policy.clone());
1823        bus.insert(effective_saga_policy.clone());
1824
1825        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1826        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1827            bus.insert(SagaStack::new());
1828        }
1829
1830        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1831        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1832        let compensation_retry_policy = compensation_retry_policy(bus);
1833        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1834        let version = self.schematic.schema_version.clone();
1835        let migration_registry = bus
1836            .read::<ranvier_core::schematic::MigrationRegistry>()
1837            .cloned();
1838
1839        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1840            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1841                load_persistence_version(handle, &trace_id).await;
1842
1843            if let Some(interv) = intervention {
1844                tracing::info!(
1845                    trace_id = %trace_id,
1846                    target_node = %interv.target_node,
1847                    "Applying manual intervention command"
1848                );
1849
1850                // Find the step index for the target node
1851                if let Some(target_idx) = self
1852                    .schematic
1853                    .nodes
1854                    .iter()
1855                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1856                {
1857                    tracing::info!(
1858                        trace_id = %trace_id,
1859                        target_node = %interv.target_node,
1860                        target_step = target_idx,
1861                        "Intervention: Jumping to target node"
1862                    );
1863                    start_step = target_idx as u64;
1864
1865                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1866                    bus.insert(ManualJump {
1867                        target_node: interv.target_node.clone(),
1868                        payload_override: interv.payload_override.clone(),
1869                    });
1870
1871                    // Log audit event for intervention application
1872                    if let Some(sink) = self.audit_sink.as_ref() {
1873                        let event = AuditEvent::new(
1874                            uuid::Uuid::new_v4().to_string(),
1875                            "System".to_string(),
1876                            "ApplyIntervention".to_string(),
1877                            trace_id.to_string(),
1878                        )
1879                        .with_metadata("target_node", interv.target_node.clone())
1880                        .with_metadata("target_step", target_idx);
1881
1882                        let _ = sink.append(&event).await;
1883                    }
1884                } else {
1885                    tracing::warn!(
1886                        trace_id = %trace_id,
1887                        target_node = %interv.target_node,
1888                        "Intervention target node not found in schematic; ignoring jump"
1889                    );
1890                }
1891            }
1892
1893            if let Some(old_version) = trace_version
1894                && old_version != version
1895            {
1896                tracing::info!(
1897                    trace_id = %trace_id,
1898                    old_version = %old_version,
1899                    current_version = %version,
1900                    "Version mismatch detected during resumption"
1901                );
1902
1903                // Try multi-hop migration path first, fall back to direct lookup
1904                let migration_path = migration_registry
1905                    .as_ref()
1906                    .and_then(|r| r.find_migration_path(&old_version, &version));
1907
1908                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1909                    if path.is_empty() {
1910                        (None, last_payload.clone())
1911                    } else {
1912                        // Apply payload mappers along the migration chain
1913                        let mut payload = last_payload.clone();
1914                        for hop in &path {
1915                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1916                            {
1917                                match mapper.map_state(p) {
1918                                    Ok(mapped) => payload = Some(mapped),
1919                                    Err(e) => {
1920                                        tracing::error!(
1921                                            trace_id = %trace_id,
1922                                            from = %hop.from_version,
1923                                            to = %hop.to_version,
1924                                            error = %e,
1925                                            "Payload migration mapper failed"
1926                                        );
1927                                        return Outcome::emit(
1928                                            "execution.resumption.payload_migration_failed",
1929                                            Some(serde_json::json!({
1930                                                "trace_id": trace_id,
1931                                                "from": hop.from_version,
1932                                                "to": hop.to_version,
1933                                                "error": e.to_string()
1934                                            })),
1935                                        );
1936                                    }
1937                                }
1938                            }
1939                        }
1940                        let hops: Vec<String> = path
1941                            .iter()
1942                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
1943                            .collect();
1944                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1945                        (path.last().copied(), payload)
1946                    }
1947                } else {
1948                    (None, last_payload.clone())
1949                };
1950
1951                // Use the final migration in the path to determine strategy
1952                let migration = final_migration.or_else(|| {
1953                    migration_registry
1954                        .as_ref()
1955                        .and_then(|r| r.find_migration(&old_version, &version))
1956                });
1957
1958                // Update last_payload with mapped version
1959                if mapped_payload.is_some() {
1960                    last_payload = mapped_payload;
1961                }
1962
1963                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1964                {
1965                    m.node_mapping
1966                        .get(node_id)
1967                        .cloned()
1968                        .unwrap_or(m.default_strategy.clone())
1969                } else {
1970                    migration
1971                        .map(|m| m.default_strategy.clone())
1972                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1973                };
1974
1975                match strategy {
1976                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1977                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1978                        start_step = 0;
1979                    }
1980                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1981                        new_node_id,
1982                        ..
1983                    } => {
1984                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1985                        if let Some(target_idx) = self
1986                            .schematic
1987                            .nodes
1988                            .iter()
1989                            .position(|n| n.id == new_node_id || n.label == new_node_id)
1990                        {
1991                            start_step = target_idx as u64;
1992                        } else {
1993                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1994                            return Outcome::emit(
1995                                "execution.resumption.migration_target_not_found",
1996                                Some(serde_json::json!({ "node_id": new_node_id })),
1997                            );
1998                        }
1999                    }
2000                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
2001                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
2002                        if let Some(target_idx) = self
2003                            .schematic
2004                            .nodes
2005                            .iter()
2006                            .position(|n| n.id == node_id || n.label == node_id)
2007                        {
2008                            start_step = target_idx as u64;
2009                        } else {
2010                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
2011                            return Outcome::emit(
2012                                "execution.resumption.migration_target_not_found",
2013                                Some(serde_json::json!({ "node_id": node_id })),
2014                            );
2015                        }
2016                    }
2017                    ranvier_core::schematic::MigrationStrategy::Fail => {
2018                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
2019                        return Outcome::emit(
2020                            "execution.resumption.version_mismatch_failed",
2021                            Some(serde_json::json!({
2022                                "trace_id": trace_id,
2023                                "old_version": old_version,
2024                                "current_version": version
2025                            })),
2026                        );
2027                    }
2028                    _ => {
2029                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
2030                        return Outcome::emit(
2031                            "execution.resumption.unsupported_migration",
2032                            Some(serde_json::json!({
2033                                "trace_id": trace_id,
2034                                "strategy": format!("{:?}", strategy)
2035                            })),
2036                        );
2037                    }
2038                }
2039            }
2040
2041            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
2042            persist_execution_event(
2043                handle,
2044                &trace_id,
2045                &label,
2046                &version,
2047                start_step,
2048                ingress_node_id,
2049                "Enter",
2050                None,
2051            )
2052            .await;
2053
2054            bus.insert(StartStep(start_step));
2055            if start_step > 0 {
2056                bus.insert(ResumptionState {
2057                    payload: last_payload,
2058                });
2059            }
2060
2061            Some(start_step)
2062        } else {
2063            None
2064        };
2065
2066        let should_capture = should_attach_timeline(bus);
2067        let inserted_timeline = if should_capture {
2068            ensure_timeline(bus)
2069        } else {
2070            false
2071        };
2072        let ingress_started = std::time::Instant::now();
2073        let ingress_enter_ts = now_ms();
2074        if should_capture
2075            && let (Some(timeline), Some(ingress)) =
2076                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2077        {
2078            timeline.push(TimelineEvent::NodeEnter {
2079                node_id: ingress.id.clone(),
2080                node_label: ingress.label.clone(),
2081                timestamp: ingress_enter_ts,
2082            });
2083        }
2084
2085        let circuit_span = tracing::info_span!(
2086            "Circuit",
2087            ranvier.circuit = %label,
2088            ranvier.outcome_kind = tracing::field::Empty,
2089            ranvier.outcome_target = tracing::field::Empty
2090        );
2091        let outcome = (self.executor)(input, resources, bus)
2092            .instrument(circuit_span.clone())
2093            .await;
2094        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
2095        if let Some(target) = outcome_target(&outcome) {
2096            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
2097        }
2098
2099        // Automated Saga Rollback (LIFO)
2100        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
2101            while let Some(task) = {
2102                let mut stack = bus.read_mut::<SagaStack>();
2103                stack.as_mut().and_then(|s| s.pop())
2104            } {
2105                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
2106
2107                let handler = {
2108                    let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
2109                    registry.get(&task.node_id)
2110                };
2111                if let Some(handler) = handler {
2112                    let res = handler(task.input_snapshot, resources, bus).await;
2113                    if let Outcome::Fault(e) = res {
2114                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
2115                    }
2116                } else {
2117                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
2118                }
2119            }
2120            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
2121        }
2122
2123        let ingress_exit_ts = now_ms();
2124        if should_capture
2125            && let (Some(timeline), Some(ingress)) =
2126                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2127        {
2128            timeline.push(TimelineEvent::NodeExit {
2129                node_id: ingress.id.clone(),
2130                outcome_type: outcome_type_name(&outcome),
2131                duration_ms: ingress_started.elapsed().as_millis() as u64,
2132                timestamp: ingress_exit_ts,
2133            });
2134        }
2135
2136        if let Some(handle) = persistence_handle.as_ref() {
2137            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
2138            persist_execution_event(
2139                handle,
2140                &trace_id,
2141                &label,
2142                &version,
2143                fault_step,
2144                None, // Outcome-level events might not have a single node_id context here
2145                outcome_kind_name(&outcome),
2146                Some(outcome.to_json_value()),
2147            )
2148            .await;
2149
2150            let mut completion = completion_from_outcome(&outcome);
2151            if matches!(outcome, Outcome::Fault(_))
2152                && let Some(compensation) = compensation_handle.as_ref()
2153                && compensation_auto_trigger(bus)
2154            {
2155                let context = CompensationContext {
2156                    trace_id: trace_id.clone(),
2157                    circuit: label.clone(),
2158                    fault_kind: outcome_kind_name(&outcome).to_string(),
2159                    fault_step,
2160                    timestamp_ms: now_ms(),
2161                };
2162
2163                if run_compensation(
2164                    compensation,
2165                    context,
2166                    compensation_retry_policy,
2167                    compensation_idempotency.clone(),
2168                )
2169                .await
2170                {
2171                    persist_execution_event(
2172                        handle,
2173                        &trace_id,
2174                        &label,
2175                        &version,
2176                        fault_step.saturating_add(1),
2177                        None,
2178                        "Compensated",
2179                        None,
2180                    )
2181                    .await;
2182                    completion = CompletionState::Compensated;
2183                }
2184            }
2185
2186            if persistence_auto_complete(bus) {
2187                persist_completion(handle, &trace_id, completion).await;
2188            }
2189        }
2190
2191        if should_capture {
2192            maybe_export_timeline(bus, &outcome);
2193        }
2194        if inserted_timeline {
2195            let _ = bus.remove::<Timeline>();
2196        }
2197
2198        outcome
2199    }
2200
2201    /// Starts the Ranvier Inspector for this Axon on the specified port.
2202    /// This spawns a background task to serve the Schematic.
2203    pub fn serve_inspector(self, port: u16) -> Self {
2204        if !inspector_dev_mode_from_env() {
2205            tracing::info!("Inspector disabled because RANVIER_MODE is production");
2206            return self;
2207        }
2208        if !inspector_enabled_from_env() {
2209            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2210            return self;
2211        }
2212
2213        let schematic = self.schematic.clone();
2214        let axon_inspector = Arc::new(self.clone());
2215        tokio::spawn(async move {
2216            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2217                .with_projection_files_from_env()
2218                .with_mode_from_env()
2219                .with_auth_policy_from_env()
2220                .with_state_inspector(axon_inspector)
2221                .serve()
2222                .await
2223            {
2224                tracing::error!("Inspector server failed: {}", e);
2225            }
2226        });
2227        self
2228    }
2229
2230    /// Get a reference to the Schematic (structural view).
2231    pub fn schematic(&self) -> &Schematic {
2232        &self.schematic
2233    }
2234
2235    /// Consume and return the Schematic.
2236    pub fn into_schematic(self) -> Schematic {
2237        self.schematic
2238    }
2239
2240    /// Detect schematic export mode from runtime flags.
2241    ///
2242    /// Supported triggers:
2243    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
2244    /// - `--schematic`
2245    ///
2246    /// Optional output path:
2247    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
2248    /// - `--schematic-output <path>` / `--schematic-output=<path>`
2249    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
2250    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2251        schematic_export_request_from_process()
2252    }
2253
2254    /// Export schematic and return `true` when schematic mode is active.
2255    ///
2256    /// Use this once after circuit construction and before server/custom loops:
2257    ///
2258    /// ```rust,ignore
2259    /// let axon = build_axon();
2260    /// if axon.maybe_export_and_exit()? {
2261    ///     return Ok(());
2262    /// }
2263    /// // Normal runtime path...
2264    /// ```
2265    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2266        self.maybe_export_and_exit_with(|_| ())
2267    }
2268
2269    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
2270    ///
2271    /// This is useful when your app has custom loop/bootstrap behavior and you want
2272    /// to skip or cleanup that logic in schematic mode.
2273    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2274    where
2275        F: FnOnce(&SchematicExportRequest),
2276    {
2277        let Some(request) = self.schematic_export_request() else {
2278            return Ok(false);
2279        };
2280        on_before_exit(&request);
2281        self.export_schematic(&request)?;
2282        Ok(true)
2283    }
2284
2285    /// Export schematic according to the provided request.
2286    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2287        let json = serde_json::to_string_pretty(self.schematic())?;
2288        if let Some(path) = &request.output {
2289            if let Some(parent) = path.parent()
2290                && !parent.as_os_str().is_empty()
2291            {
2292                fs::create_dir_all(parent)?;
2293            }
2294            fs::write(path, json.as_bytes())?;
2295            return Ok(());
2296        }
2297        println!("{}", json);
2298        Ok(())
2299    }
2300}
2301
2302fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2303    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2304    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2305    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2306
2307    let mut i = 0;
2308    while i < args.len() {
2309        let arg = args[i].to_string_lossy();
2310
2311        if arg == "--schematic" {
2312            enabled = true;
2313            i += 1;
2314            continue;
2315        }
2316
2317        if arg == "--schematic-output" || arg == "--output" {
2318            if let Some(next) = args.get(i + 1) {
2319                output = Some(PathBuf::from(next));
2320                i += 2;
2321                continue;
2322            }
2323        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2324            output = Some(PathBuf::from(value));
2325        } else if let Some(value) = arg.strip_prefix("--output=") {
2326            output = Some(PathBuf::from(value));
2327        }
2328
2329        i += 1;
2330    }
2331
2332    if enabled {
2333        Some(SchematicExportRequest { output })
2334    } else {
2335        None
2336    }
2337}
2338
/// Whether the environment variable `key` holds a truthy value.
///
/// Truthy means `1`, `true`, `on` or `yes`, compared ASCII case-insensitively and
/// without trimming. An unset or non-Unicode variable is `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key).is_ok_and(|raw| {
        ["1", "true", "on", "yes"]
            .iter()
            .any(|truthy| raw.eq_ignore_ascii_case(truthy))
    })
}
2345
2346fn inspector_enabled_from_env() -> bool {
2347    let raw = std::env::var("RANVIER_INSPECTOR").ok();
2348    inspector_enabled_from_value(raw.as_deref())
2349}
2350
/// Interpret a raw `RANVIER_INSPECTOR` value.
///
/// `None` (variable unset) means enabled, so the inspector is opt-out. A present value
/// enables it only if it is `1`, `true`, `on` or `yes`, compared ASCII case-insensitively
/// and without trimming. Anything else, including the empty string, disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    ["1", "true", "on", "yes"]
        .iter()
        .any(|truthy| raw.eq_ignore_ascii_case(truthy))
}
2357
2358fn inspector_dev_mode_from_env() -> bool {
2359    let raw = std::env::var("RANVIER_MODE").ok();
2360    inspector_dev_mode_from_value(raw.as_deref())
2361}
2362
/// Interpret a raw `RANVIER_MODE` value.
///
/// Returns `false` only for `prod` or `production`, compared ASCII case-insensitively and
/// without trimming. Every other value, and `None`, counts as development mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    let is_production = value.is_some_and(|mode| {
        mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production")
    });
    !is_production
}
2369
2370fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2371    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2372        Ok(v) if !v.trim().is_empty() => v,
2373        _ => return,
2374    };
2375
2376    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2377    let policy = timeline_adaptive_policy();
2378    let forced = should_force_export(outcome, &policy);
2379    let should_export = sampled || forced;
2380    if !should_export {
2381        record_sampling_stats(false, sampled, forced, "none", &policy);
2382        return;
2383    }
2384
2385    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2386    timeline.sort();
2387
2388    let mode = std::env::var("RANVIER_TIMELINE_MODE")
2389        .unwrap_or_else(|_| "overwrite".to_string())
2390        .to_ascii_lowercase();
2391
2392    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2393        tracing::warn!(
2394            "Failed to persist timeline file {} (mode={}): {}",
2395            path,
2396            mode,
2397            err
2398        );
2399        record_sampling_stats(false, sampled, forced, &mode, &policy);
2400    } else {
2401        record_sampling_stats(true, sampled, forced, &mode, &policy);
2402    }
2403}
2404
2405fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2406    match outcome {
2407        Outcome::Next(_) => "Next".to_string(),
2408        Outcome::Branch(id, _) => format!("Branch:{}", id),
2409        Outcome::Jump(id, _) => format!("Jump:{}", id),
2410        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2411        Outcome::Fault(_) => "Fault".to_string(),
2412    }
2413}
2414
2415fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2416    match outcome {
2417        Outcome::Next(_) => "Next",
2418        Outcome::Branch(_, _) => "Branch",
2419        Outcome::Jump(_, _) => "Jump",
2420        Outcome::Emit(_, _) => "Emit",
2421        Outcome::Fault(_) => "Fault",
2422    }
2423}
2424
2425fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2426    match outcome {
2427        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2428        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2429        Outcome::Emit(event_type, _) => Some(event_type.clone()),
2430        Outcome::Next(_) | Outcome::Fault(_) => None,
2431    }
2432}
2433
2434fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2435    match outcome {
2436        Outcome::Fault(_) => CompletionState::Fault,
2437        _ => CompletionState::Success,
2438    }
2439}
2440
2441fn persistence_trace_id(bus: &Bus) -> String {
2442    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2443        explicit.0.clone()
2444    } else {
2445        format!("{}:{}", bus.id, now_ms())
2446    }
2447}
2448
2449fn persistence_auto_complete(bus: &Bus) -> bool {
2450    bus.read::<PersistenceAutoComplete>()
2451        .map(|v| v.0)
2452        .unwrap_or(true)
2453}
2454
2455fn compensation_auto_trigger(bus: &Bus) -> bool {
2456    bus.read::<CompensationAutoTrigger>()
2457        .map(|v| v.0)
2458        .unwrap_or(true)
2459}
2460
2461fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2462    bus.read::<CompensationRetryPolicy>()
2463        .copied()
2464        .unwrap_or_default()
2465}
2466
2467/// Unwrap the Outcome enum layer from a persisted event payload.
2468///
2469/// Events are stored with `outcome.to_json_value()` which serializes the full
2470/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
2471/// handler expects the raw inner value, so we extract it here.
2472fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2473    payload.map(|p| {
2474        p.get("Next")
2475            .or_else(|| p.get("Branch"))
2476            .or_else(|| p.get("Jump"))
2477            .cloned()
2478            .unwrap_or_else(|| p.clone())
2479    })
2480}
2481
/// Load the persisted trace for `trace_id` and derive where execution should resume.
///
/// Returns a tuple of:
/// 0. the next step index to execute,
/// 1. the schematic version stored on the trace (`None` when no trace exists),
/// 2. the last entry of the trace's `interventions`, if any,
/// 3. the node id of the chosen anchor event, if it has one,
/// 4. the anchor event's payload with its Outcome wrapper removed
///    (via `unwrap_outcome_payload`).
///
/// If the trace has `resumed_from_step` set, the next step is `resume_from_step + 1`.
/// The anchor is then searched among events at or before that step, newest first, in
/// this order of preference:
/// 1. a `Next` event with a payload,
/// 2. any non-`Fault` event with a payload,
/// 3. any event with a payload,
/// 4. finally, the trace's last event regardless of its step.
///
/// Otherwise the anchor is the trace's last event and the next step is one past its
/// step (0 for a trace with no events).
///
/// A missing trace yields `(0, None, None, None, None)`. A store error is logged at
/// `warn` and yields the same defaults.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // Resumed trace: pick the best anchor at or before the resume point,
                    // falling back step by step to less specific candidates.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        // Last resort: the final event, even if it is past the resume point.
                        .or_else(|| trace.events.last());

                    (
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // Fresh continuation: continue right after the most recent event.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
2559
/// Execute a single (non-compensated) step and record its bookkeeping on the bus.
///
/// In order, this:
/// 1. Honors a `DebugControl` breakpoint on `node_id`: logs on the `ranvier.debugger`
///    target, emits `NodePaused`, and awaits `DebugControl::wait`.
/// 2. Emits `NodeEnter` on the bus `Timeline` (if present).
/// 3. Snapshots `state`:
///    - as JSON for retries, when the bus carries `DlqPolicy::RetryThenDlq`;
///    - as bytes for the `SagaStack`, when `SagaPolicy::Enabled`.
/// 4. Runs `trans` inside a `Node` tracing span, with `bus_policy` applied to the bus.
/// 5. On `Fault`, if retries are configured and a snapshot exists, retries attempts
///    `2..=max_attempts`. Before each attempt it sleeps `backoff_ms * 2^(attempt - 2)`.
///    It stops at the first non-`Fault` outcome.
/// 6. Records outcome kind/target on the span and emits `NodeExit`, plus `Branchtaken`
///    for branches.
/// 7. On `Next`, pushes the saga snapshot onto the `SagaStack`.
/// 8. Persists the outcome at `step_idx` when a `PersistenceHandle` is on the bus.
/// 9. On a final `Fault`, stores a dead letter in the bus `DlqSink` when a sink and a
///    non-`Drop` policy are both present. It emits `DlqExhausted` first when retries
///    were configured.
///
/// Returns the final outcome, which is the last attempt's outcome if retries ran.
#[allow(clippy::too_many_arguments)]
async fn run_this_step<In, Out, E, Res>(
    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
    state: In,
    res: &Res,
    bus: &mut Bus,
    node_id: &str,
    node_label: &str,
    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
    step_idx: u64,
) -> Outcome<Out, E>
where
    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
    Res: ranvier_core::transition::ResourceRequirement,
{
    let label = trans.label();
    // Last `::`-separated segment of the resource type name, for the span attribute.
    let res_type = std::any::type_name::<Res>()
        .split("::")
        .last()
        .unwrap_or("unknown");

    // Debug pausing
    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
        debug.should_pause(node_id)
    } else {
        false
    };

    if should_pause {
        let trace_id = persistence_trace_id(bus);
        tracing::event!(
            target: "ranvier.debugger",
            tracing::Level::INFO,
            trace_id = %trace_id,
            node_id = %node_id,
            "Node paused"
        );

        if let Some(timeline) = bus.read_mut::<Timeline>() {
            timeline.push(TimelineEvent::NodePaused {
                node_id: node_id.to_string(),
                timestamp: now_ms(),
            });
        }
        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
            debug.wait().await;
        }
    }

    let enter_ts = now_ms();
    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeEnter {
            node_id: node_id.to_string(),
            node_label: node_label.to_string(),
            timestamp: enter_ts,
        });
    }

    // Check DLQ retry policy and pre-serialize state for potential retries
    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
        if let DlqPolicy::RetryThenDlq {
            max_attempts,
            backoff_ms,
        } = p
        {
            Some((*max_attempts, *backoff_ms))
        } else {
            None
        }
    });
    // `state` is moved into the first run, so retries rebuild it from this JSON copy.
    // A serialization failure leaves `None`, which disables retries for this step.
    let retry_state_snapshot = if dlq_retry_config.is_some() {
        serde_json::to_value(&state).ok()
    } else {
        None
    };

    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
    // NOTE(review): a serialization failure yields an empty byte snapshot, not an error.
    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
        Some(serde_json::to_vec(&state).unwrap_or_default())
    } else {
        None
    };

    let node_span = tracing::info_span!(
        "Node",
        ranvier.node = %label,
        ranvier.resource_type = %res_type,
        ranvier.outcome_kind = tracing::field::Empty,
        ranvier.outcome_target = tracing::field::Empty
    );
    // Duration measured from here covers all retries and backoff sleeps.
    let started = std::time::Instant::now();
    bus.set_access_policy(label.clone(), bus_policy.clone());
    let result = trans
        .run(state, res, bus)
        .instrument(node_span.clone())
        .await;
    bus.clear_access_policy();

    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
    // retry with exponential backoff before giving up.
    let result = if let Outcome::Fault(_) = &result {
        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
            (dlq_retry_config, &retry_state_snapshot)
        {
            let mut final_result = result;
            // attempt 1 already done; retry from 2..=max_attempts
            for attempt in 2..=max_attempts {
                // Saturating arithmetic keeps large attempt counts from overflowing.
                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));

                tracing::info!(
                    ranvier.node = %label,
                    attempt = attempt,
                    max_attempts = max_attempts,
                    backoff_ms = delay,
                    "Retrying faulted node"
                );

                if let Some(timeline) = bus.read_mut::<Timeline>() {
                    timeline.push(TimelineEvent::NodeRetry {
                        node_id: node_id.to_string(),
                        attempt,
                        max_attempts,
                        backoff_ms: delay,
                        timestamp: now_ms(),
                    });
                }

                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                // NOTE(review): if the snapshot cannot be deserialized, this attempt is
                // silently skipped after the sleep and the previous fault is kept.
                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
                    bus.set_access_policy(label.clone(), bus_policy.clone());
                    let retry_result = trans
                        .run(retry_state, res, bus)
                        .instrument(tracing::info_span!(
                            "NodeRetry",
                            ranvier.node = %label,
                            attempt = attempt
                        ))
                        .await;
                    bus.clear_access_policy();

                    // Keep the latest outcome; stop retrying once it is not a fault.
                    match &retry_result {
                        Outcome::Fault(_) => {
                            final_result = retry_result;
                        }
                        _ => {
                            final_result = retry_result;
                            break;
                        }
                    }
                }
            }
            final_result
        } else {
            result
        }
    } else {
        result
    };

    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
    if let Some(target) = outcome_target(&result) {
        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
    }
    let duration_ms = started.elapsed().as_millis() as u64;
    let exit_ts = now_ms();

    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeExit {
            node_id: node_id.to_string(),
            outcome_type: outcome_type_name(&result),
            duration_ms,
            timestamp: exit_ts,
        });

        if let Outcome::Branch(branch_id, _) = &result {
            timeline.push(TimelineEvent::Branchtaken {
                branch_id: branch_id.clone(),
                timestamp: exit_ts,
            });
        }
    }

    // Push to Saga Stack if Next outcome and snapshot taken
    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
        && let Some(stack) = bus.read_mut::<SagaStack>()
    {
        stack.push(node_id.to_string(), label.clone(), snapshot);
    }

    // Every outcome kind (including faults) is persisted at this step's index.
    if let Some(handle) = bus.read::<PersistenceHandle>() {
        let trace_id = persistence_trace_id(bus);
        let circuit = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let version = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.schema_version.clone())
            .unwrap_or_default();

        persist_execution_event(
            handle,
            &trace_id,
            &circuit,
            &version,
            step_idx,
            Some(node_id.to_string()),
            outcome_kind_name(&result),
            Some(result.to_json_value()),
        )
        .await;
    }

    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
    // or immediately (SendToDlq). Drop policy skips entirely.
    if let Outcome::Fault(f) = &result {
        // Read policy and sink, then drop the borrows before mutable timeline access
        let dlq_action = {
            let policy = bus.read::<DlqPolicy>();
            let sink = bus.read::<Arc<dyn DlqSink>>();
            match (sink, policy) {
                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
                _ => None,
            }
        };

        if let Some(sink) = dlq_action {
            // NOTE(review): reports the configured `max_attempts` even when retries were
            // skipped (e.g. the state could not be snapshotted) — confirm this is intended.
            if let Some((max_attempts, _)) = dlq_retry_config
                && let Some(timeline) = bus.read_mut::<Timeline>()
            {
                timeline.push(TimelineEvent::DlqExhausted {
                    node_id: node_id.to_string(),
                    total_attempts: max_attempts,
                    timestamp: now_ms(),
                });
            }

            let trace_id = persistence_trace_id(bus);
            let circuit = bus
                .read::<ranvier_core::schematic::Schematic>()
                .map(|s| s.name.clone())
                .unwrap_or_default();
            // Best effort: the DLQ store result is ignored.
            let _ = sink
                .store_dead_letter(
                    &trace_id,
                    &circuit,
                    node_id,
                    &format!("{:?}", f),
                    &serde_json::to_vec(&f).unwrap_or_default(),
                )
                .await;
        }
    }

    result
}
2819
2820#[allow(clippy::too_many_arguments)]
2821async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2822    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2823    comp: &Comp,
2824    state: Out,
2825    res: &Res,
2826    bus: &mut Bus,
2827    node_id: &str,
2828    comp_node_id: &str,
2829    node_label: &str,
2830    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2831    step_idx: u64,
2832) -> Outcome<Next, E>
2833where
2834    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2835    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2836    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2837    Res: ranvier_core::transition::ResourceRequirement,
2838    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2839{
2840    let label = trans.label();
2841
2842    // Debug pausing
2843    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2844        debug.should_pause(node_id)
2845    } else {
2846        false
2847    };
2848
2849    if should_pause {
2850        let trace_id = persistence_trace_id(bus);
2851        tracing::event!(
2852            target: "ranvier.debugger",
2853            tracing::Level::INFO,
2854            trace_id = %trace_id,
2855            node_id = %node_id,
2856            "Node paused (compensated)"
2857        );
2858
2859        if let Some(timeline) = bus.read_mut::<Timeline>() {
2860            timeline.push(TimelineEvent::NodePaused {
2861                node_id: node_id.to_string(),
2862                timestamp: now_ms(),
2863            });
2864        }
2865        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2866            debug.wait().await;
2867        }
2868    }
2869
2870    let enter_ts = now_ms();
2871    if let Some(timeline) = bus.read_mut::<Timeline>() {
2872        timeline.push(TimelineEvent::NodeEnter {
2873            node_id: node_id.to_string(),
2874            node_label: node_label.to_string(),
2875            timestamp: enter_ts,
2876        });
2877    }
2878
2879    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2880    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2881        Some(serde_json::to_vec(&state).unwrap_or_default())
2882    } else {
2883        None
2884    };
2885
2886    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2887    bus.set_access_policy(label.clone(), bus_policy.clone());
2888    let result = trans
2889        .run(state.clone(), res, bus)
2890        .instrument(node_span)
2891        .await;
2892    bus.clear_access_policy();
2893
2894    let duration_ms = 0; // Simplified
2895    let exit_ts = now_ms();
2896
2897    if let Some(timeline) = bus.read_mut::<Timeline>() {
2898        timeline.push(TimelineEvent::NodeExit {
2899            node_id: node_id.to_string(),
2900            outcome_type: outcome_type_name(&result),
2901            duration_ms,
2902            timestamp: exit_ts,
2903        });
2904    }
2905
2906    // Automated Compensation Trigger
2907    if let Outcome::Fault(ref err) = result {
2908        if compensation_auto_trigger(bus) {
2909            tracing::info!(
2910                ranvier.node = %label,
2911                ranvier.compensation.trigger = "saga",
2912                error = ?err,
2913                "Saga compensation triggered"
2914            );
2915
2916            if let Some(timeline) = bus.read_mut::<Timeline>() {
2917                timeline.push(TimelineEvent::NodeEnter {
2918                    node_id: comp_node_id.to_string(),
2919                    node_label: format!("Compensate: {}", comp.label()),
2920                    timestamp: exit_ts,
2921                });
2922            }
2923
2924            // Run compensation
2925            let _ = comp.run(state, res, bus).await;
2926
2927            if let Some(timeline) = bus.read_mut::<Timeline>() {
2928                timeline.push(TimelineEvent::NodeExit {
2929                    node_id: comp_node_id.to_string(),
2930                    outcome_type: "Compensated".to_string(),
2931                    duration_ms: 0,
2932                    timestamp: now_ms(),
2933                });
2934            }
2935
2936            if let Some(handle) = bus.read::<PersistenceHandle>() {
2937                let trace_id = persistence_trace_id(bus);
2938                let circuit = bus
2939                    .read::<ranvier_core::schematic::Schematic>()
2940                    .map(|s| s.name.clone())
2941                    .unwrap_or_default();
2942                let version = bus
2943                    .read::<ranvier_core::schematic::Schematic>()
2944                    .map(|s| s.schema_version.clone())
2945                    .unwrap_or_default();
2946
2947                persist_execution_event(
2948                    handle,
2949                    &trace_id,
2950                    &circuit,
2951                    &version,
2952                    step_idx + 1, // Compensation node index
2953                    Some(comp_node_id.to_string()),
2954                    "Compensated",
2955                    None,
2956                )
2957                .await;
2958            }
2959        }
2960    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2961        // Push to Saga Stack if Next outcome and snapshot taken
2962        if let Some(stack) = bus.read_mut::<SagaStack>() {
2963            stack.push(node_id.to_string(), label.clone(), snapshot);
2964        }
2965
2966        if let Some(handle) = bus.read::<PersistenceHandle>() {
2967            let trace_id = persistence_trace_id(bus);
2968            let circuit = bus
2969                .read::<ranvier_core::schematic::Schematic>()
2970                .map(|s| s.name.clone())
2971                .unwrap_or_default();
2972            let version = bus
2973                .read::<ranvier_core::schematic::Schematic>()
2974                .map(|s| s.schema_version.clone())
2975                .unwrap_or_default();
2976
2977            persist_execution_event(
2978                handle,
2979                &trace_id,
2980                &circuit,
2981                &version,
2982                step_idx,
2983                Some(node_id.to_string()),
2984                outcome_kind_name(&result),
2985                Some(result.to_json_value()),
2986            )
2987            .await;
2988        }
2989    }
2990
2991    // DLQ reporting for compensated steps
2992    if let Outcome::Fault(f) = &result
2993        && let (Some(sink), Some(policy)) =
2994            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2995    {
2996        let should_dlq = !matches!(policy, DlqPolicy::Drop);
2997        if should_dlq {
2998            let trace_id = persistence_trace_id(bus);
2999            let circuit = bus
3000                .read::<ranvier_core::schematic::Schematic>()
3001                .map(|s| s.name.clone())
3002                .unwrap_or_default();
3003            let _ = sink
3004                .store_dead_letter(
3005                    &trace_id,
3006                    &circuit,
3007                    node_id,
3008                    &format!("{:?}", f),
3009                    &serde_json::to_vec(&f).unwrap_or_default(),
3010                )
3011                .await;
3012        }
3013    }
3014
3015    result
3016}
3017
3018#[allow(clippy::too_many_arguments)]
3019pub async fn persist_execution_event(
3020    handle: &PersistenceHandle,
3021    trace_id: &str,
3022    circuit: &str,
3023    schematic_version: &str,
3024    step: u64,
3025    node_id: Option<String>,
3026    outcome_kind: &str,
3027    payload: Option<serde_json::Value>,
3028) {
3029    let store = handle.store();
3030    let envelope = PersistenceEnvelope {
3031        trace_id: trace_id.to_string(),
3032        circuit: circuit.to_string(),
3033        schematic_version: schematic_version.to_string(),
3034        step,
3035        node_id,
3036        outcome_kind: outcome_kind.to_string(),
3037        timestamp_ms: now_ms(),
3038        payload_hash: None,
3039        payload,
3040    };
3041
3042    if let Err(err) = store.append(envelope).await {
3043        tracing::warn!(
3044            trace_id = %trace_id,
3045            circuit = %circuit,
3046            step,
3047            outcome_kind = %outcome_kind,
3048            error = %err,
3049            "Failed to append persistence envelope"
3050        );
3051    }
3052}
3053
3054async fn persist_completion(
3055    handle: &PersistenceHandle,
3056    trace_id: &str,
3057    completion: CompletionState,
3058) {
3059    let store = handle.store();
3060    if let Err(err) = store.complete(trace_id, completion).await {
3061        tracing::warn!(
3062            trace_id = %trace_id,
3063            error = %err,
3064            "Failed to complete persistence trace"
3065        );
3066    }
3067}
3068
3069fn compensation_idempotency_key(context: &CompensationContext) -> String {
3070    format!(
3071        "{}:{}:{}",
3072        context.trace_id, context.circuit, context.fault_kind
3073    )
3074}
3075
/// Runs the compensation hook for a faulted trace, with optional idempotency
/// protection and retries.
///
/// Steps:
/// 1. When `idempotency` is provided, the store is asked whether the key built
///    by `compensation_idempotency_key` (`trace_id:circuit:fault_kind`) was
///    already compensated. If so, the hook is NOT invoked and `true` is
///    returned. If the lookup itself errors, the error is logged and the hook
///    still runs.
/// 2. The hook is attempted up to `retry_policy.max_attempts` times. It always
///    runs at least once, even when the policy says 0. After a failed attempt
///    that is not the last one, the task sleeps for `retry_policy.backoff_ms`
///    milliseconds when that value is non-zero.
/// 3. On the first successful attempt, the key is marked as compensated in the
///    idempotency store (if any) and `true` is returned. A marking failure is
///    only logged.
///
/// Returns `false` when every attempt failed.
async fn run_compensation(
    handle: &CompensationHandle,
    context: CompensationContext,
    retry_policy: CompensationRetryPolicy,
    idempotency: Option<CompensationIdempotencyHandle>,
) -> bool {
    let hook = handle.hook();
    let key = compensation_idempotency_key(&context);

    // Skip the hook entirely if this exact compensation was already recorded.
    if let Some(store_handle) = idempotency.as_ref() {
        let store = store_handle.store();
        match store.was_compensated(&key).await {
            Ok(true) => {
                tracing::info!(
                    trace_id = %context.trace_id,
                    circuit = %context.circuit,
                    key = %key,
                    "Compensation already recorded; skipping duplicate hook execution"
                );
                return true;
            }
            Ok(false) => {}
            Err(err) => {
                // Lookup failed: prefer running the hook over silently skipping it.
                tracing::warn!(
                    trace_id = %context.trace_id,
                    key = %key,
                    error = %err,
                    "Failed to check compensation idempotency state"
                );
            }
        }
    }

    // Clamp so a policy with zero attempts still invokes the hook once.
    let max_attempts = retry_policy.max_attempts.max(1);
    for attempt in 1..=max_attempts {
        match hook.compensate(context.clone()).await {
            Ok(()) => {
                if let Some(store_handle) = idempotency.as_ref() {
                    let store = store_handle.store();
                    // The hook already succeeded, so a failed mark is not reported as a
                    // compensation failure. NOTE(review): presumably a later run may then
                    // re-invoke the hook for the same key; confirm that is acceptable.
                    if let Err(err) = store.mark_compensated(&key).await {
                        tracing::warn!(
                            trace_id = %context.trace_id,
                            key = %key,
                            error = %err,
                            "Failed to mark compensation idempotency state"
                        );
                    }
                }
                return true;
            }
            Err(err) => {
                let is_last = attempt == max_attempts;
                tracing::warn!(
                    trace_id = %context.trace_id,
                    circuit = %context.circuit,
                    fault_kind = %context.fault_kind,
                    fault_step = context.fault_step,
                    attempt,
                    max_attempts,
                    error = %err,
                    "Compensation hook attempt failed"
                );
                // No backoff after the final attempt, or when backoff is disabled (0 ms).
                if !is_last && retry_policy.backoff_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
                        .await;
                }
            }
        }
    }
    // Every attempt failed.
    false
}
3147
3148fn ensure_timeline(bus: &mut Bus) -> bool {
3149    if bus.has::<Timeline>() {
3150        false
3151    } else {
3152        bus.insert(Timeline::new());
3153        true
3154    }
3155}
3156
3157fn should_attach_timeline(bus: &Bus) -> bool {
3158    // Respect explicitly provided timeline collector from caller.
3159    if bus.has::<Timeline>() {
3160        return true;
3161    }
3162
3163    // Attach timeline when runtime export path exists.
3164    has_timeline_output_path()
3165}
3166
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
///
/// An unset or non-Unicode variable, or one containing only whitespace, counts
/// as "no output path".
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
3173
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped into `[0.0, 1.0]`.
/// An unset, unparsable or NaN value falls back to `1.0` (sample everything).
/// NaN must be rejected explicitly. Otherwise `f64::clamp` would pass it
/// through, and every `rate` comparison downstream would be false, silently
/// disabling sampling.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|v| !v.is_nan())
        .map(|v| v.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
3181
3182fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3183    if rate <= 0.0 {
3184        return false;
3185    }
3186    if rate >= 1.0 {
3187        return true;
3188    }
3189    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3190    bucket < rate
3191}
3192
/// Returns the adaptive timeline export policy from `RANVIER_TIMELINE_ADAPTIVE`,
/// lower-cased. It defaults to `"fault_branch"` when the variable is unset or not
/// valid Unicode.
fn timeline_adaptive_policy() -> String {
    let raw = match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(value) => value,
        Err(_) => String::from("fault_branch"),
    };
    raw.to_ascii_lowercase()
}
3198
3199fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3200    match policy {
3201        "off" => false,
3202        "fault_only" => matches!(outcome, Outcome::Fault(_)),
3203        "fault_branch_emit" => {
3204            matches!(
3205                outcome,
3206                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3207            )
3208        }
3209        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3210    }
3211}
3212
/// Running counters describing timeline export sampling decisions.
///
/// One process-wide instance is kept in `TIMELINE_SAMPLING_STATS` and updated by
/// `record_sampling_stats`. `Debug` is derived so snapshots can be inspected in
/// logs and tests.
#[derive(Debug, Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that resulted in no export.
    skipped: u64,
    /// Exports recorded with the `sampled` flag set.
    sampled_exports: u64,
    /// Exports recorded with the `forced` flag set.
    forced_exports: u64,
    /// Export mode reported with the most recent decision.
    last_mode: String,
    /// Adaptive policy reported with the most recent decision.
    last_policy: String,
    /// Milliseconds since the Unix epoch at the most recent update.
    last_updated_ms: u64,
}
3224
3225static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3226
3227fn stats_cell() -> &'static Mutex<SamplingStats> {
3228    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3229}
3230
3231fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3232    let snapshot = {
3233        let mut stats = match stats_cell().lock() {
3234            Ok(guard) => guard,
3235            Err(_) => return,
3236        };
3237
3238        stats.total_decisions += 1;
3239        if exported {
3240            stats.exported += 1;
3241        } else {
3242            stats.skipped += 1;
3243        }
3244        if sampled && exported {
3245            stats.sampled_exports += 1;
3246        }
3247        if forced && exported {
3248            stats.forced_exports += 1;
3249        }
3250        stats.last_mode = mode.to_string();
3251        stats.last_policy = policy.to_string();
3252        stats.last_updated_ms = now_ms();
3253        stats.clone()
3254    };
3255
3256    tracing::debug!(
3257        ranvier.timeline.total_decisions = snapshot.total_decisions,
3258        ranvier.timeline.exported = snapshot.exported,
3259        ranvier.timeline.skipped = snapshot.skipped,
3260        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3261        ranvier.timeline.forced_exports = snapshot.forced_exports,
3262        ranvier.timeline.mode = %snapshot.last_mode,
3263        ranvier.timeline.policy = %snapshot.last_policy,
3264        "Timeline sampling stats updated"
3265    );
3266
3267    if let Some(path) = timeline_stats_output_path() {
3268        let payload = serde_json::json!({
3269            "total_decisions": snapshot.total_decisions,
3270            "exported": snapshot.exported,
3271            "skipped": snapshot.skipped,
3272            "sampled_exports": snapshot.sampled_exports,
3273            "forced_exports": snapshot.forced_exports,
3274            "last_mode": snapshot.last_mode,
3275            "last_policy": snapshot.last_policy,
3276            "last_updated_ms": snapshot.last_updated_ms
3277        });
3278        if let Some(parent) = Path::new(&path).parent() {
3279            let _ = fs::create_dir_all(parent);
3280        }
3281        if let Err(err) = fs::write(&path, payload.to_string()) {
3282            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3283        }
3284    }
3285}
3286
/// Returns the sampling-stats output path from `RANVIER_TIMELINE_STATS_OUTPUT`.
///
/// Returns `None` when the variable is unset, not valid Unicode, or blank. A
/// non-blank value is returned exactly as set, without trimming.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    }
}
3292
3293fn write_timeline_with_policy(
3294    path: &str,
3295    mode: &str,
3296    mut timeline: Timeline,
3297) -> Result<(), String> {
3298    match mode {
3299        "append" => {
3300            if Path::new(path).exists() {
3301                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3302                match serde_json::from_str::<Timeline>(&content) {
3303                    Ok(mut existing) => {
3304                        existing.events.append(&mut timeline.events);
3305                        existing.sort();
3306                        if let Some(max_events) = max_events_limit() {
3307                            truncate_timeline_events(&mut existing, max_events);
3308                        }
3309                        write_timeline_json(path, &existing)
3310                    }
3311                    Err(_) => {
3312                        // Fallback: if existing is invalid, replace with current timeline
3313                        if let Some(max_events) = max_events_limit() {
3314                            truncate_timeline_events(&mut timeline, max_events);
3315                        }
3316                        write_timeline_json(path, &timeline)
3317                    }
3318                }
3319            } else {
3320                if let Some(max_events) = max_events_limit() {
3321                    truncate_timeline_events(&mut timeline, max_events);
3322                }
3323                write_timeline_json(path, &timeline)
3324            }
3325        }
3326        "rotate" => {
3327            let rotated_path = rotated_path(path, now_ms());
3328            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3329            if let Some(keep) = rotate_keep_limit() {
3330                cleanup_rotated_files(path, keep)?;
3331            }
3332            Ok(())
3333        }
3334        _ => write_timeline_json(path, &timeline),
3335    }
3336}
3337
3338fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3339    if let Some(parent) = Path::new(path).parent()
3340        && !parent.as_os_str().is_empty()
3341    {
3342        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3343    }
3344    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3345    fs::write(path, json).map_err(|e| e.to_string())
3346}
3347
/// Builds the path of a rotated timeline file: `{dir}/{stem}_{suffix}.{ext}`.
///
/// The stem defaults to `"timeline"` and the extension to `"json"` when the
/// original path lacks them or they are not valid UTF-8.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let directory = original.parent().unwrap_or(Path::new(""));
    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let extension = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    directory.join(format!("{stem}_{suffix}.{extension}"))
}
3355
/// Maximum number of events to keep in an appended timeline, taken from
/// `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` (no limit) when the variable is unset, is not a plain `usize`,
/// or is `0`.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(limit) => Some(limit),
    }
}
3362
/// Number of rotated timeline files to retain, taken from
/// `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` (keep everything) when the variable is unset, is not a plain
/// `usize`, or is `0`.
fn rotate_keep_limit() -> Option<usize> {
    match std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").map(|raw| raw.parse::<usize>()) {
        Ok(Ok(keep)) if keep > 0 => Some(keep),
        _ => None,
    }
}
3369
3370fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3371    let len = timeline.events.len();
3372    if len > max_events {
3373        let keep_from = len - max_events;
3374        timeline.events = timeline.events.split_off(keep_from);
3375    }
3376}
3377
/// Deletes old rotated timeline files, keeping only the `keep` most recent.
///
/// Rotated files are named `{stem}_{millis}.{ext}` (see `rotated_path`). Only
/// files whose middle segment is a plain decimal number are considered, so
/// unrelated files that merely share the prefix and extension (for example
/// `timeline_stats.json`) are never removed. Recency is decided by that numeric
/// rotation timestamp, which is deterministic, unlike filesystem mtimes that
/// may tie at coarse granularity.
///
/// A bare file name such as `timeline.json` has an empty parent
/// (`Path::parent` returns `Some("")`), which is treated as the current
/// directory. `read_dir("")` would fail.
///
/// # Errors
/// Returns the stringified I/O error if the directory cannot be listed.
/// Failures to delete individual files are ignored (best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    let parent = p
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut rotated = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let name = name.to_str()?;
            let stamp = name
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // Only `{stem}_{digits}.{ext}` is a rotated file produced by `rotated_path`.
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            Some((entry.path(), stamp))
        })
        .collect::<Vec<_>>();

    // Newest rotation first; everything past the first `keep` entries is removed.
    rotated.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
3410
3411fn bus_capability_schema_from_policy(
3412    policy: Option<ranvier_core::bus::BusAccessPolicy>,
3413) -> Option<BusCapabilitySchema> {
3414    let policy = policy?;
3415
3416    let mut allow = policy
3417        .allow
3418        .unwrap_or_default()
3419        .into_iter()
3420        .map(|entry| entry.type_name.to_string())
3421        .collect::<Vec<_>>();
3422    let mut deny = policy
3423        .deny
3424        .into_iter()
3425        .map(|entry| entry.type_name.to_string())
3426        .collect::<Vec<_>>();
3427    allow.sort();
3428    deny.sort();
3429
3430    if allow.is_empty() && deny.is_empty() {
3431        return None;
3432    }
3433
3434    Some(BusCapabilitySchema { allow, deny })
3435}
3436
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
3443
3444#[cfg(test)]
3445mod tests {
3446    use super::{
3447        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3448        should_force_export,
3449    };
3450    use crate::persistence::{
3451        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3452        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3453        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3454        PersistenceHandle, PersistenceStore, PersistenceTraceId,
3455    };
3456    use anyhow::Result;
3457    use async_trait::async_trait;
3458    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3459    use ranvier_core::event::{DlqPolicy, DlqSink};
3460    use ranvier_core::saga::SagaStack;
3461    use ranvier_core::timeline::{Timeline, TimelineEvent};
3462    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3463    use serde::{Deserialize, Serialize};
3464    use std::sync::Arc;
3465    use tokio::sync::Mutex;
3466    use uuid::Uuid;
3467
3468    struct MockAuditSink {
3469        events: Arc<Mutex<Vec<AuditEvent>>>,
3470    }
3471
3472    #[async_trait]
3473    impl AuditSink for MockAuditSink {
3474        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3475            self.events.lock().await.push(event.clone());
3476            Ok(())
3477        }
3478    }
3479
3480    #[tokio::test]
3481    async fn execute_logs_audit_events_for_intervention() {
3482        use ranvier_inspector::StateInspector;
3483
3484        let trace_id = "test-audit-trace";
3485        let store_impl = InMemoryPersistenceStore::new();
3486        let events = Arc::new(Mutex::new(Vec::new()));
3487        let sink = MockAuditSink {
3488            events: events.clone(),
3489        };
3490
3491        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3492            .then(AddOne)
3493            .with_persistence_store(store_impl.clone())
3494            .with_audit_sink(sink);
3495
3496        let mut bus = Bus::new();
3497        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3498        bus.insert(PersistenceTraceId::new(trace_id));
3499        let target_node_id = axon.schematic.nodes[0].id.clone();
3500
3501        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
3502        store_impl
3503            .append(crate::persistence::PersistenceEnvelope {
3504                trace_id: trace_id.to_string(),
3505                circuit: "AuditTest".to_string(),
3506                schematic_version: "v1.0".to_string(),
3507                step: 0,
3508                node_id: None,
3509                outcome_kind: "Next".to_string(),
3510                timestamp_ms: 0,
3511                payload_hash: None,
3512                payload: None,
3513            })
3514            .await
3515            .unwrap();
3516
3517        // 1. Trigger force_resume (should log ForceResume)
3518        axon.force_resume(trace_id, &target_node_id, None)
3519            .await
3520            .unwrap();
3521
3522        // 2. Execute (should log ApplyIntervention)
3523        axon.execute(10, &(), &mut bus).await;
3524
3525        let recorded = events.lock().await;
3526        assert_eq!(
3527            recorded.len(),
3528            2,
3529            "Should have 2 audit events: ForceResume and ApplyIntervention"
3530        );
3531        assert_eq!(recorded[0].action, "ForceResume");
3532        assert_eq!(recorded[0].target, trace_id);
3533        assert_eq!(recorded[1].action, "ApplyIntervention");
3534        assert_eq!(recorded[1].target, trace_id);
3535    }
3536
3537    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3538    pub enum TestInfallible {}
3539
3540    #[test]
3541    fn inspector_enabled_flag_matrix() {
3542        assert!(inspector_enabled_from_value(None));
3543        assert!(inspector_enabled_from_value(Some("1")));
3544        assert!(inspector_enabled_from_value(Some("true")));
3545        assert!(inspector_enabled_from_value(Some("on")));
3546        assert!(!inspector_enabled_from_value(Some("0")));
3547        assert!(!inspector_enabled_from_value(Some("false")));
3548    }
3549
3550    #[test]
3551    fn inspector_dev_mode_matrix() {
3552        assert!(inspector_dev_mode_from_value(None));
3553        assert!(inspector_dev_mode_from_value(Some("dev")));
3554        assert!(inspector_dev_mode_from_value(Some("staging")));
3555        assert!(!inspector_dev_mode_from_value(Some("prod")));
3556        assert!(!inspector_dev_mode_from_value(Some("production")));
3557    }
3558
3559    #[test]
3560    fn adaptive_policy_force_export_matrix() {
3561        let next = Outcome::<(), &'static str>::Next(());
3562        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3563        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3564        let fault = Outcome::<(), &'static str>::Fault("boom");
3565
3566        assert!(!should_force_export(&next, "off"));
3567        assert!(!should_force_export(&fault, "off"));
3568
3569        assert!(!should_force_export(&branch, "fault_only"));
3570        assert!(should_force_export(&fault, "fault_only"));
3571
3572        assert!(should_force_export(&branch, "fault_branch"));
3573        assert!(!should_force_export(&emit, "fault_branch"));
3574        assert!(should_force_export(&fault, "fault_branch"));
3575
3576        assert!(should_force_export(&branch, "fault_branch_emit"));
3577        assert!(should_force_export(&emit, "fault_branch_emit"));
3578        assert!(should_force_export(&fault, "fault_branch_emit"));
3579    }
3580
3581    #[test]
3582    fn sampling_and_adaptive_combination_decisions() {
3583        let bus_id = Uuid::nil();
3584        let next = Outcome::<(), &'static str>::Next(());
3585        let fault = Outcome::<(), &'static str>::Fault("boom");
3586
3587        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3588        assert!(!sampled_never);
3589        assert!(!(sampled_never || should_force_export(&next, "off")));
3590        assert!(sampled_never || should_force_export(&fault, "fault_only"));
3591
3592        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3593        assert!(sampled_always);
3594        assert!(sampled_always || should_force_export(&next, "off"));
3595        assert!(sampled_always || should_force_export(&fault, "off"));
3596    }
3597
3598    #[derive(Clone)]
3599    struct AddOne;
3600
3601    #[async_trait]
3602    impl Transition<i32, i32> for AddOne {
3603        type Error = TestInfallible;
3604        type Resources = ();
3605
3606        async fn run(
3607            &self,
3608            state: i32,
3609            _resources: &Self::Resources,
3610            _bus: &mut Bus,
3611        ) -> Outcome<i32, Self::Error> {
3612            Outcome::Next(state + 1)
3613        }
3614    }
3615
3616    #[derive(Clone)]
3617    struct AlwaysFault;
3618
3619    #[async_trait]
3620    impl Transition<i32, i32> for AlwaysFault {
3621        type Error = String;
3622        type Resources = ();
3623
3624        async fn run(
3625            &self,
3626            _state: i32,
3627            _resources: &Self::Resources,
3628            _bus: &mut Bus,
3629        ) -> Outcome<i32, Self::Error> {
3630            Outcome::Fault("boom".to_string())
3631        }
3632    }
3633
3634    #[derive(Clone)]
3635    struct CapabilityGuarded;
3636
3637    #[async_trait]
3638    impl Transition<(), ()> for CapabilityGuarded {
3639        type Error = String;
3640        type Resources = ();
3641
3642        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3643            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3644        }
3645
3646        async fn run(
3647            &self,
3648            _state: (),
3649            _resources: &Self::Resources,
3650            bus: &mut Bus,
3651        ) -> Outcome<(), Self::Error> {
3652            match bus.get::<String>() {
3653                Ok(_) => Outcome::Next(()),
3654                Err(err) => Outcome::Fault(err.to_string()),
3655            }
3656        }
3657    }
3658
3659    #[derive(Clone)]
3660    struct RecordingCompensationHook {
3661        calls: Arc<Mutex<Vec<CompensationContext>>>,
3662        should_fail: bool,
3663    }
3664
3665    #[async_trait]
3666    impl CompensationHook for RecordingCompensationHook {
3667        async fn compensate(&self, context: CompensationContext) -> Result<()> {
3668            self.calls.lock().await.push(context);
3669            if self.should_fail {
3670                return Err(anyhow::anyhow!("compensation failed"));
3671            }
3672            Ok(())
3673        }
3674    }
3675
3676    #[derive(Clone)]
3677    struct FlakyCompensationHook {
3678        calls: Arc<Mutex<u32>>,
3679        failures_remaining: Arc<Mutex<u32>>,
3680    }
3681
3682    #[async_trait]
3683    impl CompensationHook for FlakyCompensationHook {
3684        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3685            {
3686                let mut calls = self.calls.lock().await;
3687                *calls += 1;
3688            }
3689            let mut failures_remaining = self.failures_remaining.lock().await;
3690            if *failures_remaining > 0 {
3691                *failures_remaining -= 1;
3692                return Err(anyhow::anyhow!("transient compensation failure"));
3693            }
3694            Ok(())
3695        }
3696    }
3697
3698    #[derive(Clone)]
3699    struct FailingCompensationIdempotencyStore {
3700        read_calls: Arc<Mutex<u32>>,
3701        write_calls: Arc<Mutex<u32>>,
3702    }
3703
3704    #[async_trait]
3705    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3706        async fn was_compensated(&self, _key: &str) -> Result<bool> {
3707            let mut read_calls = self.read_calls.lock().await;
3708            *read_calls += 1;
3709            Err(anyhow::anyhow!("forced idempotency read failure"))
3710        }
3711
3712        async fn mark_compensated(&self, _key: &str) -> Result<()> {
3713            let mut write_calls = self.write_calls.lock().await;
3714            *write_calls += 1;
3715            Err(anyhow::anyhow!("forced idempotency write failure"))
3716        }
3717    }
3718
3719    #[tokio::test]
3720    async fn execute_persists_success_trace_when_handle_exists() {
3721        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3722        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3723
3724        let mut bus = Bus::new();
3725        bus.insert(PersistenceHandle::from_arc(store_dyn));
3726        bus.insert(PersistenceTraceId::new("trace-success"));
3727
3728        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3729        let outcome = axon.execute(41, &(), &mut bus).await;
3730        assert!(matches!(outcome, Outcome::Next(42)));
3731
3732        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3733        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3734        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3735        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
3736        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
3737        assert_eq!(persisted.completion, Some(CompletionState::Success));
3738    }
3739
3740    #[tokio::test]
3741    async fn execute_persists_fault_completion_state() {
3742        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3743        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3744
3745        let mut bus = Bus::new();
3746        bus.insert(PersistenceHandle::from_arc(store_dyn));
3747        bus.insert(PersistenceTraceId::new("trace-fault"));
3748
3749        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3750        let outcome = axon.execute(41, &(), &mut bus).await;
3751        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3752
3753        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3754        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3755        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3756        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3757        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3758    }
3759
3760    #[tokio::test]
3761    async fn execute_respects_persistence_auto_complete_off() {
3762        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3763        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3764
3765        let mut bus = Bus::new();
3766        bus.insert(PersistenceHandle::from_arc(store_dyn));
3767        bus.insert(PersistenceTraceId::new("trace-no-complete"));
3768        bus.insert(PersistenceAutoComplete(false));
3769
3770        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3771        let outcome = axon.execute(1, &(), &mut bus).await;
3772        assert!(matches!(outcome, Outcome::Next(2)));
3773
3774        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3775        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3776        assert_eq!(persisted.completion, None);
3777    }
3778
3779    #[tokio::test]
3780    async fn fault_triggers_compensation_and_marks_compensated() {
3781        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3782        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3783        let calls = Arc::new(Mutex::new(Vec::new()));
3784        let compensation = RecordingCompensationHook {
3785            calls: calls.clone(),
3786            should_fail: false,
3787        };
3788
3789        let mut bus = Bus::new();
3790        bus.insert(PersistenceHandle::from_arc(store_dyn));
3791        bus.insert(PersistenceTraceId::new("trace-compensated"));
3792        bus.insert(CompensationHandle::from_hook(compensation));
3793
3794        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3795        let outcome = axon.execute(7, &(), &mut bus).await;
3796        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3797
3798        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3799        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
3800        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3801        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3802        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3803        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3804        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3805
3806        let recorded = calls.lock().await;
3807        assert_eq!(recorded.len(), 1);
3808        assert_eq!(recorded[0].trace_id, "trace-compensated");
3809        assert_eq!(recorded[0].fault_kind, "Fault");
3810    }
3811
3812    #[tokio::test]
3813    async fn failed_compensation_keeps_fault_completion() {
3814        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3815        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3816        let calls = Arc::new(Mutex::new(Vec::new()));
3817        let compensation = RecordingCompensationHook {
3818            calls: calls.clone(),
3819            should_fail: true,
3820        };
3821
3822        let mut bus = Bus::new();
3823        bus.insert(PersistenceHandle::from_arc(store_dyn));
3824        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3825        bus.insert(CompensationHandle::from_hook(compensation));
3826
3827        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3828        let outcome = axon.execute(7, &(), &mut bus).await;
3829        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3830
3831        let persisted = store_impl
3832            .load("trace-compensation-failed")
3833            .await
3834            .unwrap()
3835            .unwrap();
3836        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3837        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3838        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3839
3840        let recorded = calls.lock().await;
3841        assert_eq!(recorded.len(), 1);
3842    }
3843
3844    #[tokio::test]
3845    async fn compensation_retry_policy_succeeds_after_retries() {
3846        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3847        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3848        let calls = Arc::new(Mutex::new(0u32));
3849        let failures_remaining = Arc::new(Mutex::new(2u32));
3850        let compensation = FlakyCompensationHook {
3851            calls: calls.clone(),
3852            failures_remaining,
3853        };
3854
3855        let mut bus = Bus::new();
3856        bus.insert(PersistenceHandle::from_arc(store_dyn));
3857        bus.insert(PersistenceTraceId::new("trace-retry-success"));
3858        bus.insert(CompensationHandle::from_hook(compensation));
3859        bus.insert(CompensationRetryPolicy {
3860            max_attempts: 3,
3861            backoff_ms: 0,
3862        });
3863
3864        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3865        let outcome = axon.execute(7, &(), &mut bus).await;
3866        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3867
3868        let persisted = store_impl
3869            .load("trace-retry-success")
3870            .await
3871            .unwrap()
3872            .unwrap();
3873        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3874        assert_eq!(
3875            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3876            Some("Compensated")
3877        );
3878
3879        let attempt_count = calls.lock().await;
3880        assert_eq!(*attempt_count, 3);
3881    }
3882
3883    #[tokio::test]
3884    async fn compensation_idempotency_skips_duplicate_hook_execution() {
3885        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3886        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3887        let calls = Arc::new(Mutex::new(Vec::new()));
3888        let compensation = RecordingCompensationHook {
3889            calls: calls.clone(),
3890            should_fail: false,
3891        };
3892        let idempotency = InMemoryCompensationIdempotencyStore::new();
3893
3894        let mut bus = Bus::new();
3895        bus.insert(PersistenceHandle::from_arc(store_dyn));
3896        bus.insert(PersistenceTraceId::new("trace-idempotent"));
3897        bus.insert(PersistenceAutoComplete(false));
3898        bus.insert(CompensationHandle::from_hook(compensation));
3899        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3900
3901        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3902
3903        let outcome1 = axon.execute(7, &(), &mut bus).await;
3904        let outcome2 = axon.execute(8, &(), &mut bus).await;
3905        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3906        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3907
3908        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3909        assert_eq!(persisted.completion, None);
3910        // Verify that "Compensated" events are present for both executions
3911        let compensated_count = persisted
3912            .events
3913            .iter()
3914            .filter(|e| e.outcome_kind == "Compensated")
3915            .count();
3916        assert_eq!(
3917            compensated_count, 2,
3918            "Should have 2 Compensated events (one per execution)"
3919        );
3920
3921        let recorded = calls.lock().await;
3922        assert_eq!(recorded.len(), 1);
3923    }
3924
3925    #[tokio::test]
3926    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3927        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3928        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3929        let calls = Arc::new(Mutex::new(Vec::new()));
3930        let read_calls = Arc::new(Mutex::new(0u32));
3931        let write_calls = Arc::new(Mutex::new(0u32));
3932        let compensation = RecordingCompensationHook {
3933            calls: calls.clone(),
3934            should_fail: false,
3935        };
3936        let idempotency = FailingCompensationIdempotencyStore {
3937            read_calls: read_calls.clone(),
3938            write_calls: write_calls.clone(),
3939        };
3940
3941        let mut bus = Bus::new();
3942        bus.insert(PersistenceHandle::from_arc(store_dyn));
3943        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3944        bus.insert(CompensationHandle::from_hook(compensation));
3945        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3946
3947        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3948        let outcome = axon.execute(9, &(), &mut bus).await;
3949        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3950
3951        let persisted = store_impl
3952            .load("trace-idempotency-store-failure")
3953            .await
3954            .unwrap()
3955            .unwrap();
3956        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3957        assert_eq!(
3958            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3959            Some("Compensated")
3960        );
3961
3962        let recorded = calls.lock().await;
3963        assert_eq!(recorded.len(), 1);
3964        assert_eq!(*read_calls.lock().await, 1);
3965        assert_eq!(*write_calls.lock().await, 1);
3966    }
3967
3968    #[tokio::test]
3969    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3970        let mut bus = Bus::new();
3971        bus.insert(1_i32);
3972        bus.insert("secret".to_string());
3973
3974        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3975        let outcome = axon.execute((), &(), &mut bus).await;
3976
3977        match outcome {
3978            Outcome::Fault(msg) => {
3979                assert!(msg.contains("Bus access denied"), "{msg}");
3980                assert!(msg.contains("CapabilityGuarded"), "{msg}");
3981                assert!(msg.contains("alloc::string::String"), "{msg}");
3982            }
3983            other => panic!("expected fault, got {other:?}"),
3984        }
3985    }
3986
3987    #[tokio::test]
3988    async fn execute_fails_on_version_mismatch_without_migration() {
3989        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3990        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3991
3992        let trace_id = "v-mismatch";
3993        // Create an existing trace with an older version
3994        let old_envelope = crate::persistence::PersistenceEnvelope {
3995            trace_id: trace_id.to_string(),
3996            circuit: "TestCircuit".to_string(),
3997            schematic_version: "0.9".to_string(),
3998            step: 0,
3999            node_id: None,
4000            outcome_kind: "Enter".to_string(),
4001            timestamp_ms: 0,
4002            payload_hash: None,
4003            payload: None,
4004        };
4005        store_impl.append(old_envelope).await.unwrap();
4006
4007        let mut bus = Bus::new();
4008        bus.insert(PersistenceHandle::from_arc(store_dyn));
4009        bus.insert(PersistenceTraceId::new(trace_id));
4010
4011        // Current axon is version 1.0
4012        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4013        let outcome = axon.execute(10, &(), &mut bus).await;
4014
4015        if let Outcome::Emit(kind, _) = outcome {
4016            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
4017        } else {
4018            panic!("Expected version mismatch emission, got {:?}", outcome);
4019        }
4020    }
4021
4022    #[tokio::test]
4023    async fn execute_resumes_from_start_on_migration_strategy() {
4024        let store_impl = Arc::new(InMemoryPersistenceStore::new());
4025        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4026
4027        let trace_id = "v-migration";
4028        // Create an existing trace with an older version at step 5
4029        let old_envelope = crate::persistence::PersistenceEnvelope {
4030            trace_id: trace_id.to_string(),
4031            circuit: "TestCircuit".to_string(),
4032            schematic_version: "0.9".to_string(),
4033            step: 5,
4034            node_id: None,
4035            outcome_kind: "Next".to_string(),
4036            timestamp_ms: 0,
4037            payload_hash: None,
4038            payload: None,
4039        };
4040        store_impl.append(old_envelope).await.unwrap();
4041
4042        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
4043        registry.register(ranvier_core::schematic::SnapshotMigration {
4044            name: Some("v0.9 to v1.0".to_string()),
4045            from_version: "0.9".to_string(),
4046            to_version: "1.0".to_string(),
4047            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
4048            node_mapping: std::collections::HashMap::new(),
4049            payload_mapper: None,
4050        });
4051
4052        let mut bus = Bus::new();
4053        bus.insert(PersistenceHandle::from_arc(store_dyn));
4054        bus.insert(PersistenceTraceId::new(trace_id));
4055        bus.insert(registry);
4056
4057        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4058        let outcome = axon.execute(10, &(), &mut bus).await;
4059
4060        // Should have resumed from start (step 0), resulting in 11
4061        assert!(matches!(outcome, Outcome::Next(11)));
4062
4063        // Verify new event has current version
4064        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4065        assert_eq!(persisted.schematic_version, "1.0");
4066    }
4067
4068    #[tokio::test]
4069    async fn execute_applies_manual_intervention_jump_and_payload() {
4070        let store_impl = Arc::new(InMemoryPersistenceStore::new());
4071        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4072
4073        let trace_id = "intervention-test";
4074        // 1. Run a normal trace part-way
4075        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
4076            .then(AddOne)
4077            .then(AddOne);
4078
4079        let mut bus = Bus::new();
4080        bus.insert(PersistenceHandle::from_arc(store_dyn));
4081        bus.insert(PersistenceTraceId::new(trace_id));
4082
4083        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
4084        // with a payload override of 100.
4085        // The first node is 'AddOne', the second is ALSO 'AddOne'.
4086        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
4087        let _target_node_label = "AddOne";
4088        // To be precise, let's find the ID of the second node.
4089        let target_node_id = axon.schematic.nodes[2].id.clone();
4090
4091        // Pre-seed an initial trace entry so save_intervention doesn't fail
4092        store_impl
4093            .append(crate::persistence::PersistenceEnvelope {
4094                trace_id: trace_id.to_string(),
4095                circuit: "TestCircuit".to_string(),
4096                schematic_version: "1.0".to_string(),
4097                step: 0,
4098                node_id: None,
4099                outcome_kind: "Enter".to_string(),
4100                timestamp_ms: 0,
4101                payload_hash: None,
4102                payload: None,
4103            })
4104            .await
4105            .unwrap();
4106
4107        store_impl
4108            .save_intervention(
4109                trace_id,
4110                crate::persistence::Intervention {
4111                    target_node: target_node_id.clone(),
4112                    payload_override: Some(serde_json::json!(100)),
4113                    timestamp_ms: 0,
4114                },
4115            )
4116            .await
4117            .unwrap();
4118
4119        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
4120        // Result should be 100 + 1 = 101.
4121        let outcome = axon.execute(10, &(), &mut bus).await;
4122
4123        match outcome {
4124            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
4125            other => panic!("Expected Outcome::Next(101), got {:?}", other),
4126        }
4127
4128        // Verify the jump was logged in trace
4129        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4130        // The last event should be from the jump target's execution.
4131        assert_eq!(persisted.interventions.len(), 1);
4132        assert_eq!(persisted.interventions[0].target_node, target_node_id);
4133    }
4134
4135    // ── DLQ Retry Tests ──────────────────────────────────────────────
4136
4137    /// A transition that fails a configurable number of times before succeeding.
4138    #[derive(Clone)]
4139    struct FailNThenSucceed {
4140        remaining: Arc<tokio::sync::Mutex<u32>>,
4141    }
4142
4143    #[async_trait]
4144    impl Transition<i32, i32> for FailNThenSucceed {
4145        type Error = String;
4146        type Resources = ();
4147
4148        async fn run(
4149            &self,
4150            state: i32,
4151            _resources: &Self::Resources,
4152            _bus: &mut Bus,
4153        ) -> Outcome<i32, Self::Error> {
4154            let mut rem = self.remaining.lock().await;
4155            if *rem > 0 {
4156                *rem -= 1;
4157                Outcome::Fault("transient failure".to_string())
4158            } else {
4159                Outcome::Next(state + 1)
4160            }
4161        }
4162    }
4163
4164    /// A mock DLQ sink that records all dead letters.
4165    #[derive(Clone)]
4166    struct MockDlqSink {
4167        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
4168    }
4169
4170    #[async_trait]
4171    impl DlqSink for MockDlqSink {
4172        async fn store_dead_letter(
4173            &self,
4174            workflow_id: &str,
4175            _circuit_label: &str,
4176            node_id: &str,
4177            error_msg: &str,
4178            _payload: &[u8],
4179        ) -> Result<(), String> {
4180            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4181            self.letters.lock().await.push(entry);
4182            Ok(())
4183        }
4184    }
4185
4186    #[tokio::test]
4187    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4188        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
4189        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4190        let trans = FailNThenSucceed { remaining };
4191
4192        let dlq_sink = MockDlqSink {
4193            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4194        };
4195
4196        let mut bus = Bus::new();
4197        bus.insert(Timeline::new());
4198
4199        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4200            .then(trans)
4201            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4202                max_attempts: 5,
4203                backoff_ms: 1,
4204            })
4205            .with_dlq_sink(dlq_sink.clone());
4206        let outcome = axon.execute(10, &(), &mut bus).await;
4207
4208        // Should succeed (10 + 1 = 11)
4209        assert!(
4210            matches!(outcome, Outcome::Next(11)),
4211            "Expected Next(11), got {:?}",
4212            outcome
4213        );
4214
4215        // No dead letters since it recovered
4216        let letters = dlq_sink.letters.lock().await;
4217        assert!(
4218            letters.is_empty(),
4219            "Should have 0 dead letters, got {}",
4220            letters.len()
4221        );
4222
4223        // Timeline should contain NodeRetry events
4224        let timeline = bus.read::<Timeline>().unwrap();
4225        let retry_count = timeline
4226            .events
4227            .iter()
4228            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4229            .count();
4230        assert_eq!(retry_count, 2, "Should have 2 retry events");
4231    }
4232
4233    #[tokio::test]
4234    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4235        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
4236        let mut bus = Bus::new();
4237        bus.insert(Timeline::new());
4238
4239        let dlq_sink = MockDlqSink {
4240            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4241        };
4242
4243        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4244            .then(AlwaysFault)
4245            .with_dlq_policy(DlqPolicy::RetryThenDlq {
4246                max_attempts: 3,
4247                backoff_ms: 1,
4248            })
4249            .with_dlq_sink(dlq_sink.clone());
4250        let outcome = axon.execute(42, &(), &mut bus).await;
4251
4252        assert!(
4253            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4254            "Expected Fault(boom), got {:?}",
4255            outcome
4256        );
4257
4258        // Should have exactly 1 dead letter
4259        let letters = dlq_sink.letters.lock().await;
4260        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4261
4262        // Timeline should have 2 retry events and 1 DlqExhausted event
4263        let timeline = bus.read::<Timeline>().unwrap();
4264        let retry_count = timeline
4265            .events
4266            .iter()
4267            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4268            .count();
4269        let dlq_count = timeline
4270            .events
4271            .iter()
4272            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4273            .count();
4274        assert_eq!(
4275            retry_count, 2,
4276            "Should have 2 retry events (attempts 2 and 3)"
4277        );
4278        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4279    }
4280
4281    #[tokio::test]
4282    async fn send_to_dlq_policy_sends_immediately_without_retry() {
4283        let mut bus = Bus::new();
4284        bus.insert(Timeline::new());
4285
4286        let dlq_sink = MockDlqSink {
4287            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4288        };
4289
4290        let axon = Axon::<i32, i32, String>::start("SendDlq")
4291            .then(AlwaysFault)
4292            .with_dlq_policy(DlqPolicy::SendToDlq)
4293            .with_dlq_sink(dlq_sink.clone());
4294        let outcome = axon.execute(1, &(), &mut bus).await;
4295
4296        assert!(matches!(outcome, Outcome::Fault(_)));
4297
4298        // Should have exactly 1 dead letter (immediate, no retries)
4299        let letters = dlq_sink.letters.lock().await;
4300        assert_eq!(letters.len(), 1);
4301
4302        // No retry or DlqExhausted events
4303        let timeline = bus.read::<Timeline>().unwrap();
4304        let retry_count = timeline
4305            .events
4306            .iter()
4307            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4308            .count();
4309        assert_eq!(retry_count, 0);
4310    }
4311
4312    #[tokio::test]
4313    async fn drop_policy_does_not_send_to_dlq() {
4314        let mut bus = Bus::new();
4315
4316        let dlq_sink = MockDlqSink {
4317            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4318        };
4319
4320        let axon = Axon::<i32, i32, String>::start("DropDlq")
4321            .then(AlwaysFault)
4322            .with_dlq_policy(DlqPolicy::Drop)
4323            .with_dlq_sink(dlq_sink.clone());
4324        let outcome = axon.execute(1, &(), &mut bus).await;
4325
4326        assert!(matches!(outcome, Outcome::Fault(_)));
4327
4328        // No dead letters
4329        let letters = dlq_sink.letters.lock().await;
4330        assert!(letters.is_empty());
4331    }
4332
4333    #[tokio::test]
4334    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4335        use ranvier_core::policy::DynamicPolicy;
4336
4337        // Start with Drop policy (no DLQ)
4338        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4339        let dlq_sink = MockDlqSink {
4340            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4341        };
4342
4343        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4344            .then(AlwaysFault)
4345            .with_dynamic_dlq_policy(dynamic)
4346            .with_dlq_sink(dlq_sink.clone());
4347
4348        // First execution: Drop policy → no dead letters
4349        let mut bus = Bus::new();
4350        let outcome = axon.execute(1, &(), &mut bus).await;
4351        assert!(matches!(outcome, Outcome::Fault(_)));
4352        assert!(
4353            dlq_sink.letters.lock().await.is_empty(),
4354            "Drop policy should produce no DLQ entries"
4355        );
4356
4357        // Hot-reload: switch to SendToDlq
4358        tx.send(DlqPolicy::SendToDlq).unwrap();
4359
4360        // Second execution: SendToDlq policy → dead letter captured
4361        let mut bus2 = Bus::new();
4362        let outcome2 = axon.execute(2, &(), &mut bus2).await;
4363        assert!(matches!(outcome2, Outcome::Fault(_)));
4364        assert_eq!(
4365            dlq_sink.letters.lock().await.len(),
4366            1,
4367            "SendToDlq policy should produce 1 DLQ entry"
4368        );
4369    }
4370
4371    #[tokio::test]
4372    async fn dynamic_saga_policy_hot_reload() {
4373        use ranvier_core::policy::DynamicPolicy;
4374        use ranvier_core::saga::SagaPolicy;
4375
4376        // Start with Disabled saga
4377        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4378
4379        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4380            .then(AddOne)
4381            .with_dynamic_saga_policy(dynamic);
4382
4383        // First execution: Disabled → no SagaStack in bus
4384        let mut bus = Bus::new();
4385        let _outcome = axon.execute(1, &(), &mut bus).await;
4386        assert!(
4387            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4388            "SagaStack should be absent or empty when disabled"
4389        );
4390
4391        // Hot-reload: enable saga
4392        tx.send(SagaPolicy::Enabled).unwrap();
4393
4394        // Second execution: Enabled → SagaStack populated
4395        let mut bus2 = Bus::new();
4396        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4397        assert!(
4398            bus2.read::<SagaStack>().is_some(),
4399            "SagaStack should exist when saga is enabled"
4400        );
4401    }
4402
4403    // ── IAM Boundary Tests ──────────────────────────────────────
4404
4405    mod iam_tests {
4406        use super::*;
4407        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4408
4409        /// Mock IamVerifier that returns a fixed identity.
4410        #[derive(Clone)]
4411        struct MockVerifier {
4412            identity: IamIdentity,
4413            should_fail: bool,
4414        }
4415
4416        #[async_trait]
4417        impl IamVerifier for MockVerifier {
4418            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4419                if self.should_fail {
4420                    Err(IamError::InvalidToken("mock verification failure".into()))
4421                } else {
4422                    Ok(self.identity.clone())
4423                }
4424            }
4425        }
4426
4427        #[tokio::test]
4428        async fn iam_require_identity_passes_with_valid_token() {
4429            let verifier = MockVerifier {
4430                identity: IamIdentity::new("alice").with_role("user"),
4431                should_fail: false,
4432            };
4433
4434            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4435                .with_iam(IamPolicy::RequireIdentity, verifier)
4436                .then(AddOne);
4437
4438            let mut bus = Bus::new();
4439            bus.insert(IamToken("valid-token".to_string()));
4440            let outcome = axon.execute(10, &(), &mut bus).await;
4441
4442            assert!(matches!(outcome, Outcome::Next(11)));
4443            // Verify IamIdentity was injected into Bus
4444            let identity = bus
4445                .read::<IamIdentity>()
4446                .expect("IamIdentity should be in Bus");
4447            assert_eq!(identity.subject, "alice");
4448        }
4449
4450        #[tokio::test]
4451        async fn iam_require_identity_rejects_missing_token() {
4452            let verifier = MockVerifier {
4453                identity: IamIdentity::new("ignored"),
4454                should_fail: false,
4455            };
4456
4457            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4458                .with_iam(IamPolicy::RequireIdentity, verifier)
4459                .then(AddOne);
4460
4461            let mut bus = Bus::new();
4462            // No IamToken inserted
4463            let outcome = axon.execute(10, &(), &mut bus).await;
4464
4465            // Should emit missing_token event
4466            match &outcome {
4467                Outcome::Emit(label, _) => {
4468                    assert_eq!(label, "iam.missing_token");
4469                }
4470                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4471            }
4472        }
4473
4474        #[tokio::test]
4475        async fn iam_rejects_failed_verification() {
4476            let verifier = MockVerifier {
4477                identity: IamIdentity::new("ignored"),
4478                should_fail: true,
4479            };
4480
4481            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4482                .with_iam(IamPolicy::RequireIdentity, verifier)
4483                .then(AddOne);
4484
4485            let mut bus = Bus::new();
4486            bus.insert(IamToken("bad-token".to_string()));
4487            let outcome = axon.execute(10, &(), &mut bus).await;
4488
4489            match &outcome {
4490                Outcome::Emit(label, _) => {
4491                    assert_eq!(label, "iam.verification_failed");
4492                }
4493                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4494            }
4495        }
4496
4497        #[tokio::test]
4498        async fn iam_require_role_passes_with_matching_role() {
4499            let verifier = MockVerifier {
4500                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4501                should_fail: false,
4502            };
4503
4504            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4505                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4506                .then(AddOne);
4507
4508            let mut bus = Bus::new();
4509            bus.insert(IamToken("token".to_string()));
4510            let outcome = axon.execute(5, &(), &mut bus).await;
4511
4512            assert!(matches!(outcome, Outcome::Next(6)));
4513        }
4514
4515        #[tokio::test]
4516        async fn iam_require_role_denies_without_role() {
4517            let verifier = MockVerifier {
4518                identity: IamIdentity::new("carol").with_role("user"),
4519                should_fail: false,
4520            };
4521
4522            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4523                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4524                .then(AddOne);
4525
4526            let mut bus = Bus::new();
4527            bus.insert(IamToken("token".to_string()));
4528            let outcome = axon.execute(5, &(), &mut bus).await;
4529
4530            match &outcome {
4531                Outcome::Emit(label, _) => {
4532                    assert_eq!(label, "iam.policy_denied");
4533                }
4534                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4535            }
4536        }
4537
4538        #[tokio::test]
4539        async fn iam_policy_none_skips_verification() {
4540            let verifier = MockVerifier {
4541                identity: IamIdentity::new("ignored"),
4542                should_fail: true, // would fail if actually called
4543            };
4544
4545            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4546                .with_iam(IamPolicy::None, verifier)
4547                .then(AddOne);
4548
4549            let mut bus = Bus::new();
4550            // No token needed when policy is None
4551            let outcome = axon.execute(10, &(), &mut bus).await;
4552
4553            assert!(matches!(outcome, Outcome::Next(11)));
4554        }
4555    }
4556
4557    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
4558
4559    #[derive(Clone)]
4560    struct SchemaTransition;
4561
4562    #[async_trait]
4563    impl Transition<String, String> for SchemaTransition {
4564        type Error = String;
4565        type Resources = ();
4566
4567        fn input_schema(&self) -> Option<serde_json::Value> {
4568            Some(serde_json::json!({
4569                "type": "object",
4570                "required": ["name"],
4571                "properties": {
4572                    "name": { "type": "string" }
4573                }
4574            }))
4575        }
4576
4577        async fn run(
4578            &self,
4579            state: String,
4580            _resources: &Self::Resources,
4581            _bus: &mut Bus,
4582        ) -> Outcome<String, Self::Error> {
4583            Outcome::Next(state)
4584        }
4585    }
4586
4587    #[test]
4588    fn then_auto_populates_input_schema_from_transition() {
4589        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4590
4591        // The last node (added by .then()) should have input_schema
4592        let last_node = axon.schematic.nodes.last().unwrap();
4593        assert!(last_node.input_schema.is_some());
4594        let schema = last_node.input_schema.as_ref().unwrap();
4595        assert_eq!(schema["type"], "object");
4596        assert_eq!(schema["required"][0], "name");
4597    }
4598
4599    #[test]
4600    fn then_leaves_input_schema_none_when_not_provided() {
4601        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4602
4603        let last_node = axon.schematic.nodes.last().unwrap();
4604        assert!(last_node.input_schema.is_none());
4605    }
4606
4607    #[test]
4608    fn with_input_schema_value_sets_on_last_node() {
4609        let schema = serde_json::json!({"type": "integer"});
4610        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4611            .then(AddOne)
4612            .with_input_schema_value(schema.clone());
4613
4614        let last_node = axon.schematic.nodes.last().unwrap();
4615        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4616    }
4617
4618    #[test]
4619    fn with_output_schema_value_sets_on_last_node() {
4620        let schema = serde_json::json!({"type": "integer"});
4621        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4622            .then(AddOne)
4623            .with_output_schema_value(schema.clone());
4624
4625        let last_node = axon.schematic.nodes.last().unwrap();
4626        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4627    }
4628
4629    #[test]
4630    fn schematic_export_includes_schema_fields() {
4631        let axon = Axon::<String, String, String>::new("ExportTest")
4632            .then(SchemaTransition)
4633            .with_output_schema_value(serde_json::json!({"type": "string"}));
4634
4635        let json = serde_json::to_value(&axon.schematic).unwrap();
4636        let nodes = json["nodes"].as_array().unwrap();
4637        // Find the SchemaTransition node (last one)
4638        let last = nodes.last().unwrap();
4639        assert!(last.get("input_schema").is_some());
4640        assert_eq!(last["input_schema"]["type"], "object");
4641        assert_eq!(last["output_schema"]["type"], "string");
4642    }
4643
4644    #[test]
4645    fn schematic_export_omits_schema_fields_when_none() {
4646        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4647
4648        let json = serde_json::to_value(&axon.schematic).unwrap();
4649        let nodes = json["nodes"].as_array().unwrap();
4650        let last = nodes.last().unwrap();
4651        let obj = last.as_object().unwrap();
4652        assert!(!obj.contains_key("input_schema"));
4653        assert!(!obj.contains_key("output_schema"));
4654    }
4655
4656    #[test]
4657    fn schematic_json_roundtrip_preserves_schemas() {
4658        let axon = Axon::<String, String, String>::new("Roundtrip")
4659            .then(SchemaTransition)
4660            .with_output_schema_value(serde_json::json!({"type": "string"}));
4661
4662        let json_str = serde_json::to_string(&axon.schematic).unwrap();
4663        let deserialized: ranvier_core::schematic::Schematic =
4664            serde_json::from_str(&json_str).unwrap();
4665
4666        let last = deserialized.nodes.last().unwrap();
4667        assert!(last.input_schema.is_some());
4668        assert!(last.output_schema.is_some());
4669        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4670        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4671    }
4672
4673    // Test transitions for new unit tests
4674    #[derive(Clone)]
4675    struct MultiplyByTwo;
4676
4677    #[async_trait]
4678    impl Transition<i32, i32> for MultiplyByTwo {
4679        type Error = TestInfallible;
4680        type Resources = ();
4681
4682        async fn run(
4683            &self,
4684            state: i32,
4685            _resources: &Self::Resources,
4686            _bus: &mut Bus,
4687        ) -> Outcome<i32, Self::Error> {
4688            Outcome::Next(state * 2)
4689        }
4690    }
4691
4692    #[derive(Clone)]
4693    struct AddTen;
4694
4695    #[async_trait]
4696    impl Transition<i32, i32> for AddTen {
4697        type Error = TestInfallible;
4698        type Resources = ();
4699
4700        async fn run(
4701            &self,
4702            state: i32,
4703            _resources: &Self::Resources,
4704            _bus: &mut Bus,
4705        ) -> Outcome<i32, Self::Error> {
4706            Outcome::Next(state + 10)
4707        }
4708    }
4709
4710    #[derive(Clone)]
4711    struct AddOneString;
4712
4713    #[async_trait]
4714    impl Transition<i32, i32> for AddOneString {
4715        type Error = String;
4716        type Resources = ();
4717
4718        async fn run(
4719            &self,
4720            state: i32,
4721            _resources: &Self::Resources,
4722            _bus: &mut Bus,
4723        ) -> Outcome<i32, Self::Error> {
4724            Outcome::Next(state + 1)
4725        }
4726    }
4727
4728    #[derive(Clone)]
4729    struct AddTenString;
4730
4731    #[async_trait]
4732    impl Transition<i32, i32> for AddTenString {
4733        type Error = String;
4734        type Resources = ();
4735
4736        async fn run(
4737            &self,
4738            state: i32,
4739            _resources: &Self::Resources,
4740            _bus: &mut Bus,
4741        ) -> Outcome<i32, Self::Error> {
4742            Outcome::Next(state + 10)
4743        }
4744    }
4745
4746    #[tokio::test]
4747    async fn axon_single_step_chain_executes_and_returns_next() {
4748        let mut bus = Bus::new();
4749        let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4750
4751        let outcome = axon.execute(5, &(), &mut bus).await;
4752        assert!(matches!(outcome, Outcome::Next(6)));
4753    }
4754
4755    #[tokio::test]
4756    async fn axon_three_step_chain_executes_in_order() {
4757        let mut bus = Bus::new();
4758        let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4759            .then(AddOne)
4760            .then(MultiplyByTwo)
4761            .then(AddTen);
4762
4763        // Starting with 5: AddOne -> 6, MultiplyByTwo -> 12, AddTen -> 22
4764        let outcome = axon.execute(5, &(), &mut bus).await;
4765        assert!(matches!(outcome, Outcome::Next(22)));
4766    }
4767
4768    #[tokio::test]
4769    async fn axon_with_fault_in_middle_step_propagates_error() {
4770        let mut bus = Bus::new();
4771
4772        // Create a 3-step chain where the middle step faults
4773        // Step 1: AddOneString (5 -> 6)
4774        // Step 2: AlwaysFault (should fault here)
4775        // Step 3: AddTenString (never reached)
4776        let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4777            .then(AddOneString)
4778            .then(AlwaysFault)
4779            .then(AddTenString);
4780
4781        let outcome = axon.execute(5, &(), &mut bus).await;
4782        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4783    }
4784
4785    #[test]
4786    fn axon_schematic_has_correct_node_count_after_chaining() {
4787        let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4788            .then(AddOne)
4789            .then(MultiplyByTwo)
4790            .then(AddTen);
4791
4792        // Should have 4 nodes: ingress + 3 transitions
4793        assert_eq!(axon.schematic.nodes.len(), 4);
4794        assert_eq!(axon.schematic.name, "NodeCount");
4795    }
4796
4797    #[tokio::test]
4798    async fn axon_execution_records_timeline_events() {
4799        let mut bus = Bus::new();
4800        bus.insert(Timeline::new());
4801
4802        let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4803            .then(AddOne)
4804            .then(MultiplyByTwo);
4805
4806        let outcome = axon.execute(3, &(), &mut bus).await;
4807        assert!(matches!(outcome, Outcome::Next(8))); // (3 + 1) * 2 = 8
4808
4809        let timeline = bus.read::<Timeline>().unwrap();
4810
4811        // Should have NodeEnter and NodeExit events
4812        let enter_count = timeline
4813            .events
4814            .iter()
4815            .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4816            .count();
4817        let exit_count = timeline
4818            .events
4819            .iter()
4820            .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4821            .count();
4822
4823        // Expect at least 1 enter and 1 exit (ingress node)
4824        assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4825        assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4826    }
4827
4828    // ── Parallel Step Tests (M231) ───────────────────────────────
4829
4830    #[tokio::test]
4831    async fn parallel_all_succeed_returns_first_next() {
4832        use super::ParallelStrategy;
4833
4834        let mut bus = Bus::new();
4835        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4836            .parallel(
4837                vec![
4838                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4839                    Arc::new(MultiplyByTwo),
4840                ],
4841                ParallelStrategy::AllMustSucceed,
4842            );
4843
4844        // Input 5: AddOne -> 6, MultiplyByTwo -> 10.
4845        // AllMustSucceed returns the first Next (AddOne = 6).
4846        let outcome = axon.execute(5, &(), &mut bus).await;
4847        assert!(matches!(outcome, Outcome::Next(6)));
4848    }
4849
4850    #[tokio::test]
4851    async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4852        use super::ParallelStrategy;
4853
4854        let mut bus = Bus::new();
4855        let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4856            .parallel(
4857                vec![
4858                    Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4859                    Arc::new(AlwaysFault),
4860                ],
4861                ParallelStrategy::AllMustSucceed,
4862            );
4863
4864        let outcome = axon.execute(5, &(), &mut bus).await;
4865        assert!(
4866            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4867            "Expected Fault(boom), got {:?}",
4868            outcome
4869        );
4870    }
4871
4872    #[tokio::test]
4873    async fn parallel_any_can_fail_returns_success_despite_fault() {
4874        use super::ParallelStrategy;
4875
4876        let mut bus = Bus::new();
4877        let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4878            .parallel(
4879                vec![
4880                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4881                    Arc::new(AddOneString),
4882                ],
4883                ParallelStrategy::AnyCanFail,
4884            );
4885
4886        // AlwaysFault faults, but AddOneString succeeds (5 + 1 = 6).
4887        let outcome = axon.execute(5, &(), &mut bus).await;
4888        assert!(
4889            matches!(outcome, Outcome::Next(6)),
4890            "Expected Next(6), got {:?}",
4891            outcome
4892        );
4893    }
4894
4895    #[tokio::test]
4896    async fn parallel_any_can_fail_all_fault_returns_first_fault() {
4897        use super::ParallelStrategy;
4898
4899        #[derive(Clone)]
4900        struct AlwaysFault2;
4901        #[async_trait]
4902        impl Transition<i32, i32> for AlwaysFault2 {
4903            type Error = String;
4904            type Resources = ();
4905            async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
4906                Outcome::Fault("boom2".to_string())
4907            }
4908        }
4909
4910        let mut bus = Bus::new();
4911        let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
4912            .parallel(
4913                vec![
4914                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4915                    Arc::new(AlwaysFault2),
4916                ],
4917                ParallelStrategy::AnyCanFail,
4918            );
4919
4920        let outcome = axon.execute(5, &(), &mut bus).await;
4921        // Should return the first fault
4922        assert!(
4923            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4924            "Expected Fault(boom), got {:?}",
4925            outcome
4926        );
4927    }
4928
4929    #[test]
4930    fn parallel_schematic_has_fanout_fanin_nodes() {
4931        use super::ParallelStrategy;
4932        use ranvier_core::schematic::{EdgeType, NodeKind};
4933
4934        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
4935            .parallel(
4936                vec![
4937                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4938                    Arc::new(MultiplyByTwo),
4939                    Arc::new(AddTen),
4940                ],
4941                ParallelStrategy::AllMustSucceed,
4942            );
4943
4944        // Should have: Ingress + FanOut + 3 branch Atoms + FanIn = 6 nodes
4945        assert_eq!(axon.schematic.nodes.len(), 6);
4946        assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
4947        assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
4948        assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
4949        assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
4950        assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
4951
4952        // Check FanOut description
4953        assert!(axon.schematic.nodes[1]
4954            .description
4955            .as_ref()
4956            .unwrap()
4957            .contains("3 branches"));
4958
4959        // Check parallel edges from FanOut to branches
4960        let parallel_edges: Vec<_> = axon
4961            .schematic
4962            .edges
4963            .iter()
4964            .filter(|e| matches!(e.kind, EdgeType::Parallel))
4965            .collect();
4966        // 3 from FanOut->branches + 3 from branches->FanIn = 6
4967        assert_eq!(parallel_edges.len(), 6);
4968    }
4969
4970    #[tokio::test]
4971    async fn parallel_then_chain_composes_correctly() {
4972        use super::ParallelStrategy;
4973
4974        let mut bus = Bus::new();
4975        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
4976            .then(AddOne)
4977            .parallel(
4978                vec![
4979                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4980                    Arc::new(MultiplyByTwo),
4981                ],
4982                ParallelStrategy::AllMustSucceed,
4983            )
4984            .then(AddTen);
4985
4986        // 5 -> AddOne -> 6 -> Parallel(AddOne=7, MultiplyByTwo=12) -> first=7 -> AddTen -> 17
4987        let outcome = axon.execute(5, &(), &mut bus).await;
4988        assert!(
4989            matches!(outcome, Outcome::Next(17)),
4990            "Expected Next(17), got {:?}",
4991            outcome
4992        );
4993    }
4994
4995    #[tokio::test]
4996    async fn parallel_records_timeline_events() {
4997        use super::ParallelStrategy;
4998        use ranvier_core::timeline::TimelineEvent;
4999
5000        let mut bus = Bus::new();
5001        bus.insert(Timeline::new());
5002
5003        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
5004            .parallel(
5005                vec![
5006                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5007                    Arc::new(MultiplyByTwo),
5008                ],
5009                ParallelStrategy::AllMustSucceed,
5010            );
5011
5012        let outcome = axon.execute(3, &(), &mut bus).await;
5013        assert!(matches!(outcome, Outcome::Next(4)));
5014
5015        let timeline = bus.read::<Timeline>().unwrap();
5016
5017        // Check for FanOut enter/exit and FanIn enter/exit
5018        let fanout_enters = timeline
5019            .events
5020            .iter()
5021            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
5022            .count();
5023        let fanin_enters = timeline
5024            .events
5025            .iter()
5026            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
5027            .count();
5028
5029        assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
5030        assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
5031    }
5032
5033    // ── Axon::simple() convenience constructor ───────────────────────────────
5034
5035    #[derive(Clone)]
5036    struct Greet;
5037
5038    #[async_trait]
5039    impl Transition<(), String> for Greet {
5040        type Error = String;
5041        type Resources = ();
5042
5043        async fn run(
5044            &self,
5045            _state: (),
5046            _resources: &Self::Resources,
5047            _bus: &mut Bus,
5048        ) -> Outcome<String, Self::Error> {
5049            Outcome::Next("Hello from simple!".to_string())
5050        }
5051    }
5052
5053    #[tokio::test]
5054    async fn axon_simple_creates_pipeline() {
5055        let axon = Axon::simple::<String>("SimpleTest").then(Greet);
5056
5057        let mut bus = Bus::new();
5058        let result = axon.execute((), &(), &mut bus).await;
5059
5060        match result {
5061            Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
5062            other => panic!("Expected Outcome::Next, got {:?}", other),
5063        }
5064    }
5065
5066    #[tokio::test]
5067    async fn axon_simple_equivalent_to_explicit() {
5068        // Axon::simple::<E>("label") should behave identically to Axon::<(), (), E>::new("label")
5069        let simple = Axon::simple::<String>("Equiv").then(Greet);
5070        let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
5071
5072        let mut bus1 = Bus::new();
5073        let mut bus2 = Bus::new();
5074
5075        let r1 = simple.execute((), &(), &mut bus1).await;
5076        let r2 = explicit.execute((), &(), &mut bus2).await;
5077
5078        match (r1, r2) {
5079            (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
5080            _ => panic!("Both should produce Outcome::Next"),
5081        }
5082    }
5083}