Skip to main content

ranvier_runtime/
axon.rs

//! # Axon: Executable Decision Tree
//!
//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
//! It is a reusable, typed decision flow, `Axon<In, Out, E, Res>`, that turns an
//! `In` into an `Outcome<Out, E>` using a resource bundle `Res`.
//!
//! ## Design Philosophy
//!
//! * **Axon flows, Schematic shows**: the Axon executes; its Schematic describes.
//! * **Builder pattern**: `Axon::start(label).then(step_a).then(step_b)`
//! * **Schematic extraction**: every Axon carries its structural metadata.
//!
//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45/// Configuration for Execution Mode.
46#[derive(Clone)]
47pub enum ExecutionMode {
48    /// Normal, unpartitioned local execution.
49    Local,
50    /// Singleton execution, ensures only one instance runs across the entire cluster.
51    Singleton {
52        lock_key: String,
53        ttl_ms: u64,
54        lock_provider: Arc<dyn DistributedLock>,
55    },
56}
57
/// A pinned, heap-allocated, type-erased `Send` future that yields `T` and may
/// borrow data for the lifetime `'a`.
///
/// This is the future type returned by every [`Executor`] step.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
60
61/// Executor type for Axon steps.
62/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
63/// Must be Send + Sync to be reusable across threads and clones.
64pub type Executor<In, Out, E, Res> =
65    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
66
67/// Manual intervention jump command injected into the Bus.
68#[derive(Debug, Clone)]
69pub struct ManualJump {
70    pub target_node: String,
71    pub payload_override: Option<serde_json::Value>,
72}
73
/// Bus marker holding the index (position in the schematic's node list) of
/// the step at which a resumed execution restarts.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
77
78/// Persisted state for resumption, injected into the Bus.
79#[derive(Debug, Clone)]
80struct ResumptionState {
81    payload: Option<serde_json::Value>,
82}
83
/// Returns a readable form of `T`'s type name with module paths stripped.
///
/// Every path inside the name is shortened to its final segment, so generic
/// arguments stay intact: `alloc::vec::Vec<alloc::string::String>` becomes
/// `Vec<String>`. Keeping only the text after the last `::` would instead
/// yield the truncated `String>`. The raw text comes from
/// [`std::any::type_name`], whose exact output is not guaranteed to be stable.
fn type_name_of<T: ?Sized>() -> String {
    strip_module_paths(type_name::<T>())
}

/// Removes every `path::to::` qualifier from a type-name string and keeps the
/// surrounding punctuation (`<`, `>`, `,`, `&`, parentheses, spaces, ...).
fn strip_module_paths(full: &str) -> String {
    let mut out = String::with_capacity(full.len());
    // Byte offset in `out` where the path currently being read begins.
    let mut path_start = 0;
    let mut chars = full.chars().peekable();
    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // A `::` separator: drop the qualifier accumulated so far.
            chars.next();
            out.truncate(path_start);
        } else {
            out.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation ends the current path; the next one starts here.
                path_start = out.len();
            }
        }
    }
    out
}
89
90/// The Axon Builder and Runtime.
91///
92/// `Axon` represents an executable decision tree.
93/// It is reusable and thread-safe.
94///
95/// ## Example
96///
97/// ```rust,ignore
98/// use ranvier_core::prelude::*;
99/// // ...
100/// // Start with an identity Axon (In -> In)
101/// let axon = Axon::<(), (), _>::new("My Axon")
102///     .then(StepA)
103///     .then(StepB);
104///
105/// // Execute multiple times
106/// let res1 = axon.execute((), &mut bus1).await;
107/// let res2 = axon.execute((), &mut bus2).await;
108/// ```
109pub struct Axon<In, Out, E, Res = ()> {
110    /// The static structure (for visualization/analysis)
111    pub schematic: Schematic,
112    /// The runtime executor
113    executor: Executor<In, Out, E, Res>,
114    /// How this Axon is executed across the cluster
115    pub execution_mode: ExecutionMode,
116    /// Optional persistence store for state inspection
117    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
118    /// Optional audit sink for tamper-evident logging of interventions
119    pub audit_sink: Option<Arc<dyn AuditSink>>,
120    /// Optional dead-letter queue sink for storing failed events
121    pub dlq_sink: Option<Arc<dyn DlqSink>>,
122    /// Policy for handling event failures
123    pub dlq_policy: DlqPolicy,
124    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
125    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
126    /// Policy for automated saga compensation
127    pub saga_policy: SagaPolicy,
128    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
129    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
130    /// Registry for Saga compensation handlers
131    pub saga_compensation_registry:
132        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
133    /// Optional IAM handle for identity verification at the Schematic boundary
134    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
135}
136
/// A request to export a schematic, built from command-line arguments or
/// environment variables.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Destination file for the export; `None` means the schematic goes to stdout.
    pub output: Option<PathBuf>,
}
143
144impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
145    fn clone(&self) -> Self {
146        Self {
147            schematic: self.schematic.clone(),
148            executor: self.executor.clone(),
149            execution_mode: self.execution_mode.clone(),
150            persistence_store: self.persistence_store.clone(),
151            audit_sink: self.audit_sink.clone(),
152            dlq_sink: self.dlq_sink.clone(),
153            dlq_policy: self.dlq_policy.clone(),
154            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
155            saga_policy: self.saga_policy.clone(),
156            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
157            saga_compensation_registry: self.saga_compensation_registry.clone(),
158            iam_handle: self.iam_handle.clone(),
159        }
160    }
161}
162
163impl<In, E, Res> Axon<In, In, E, Res>
164where
165    In: Send + Sync + Serialize + DeserializeOwned + 'static,
166    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
167    Res: ranvier_core::transition::ResourceRequirement,
168{
169    /// Create a new Axon flow with the given label.
170    /// This is the preferred entry point per Flat API guidelines.
171    #[track_caller]
172    pub fn new(label: &str) -> Self {
173        let caller = Location::caller();
174        Self::start_with_source(label, caller)
175    }
176
177    /// Start defining a new Axon flow.
178    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
179    #[track_caller]
180    pub fn start(label: &str) -> Self {
181        let caller = Location::caller();
182        Self::start_with_source(label, caller)
183    }
184
185    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
186        let node_id = uuid::Uuid::new_v4().to_string();
187        let node = Node {
188            id: node_id,
189            kind: NodeKind::Ingress,
190            label: label.to_string(),
191            description: None,
192            input_type: "void".to_string(),
193            output_type: type_name_of::<In>(),
194            resource_type: type_name_of::<Res>(),
195            metadata: Default::default(),
196            bus_capability: None,
197            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
198            position: None,
199            compensation_node_id: None,
200            input_schema: None,
201            output_schema: None,
202        };
203
204        let mut schematic = Schematic::new(label);
205        schematic.nodes.push(node);
206
207        let executor: Executor<In, In, E, Res> =
208            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
209
210        Self {
211            schematic,
212            executor,
213            execution_mode: ExecutionMode::Local,
214            persistence_store: None,
215            audit_sink: None,
216            dlq_sink: None,
217            dlq_policy: DlqPolicy::default(),
218            dynamic_dlq_policy: None,
219            saga_policy: SagaPolicy::default(),
220            dynamic_saga_policy: None,
221            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
222                ranvier_core::saga::SagaCompensationRegistry::new(),
223            )),
224            iam_handle: None,
225        }
226    }
227}
228
229impl<In, Out, E, Res> Axon<In, Out, E, Res>
230where
231    In: Send + Sync + Serialize + DeserializeOwned + 'static,
232    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
233    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
234    Res: ranvier_core::transition::ResourceRequirement,
235{
236    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
237    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
238        self.execution_mode = mode;
239        self
240    }
241
242    /// Set the schematic version for this Axon.
243    pub fn with_version(mut self, version: impl Into<String>) -> Self {
244        self.schematic.schema_version = version.into();
245        self
246    }
247
248    /// Attach a persistence store to enable state inspection via the Inspector.
249    pub fn with_persistence_store<S>(mut self, store: S) -> Self
250    where
251        S: crate::persistence::PersistenceStore + 'static,
252    {
253        self.persistence_store = Some(Arc::new(store));
254        self
255    }
256
257    /// Attach an audit sink for tamper-evident logging.
258    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
259    where
260        S: AuditSink + 'static,
261    {
262        self.audit_sink = Some(Arc::new(sink));
263        self
264    }
265
266    /// Set the Dead Letter Queue sink for this Axon.
267    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
268    where
269        S: DlqSink + 'static,
270    {
271        self.dlq_sink = Some(Arc::new(sink));
272        self
273    }
274
275    /// Set the Dead Letter Queue policy for this Axon.
276    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
277        self.dlq_policy = policy;
278        self
279    }
280
281    /// Set the Saga compensation policy for this Axon.
282    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
283        self.saga_policy = policy;
284        self
285    }
286
287    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
288    /// current value is read at each execution, overriding the static `dlq_policy`.
289    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
290        self.dynamic_dlq_policy = Some(dynamic);
291        self
292    }
293
294    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
295    /// current value is read at each execution, overriding the static `saga_policy`.
296    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
297        self.dynamic_saga_policy = Some(dynamic);
298        self
299    }
300
301    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
302    ///
303    /// When set, `execute()` will:
304    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
305    /// 2. Verify the token using the provided verifier
306    /// 3. Enforce the policy against the verified identity
307    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
308    pub fn with_iam(
309        mut self,
310        policy: ranvier_core::iam::IamPolicy,
311        verifier: impl ranvier_core::iam::IamVerifier + 'static,
312    ) -> Self {
313        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
314            policy,
315            Arc::new(verifier),
316        ));
317        self
318    }
319
320    /// Attach a JSON Schema for the **last node's input type** in the schematic.
321    ///
322    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
323    ///
324    /// ```rust,ignore
325    /// let axon = Axon::new("My Circuit")
326    ///     .then(ProcessStep)
327    ///     .with_input_schema::<CreateUserRequest>()
328    ///     .with_output_schema::<UserResponse>();
329    /// ```
330    #[cfg(feature = "schema")]
331    pub fn with_input_schema<T>(mut self) -> Self
332    where
333        T: schemars::JsonSchema,
334    {
335        if let Some(last_node) = self.schematic.nodes.last_mut() {
336            let schema = schemars::schema_for!(T);
337            last_node.input_schema =
338                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
339        }
340        self
341    }
342
343    /// Attach a JSON Schema for the **last node's output type** in the schematic.
344    ///
345    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
346    #[cfg(feature = "schema")]
347    pub fn with_output_schema<T>(mut self) -> Self
348    where
349        T: schemars::JsonSchema,
350    {
351        if let Some(last_node) = self.schematic.nodes.last_mut() {
352            let schema = schemars::schema_for!(T);
353            last_node.output_schema =
354                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
355        }
356        self
357    }
358
359    /// Attach a raw JSON Schema value for the **last node's input type** in the schematic.
360    ///
361    /// Use this for pre-built schemas without requiring the `schema` feature.
362    pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
363        if let Some(last_node) = self.schematic.nodes.last_mut() {
364            last_node.input_schema = Some(schema);
365        }
366        self
367    }
368
369    /// Attach a raw JSON Schema value for the **last node's output type** in the schematic.
370    pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
371        if let Some(last_node) = self.schematic.nodes.last_mut() {
372            last_node.output_schema = Some(schema);
373        }
374        self
375    }
376}
377
378#[async_trait]
379impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
380where
381    In: Send + Sync + Serialize + DeserializeOwned + 'static,
382    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
383    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
384    Res: ranvier_core::transition::ResourceRequirement,
385{
386    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
387        let store = self.persistence_store.as_ref()?;
388        let trace = store.load(trace_id).await.ok().flatten()?;
389        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
390    }
391
392    async fn force_resume(
393        &self,
394        trace_id: &str,
395        target_node: &str,
396        payload_override: Option<Value>,
397    ) -> Result<(), String> {
398        let store = self
399            .persistence_store
400            .as_ref()
401            .ok_or("No persistence store attached to Axon")?;
402
403        let intervention = crate::persistence::Intervention {
404            target_node: target_node.to_string(),
405            payload_override,
406            timestamp_ms: now_ms(),
407        };
408
409        store
410            .save_intervention(trace_id, intervention)
411            .await
412            .map_err(|e| format!("Failed to save intervention: {}", e))?;
413
414        if let Some(sink) = self.audit_sink.as_ref() {
415            let event = AuditEvent::new(
416                uuid::Uuid::new_v4().to_string(),
417                "Inspector".to_string(),
418                "ForceResume".to_string(),
419                trace_id.to_string(),
420            )
421            .with_metadata("target_node", target_node);
422
423            let _ = sink.append(&event).await;
424        }
425
426        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
427        Ok(())
428    }
429}
430
431impl<In, Out, E, Res> Axon<In, Out, E, Res>
432where
433    In: Send + Sync + Serialize + DeserializeOwned + 'static,
434    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
435    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
436    Res: ranvier_core::transition::ResourceRequirement,
437{
438    /// Chain a transition to this Axon.
439    ///
440    /// Requires the transition to use the SAME resource bundle as the previous steps.
441    #[track_caller]
442    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
443    where
444        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
445        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
446    {
447        let caller = Location::caller();
448        // Decompose self to avoid partial move issues
449        let Axon {
450            mut schematic,
451            executor: prev_executor,
452            execution_mode,
453            persistence_store,
454            audit_sink,
455            dlq_sink,
456            dlq_policy,
457            dynamic_dlq_policy,
458            saga_policy,
459            dynamic_saga_policy,
460            saga_compensation_registry,
461            iam_handle,
462        } = self;
463
464        // Update Schematic
465        let next_node_id = uuid::Uuid::new_v4().to_string();
466        let next_node = Node {
467            id: next_node_id.clone(),
468            kind: NodeKind::Atom,
469            label: transition.label(),
470            description: transition.description(),
471            input_type: type_name_of::<Out>(),
472            output_type: type_name_of::<Next>(),
473            resource_type: type_name_of::<Res>(),
474            metadata: Default::default(),
475            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
476            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
477            position: transition
478                .position()
479                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
480            compensation_node_id: None,
481            input_schema: transition.input_schema(),
482            output_schema: None,
483        };
484
485        let last_node_id = schematic
486            .nodes
487            .last()
488            .map(|n| n.id.clone())
489            .unwrap_or_default();
490
491        schematic.nodes.push(next_node);
492        schematic.edges.push(Edge {
493            from: last_node_id,
494            to: next_node_id.clone(),
495            kind: EdgeType::Linear,
496            label: Some("Next".to_string()),
497        });
498
499        // Compose Executor
500        let node_id_for_exec = next_node_id.clone();
501        let node_label_for_exec = transition.label();
502        let bus_policy_for_exec = transition.bus_access_policy();
503        let bus_policy_clone = bus_policy_for_exec.clone();
504        let current_step_idx = schematic.nodes.len() as u64 - 1;
505        let next_executor: Executor<In, Next, E, Res> = Arc::new(
506            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
507                let prev = prev_executor.clone();
508                let trans = transition.clone();
509                let timeline_node_id = node_id_for_exec.clone();
510                let timeline_node_label = node_label_for_exec.clone();
511                let transition_bus_policy = bus_policy_clone.clone();
512                let step_idx = current_step_idx;
513
514                Box::pin(async move {
515                    // Check for manual intervention jump
516                    if let Some(jump) = bus.read::<ManualJump>()
517                        && (jump.target_node == timeline_node_id
518                            || jump.target_node == timeline_node_label)
519                    {
520                        tracing::info!(
521                            node_id = %timeline_node_id,
522                            node_label = %timeline_node_label,
523                            "Manual jump target reached; skipping previous steps"
524                        );
525
526                        let state = if let Some(ow) = jump.payload_override.clone() {
527                            match serde_json::from_value::<Out>(ow) {
528                                Ok(s) => s,
529                                Err(e) => {
530                                    tracing::error!(
531                                        "Payload override deserialization failed: {}",
532                                        e
533                                    );
534                                    return Outcome::emit(
535                                        "execution.jump.payload_error",
536                                        Some(serde_json::json!({"error": e.to_string()})),
537                                    )
538                                    .map(|_: ()| unreachable!());
539                                }
540                            }
541                        } else {
542                            // Default back to the provided input if this is an identity jump or types match
543                            // For now, treat missing payload on a mid-flow jump as an avoidable error if Possible.
544                            // In a better implementation, we'd try to load the last persisted Out for the previous step.
545                            return Outcome::emit(
546                                "execution.jump.missing_payload",
547                                Some(serde_json::json!({"node_id": timeline_node_id})),
548                            );
549                        };
550
551                        // Skip prev() and continue with trans.run(state, ...)
552                        return run_this_step::<Out, Next, E, Res>(
553                            &trans,
554                            state,
555                            res,
556                            bus,
557                            &timeline_node_id,
558                            &timeline_node_label,
559                            &transition_bus_policy,
560                            step_idx,
561                        )
562                        .await;
563                    }
564
565                    // Handle resumption skip
566                    if let Some(start) = bus.read::<StartStep>()
567                        && step_idx == start.0
568                        && bus.read::<ResumptionState>().is_some()
569                    {
570                        // Prefer fresh input (In → Out via JSON round-trip).
571                        // The caller provides updated state (e.g., corrected data after a fault).
572                        // Falls back to persisted checkpoint state when types are incompatible.
573                        let fresh_state = serde_json::to_value(&input)
574                            .ok()
575                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
576                        let persisted_state = bus
577                            .read::<ResumptionState>()
578                            .and_then(|r| r.payload.clone())
579                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
580
581                        if let Some(s) = fresh_state.or(persisted_state) {
582                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
583                            return run_this_step::<Out, Next, E, Res>(
584                                &trans,
585                                s,
586                                res,
587                                bus,
588                                &timeline_node_id,
589                                &timeline_node_label,
590                                &transition_bus_policy,
591                                step_idx,
592                            )
593                            .await;
594                        }
595
596                        return Outcome::emit(
597                            "execution.resumption.payload_error",
598                            Some(serde_json::json!({"error": "no compatible resumption state"})),
599                        )
600                        .map(|_: ()| unreachable!());
601                    }
602
603                    // Run previous step
604                    let prev_result = prev(input, res, bus).await;
605
606                    // Unpack
607                    let state = match prev_result {
608                        Outcome::Next(t) => t,
609                        other => return other.map(|_| unreachable!()),
610                    };
611
612                    run_this_step::<Out, Next, E, Res>(
613                        &trans,
614                        state,
615                        res,
616                        bus,
617                        &timeline_node_id,
618                        &timeline_node_label,
619                        &transition_bus_policy,
620                        step_idx,
621                    )
622                    .await
623                })
624            },
625        );
626        Axon {
627            schematic,
628            executor: next_executor,
629            execution_mode,
630            persistence_store,
631            audit_sink,
632            dlq_sink,
633            dlq_policy,
634            dynamic_dlq_policy,
635            saga_policy,
636            dynamic_saga_policy,
637            saga_compensation_registry,
638            iam_handle,
639        }
640    }
641
642    /// Chain a transition with a retry policy.
643    ///
644    /// If the transition returns `Outcome::Fault`, it will be retried up to
645    /// `policy.max_retries` times with the configured backoff strategy.
646    /// Timeline events are recorded for each retry attempt.
647    ///
648    /// # Example
649    ///
650    /// ```rust,ignore
651    /// use ranvier_runtime::{Axon, RetryPolicy};
652    /// use std::time::Duration;
653    ///
654    /// let axon = Axon::new("pipeline")
655    ///     .then_with_retry(my_transition, RetryPolicy::fixed(3, Duration::from_millis(100)));
656    /// ```
657    #[track_caller]
658    pub fn then_with_retry<Next, Trans>(
659        self,
660        transition: Trans,
661        policy: crate::retry::RetryPolicy,
662    ) -> Axon<In, Next, E, Res>
663    where
664        Out: Clone,
665        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
666        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
667    {
668        let caller = Location::caller();
669        let Axon {
670            mut schematic,
671            executor: prev_executor,
672            execution_mode,
673            persistence_store,
674            audit_sink,
675            dlq_sink,
676            dlq_policy,
677            dynamic_dlq_policy,
678            saga_policy,
679            dynamic_saga_policy,
680            saga_compensation_registry,
681            iam_handle,
682        } = self;
683
684        let next_node_id = uuid::Uuid::new_v4().to_string();
685        let next_node = Node {
686            id: next_node_id.clone(),
687            kind: NodeKind::Atom,
688            label: transition.label(),
689            description: transition.description(),
690            input_type: type_name_of::<Out>(),
691            output_type: type_name_of::<Next>(),
692            resource_type: type_name_of::<Res>(),
693            metadata: Default::default(),
694            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
695            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
696            position: transition
697                .position()
698                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
699            compensation_node_id: None,
700            input_schema: transition.input_schema(),
701            output_schema: None,
702        };
703
704        let last_node_id = schematic
705            .nodes
706            .last()
707            .map(|n| n.id.clone())
708            .unwrap_or_default();
709
710        schematic.nodes.push(next_node);
711        schematic.edges.push(Edge {
712            from: last_node_id,
713            to: next_node_id.clone(),
714            kind: EdgeType::Linear,
715            label: Some("Next (retryable)".to_string()),
716        });
717
718        let node_id_for_exec = next_node_id.clone();
719        let node_label_for_exec = transition.label();
720        let bus_policy_for_exec = transition.bus_access_policy();
721        let bus_policy_clone = bus_policy_for_exec.clone();
722        let current_step_idx = schematic.nodes.len() as u64 - 1;
723        let next_executor: Executor<In, Next, E, Res> = Arc::new(
724            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
725                let prev = prev_executor.clone();
726                let trans = transition.clone();
727                let timeline_node_id = node_id_for_exec.clone();
728                let timeline_node_label = node_label_for_exec.clone();
729                let transition_bus_policy = bus_policy_clone.clone();
730                let step_idx = current_step_idx;
731                let retry_policy = policy.clone();
732
733                Box::pin(async move {
734                    // Run previous step
735                    let prev_result = prev(input, res, bus).await;
736                    let state = match prev_result {
737                        Outcome::Next(t) => t,
738                        other => return other.map(|_| unreachable!()),
739                    };
740
741                    // Attempt with retries
742                    let mut last_result = None;
743                    for attempt in 0..=retry_policy.max_retries {
744                        let attempt_state = state.clone();
745
746                        let result = run_this_step::<Out, Next, E, Res>(
747                            &trans,
748                            attempt_state,
749                            res,
750                            bus,
751                            &timeline_node_id,
752                            &timeline_node_label,
753                            &transition_bus_policy,
754                            step_idx,
755                        )
756                        .await;
757
758                        match &result {
759                            Outcome::Next(_) => return result,
760                            Outcome::Fault(_) if attempt < retry_policy.max_retries => {
761                                let delay = retry_policy.delay_for_attempt(attempt);
762                                tracing::warn!(
763                                    node_id = %timeline_node_id,
764                                    attempt = attempt + 1,
765                                    max = retry_policy.max_retries,
766                                    delay_ms = delay.as_millis() as u64,
767                                    "Transition failed, retrying"
768                                );
769                                if let Some(timeline) = bus.read_mut::<Timeline>() {
770                                    timeline.push(TimelineEvent::NodeRetry {
771                                        node_id: timeline_node_id.clone(),
772                                        attempt: attempt + 1,
773                                        max_attempts: retry_policy.max_retries,
774                                        backoff_ms: delay.as_millis() as u64,
775                                        timestamp: now_ms(),
776                                    });
777                                }
778                                tokio::time::sleep(delay).await;
779                            }
780                            _ => {
781                                last_result = Some(result);
782                                break;
783                            }
784                        }
785                    }
786
787                    last_result.unwrap_or_else(|| {
788                        Outcome::emit("execution.retry.exhausted", None)
789                    })
790                })
791            },
792        );
793        Axon {
794            schematic,
795            executor: next_executor,
796            execution_mode,
797            persistence_store,
798            audit_sink,
799            dlq_sink,
800            dlq_policy,
801            dynamic_dlq_policy,
802            saga_policy,
803            dynamic_saga_policy,
804            saga_compensation_registry,
805            iam_handle,
806        }
807    }
808
809    /// Chain a transition to this Axon with a Saga compensation step.
810    ///
811    /// If the transition fails, the compensation transition will be executed
812    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
813    #[track_caller]
814    pub fn then_compensated<Next, Trans, Comp>(
815        self,
816        transition: Trans,
817        compensation: Comp,
818    ) -> Axon<In, Next, E, Res>
819    where
820        Out: Clone,
821        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
822        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
823        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
824    {
825        let caller = Location::caller();
826        let Axon {
827            mut schematic,
828            executor: prev_executor,
829            execution_mode,
830            persistence_store,
831            audit_sink,
832            dlq_sink,
833            dlq_policy,
834            dynamic_dlq_policy,
835            saga_policy,
836            dynamic_saga_policy,
837            saga_compensation_registry,
838            iam_handle,
839        } = self;
840
841        // 1. Add Primary Node
842        let next_node_id = uuid::Uuid::new_v4().to_string();
843        let comp_node_id = uuid::Uuid::new_v4().to_string();
844
845        let next_node = Node {
846            id: next_node_id.clone(),
847            kind: NodeKind::Atom,
848            label: transition.label(),
849            description: transition.description(),
850            input_type: type_name_of::<Out>(),
851            output_type: type_name_of::<Next>(),
852            resource_type: type_name_of::<Res>(),
853            metadata: Default::default(),
854            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
855            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
856            position: transition
857                .position()
858                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
859            compensation_node_id: Some(comp_node_id.clone()),
860            input_schema: None,
861            output_schema: None,
862        };
863
864        // 2. Add Compensation Node
865        let comp_node = Node {
866            id: comp_node_id.clone(),
867            kind: NodeKind::Atom,
868            label: format!("Compensate: {}", compensation.label()),
869            description: compensation.description(),
870            input_type: type_name_of::<Out>(),
871            output_type: "void".to_string(),
872            resource_type: type_name_of::<Res>(),
873            metadata: Default::default(),
874            bus_capability: None,
875            source_location: None,
876            position: compensation
877                .position()
878                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
879            compensation_node_id: None,
880            input_schema: None,
881            output_schema: None,
882        };
883
884        let last_node_id = schematic
885            .nodes
886            .last()
887            .map(|n| n.id.clone())
888            .unwrap_or_default();
889
890        schematic.nodes.push(next_node);
891        schematic.nodes.push(comp_node);
892        schematic.edges.push(Edge {
893            from: last_node_id,
894            to: next_node_id.clone(),
895            kind: EdgeType::Linear,
896            label: Some("Next".to_string()),
897        });
898
899        // 3. Compose Executor with Compensation Logic
900        let node_id_for_exec = next_node_id.clone();
901        let comp_id_for_exec = comp_node_id.clone();
902        let node_label_for_exec = transition.label();
903        let bus_policy_for_exec = transition.bus_access_policy();
904        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
905        let comp_for_exec = compensation.clone();
906        let bus_policy_for_executor = bus_policy_for_exec.clone();
907        let bus_policy_for_registry = bus_policy_for_exec.clone();
908        let next_executor: Executor<In, Next, E, Res> = Arc::new(
909            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
910                let prev = prev_executor.clone();
911                let trans = transition.clone();
912                let comp = comp_for_exec.clone();
913                let timeline_node_id = node_id_for_exec.clone();
914                let timeline_comp_id = comp_id_for_exec.clone();
915                let timeline_node_label = node_label_for_exec.clone();
916                let transition_bus_policy = bus_policy_for_executor.clone();
917                let step_idx = step_idx_for_exec;
918
919                Box::pin(async move {
920                    // Check for manual intervention jump
921                    if let Some(jump) = bus.read::<ManualJump>()
922                        && (jump.target_node == timeline_node_id
923                            || jump.target_node == timeline_node_label)
924                    {
925                        tracing::info!(
926                            node_id = %timeline_node_id,
927                            node_label = %timeline_node_label,
928                            "Manual jump target reached (compensated); skipping previous steps"
929                        );
930
931                        let state = if let Some(ow) = jump.payload_override.clone() {
932                            match serde_json::from_value::<Out>(ow) {
933                                Ok(s) => s,
934                                Err(e) => {
935                                    tracing::error!(
936                                        "Payload override deserialization failed: {}",
937                                        e
938                                    );
939                                    return Outcome::emit(
940                                        "execution.jump.payload_error",
941                                        Some(serde_json::json!({"error": e.to_string()})),
942                                    )
943                                    .map(|_: ()| unreachable!());
944                                }
945                            }
946                        } else {
947                            return Outcome::emit(
948                                "execution.jump.missing_payload",
949                                Some(serde_json::json!({"node_id": timeline_node_id})),
950                            )
951                            .map(|_: ()| unreachable!());
952                        };
953
954                        // Skip prev() and continue with trans.run(state, ...)
955                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
956                            &trans,
957                            &comp,
958                            state,
959                            res,
960                            bus,
961                            &timeline_node_id,
962                            &timeline_comp_id,
963                            &timeline_node_label,
964                            &transition_bus_policy,
965                            step_idx,
966                        )
967                        .await;
968                    }
969
970                    // Handle resumption skip
971                    if let Some(start) = bus.read::<StartStep>()
972                        && step_idx == start.0
973                        && bus.read::<ResumptionState>().is_some()
974                    {
975                        let fresh_state = serde_json::to_value(&input)
976                            .ok()
977                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
978                        let persisted_state = bus
979                            .read::<ResumptionState>()
980                            .and_then(|r| r.payload.clone())
981                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
982
983                        if let Some(s) = fresh_state.or(persisted_state) {
984                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
985                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
986                                &trans,
987                                &comp,
988                                s,
989                                res,
990                                bus,
991                                &timeline_node_id,
992                                &timeline_comp_id,
993                                &timeline_node_label,
994                                &transition_bus_policy,
995                                step_idx,
996                            )
997                            .await;
998                        }
999
1000                        return Outcome::emit(
1001                            "execution.resumption.payload_error",
1002                            Some(serde_json::json!({"error": "no compatible resumption state"})),
1003                        )
1004                        .map(|_: ()| unreachable!());
1005                    }
1006
1007                    // Run previous step
1008                    let prev_result = prev(input, res, bus).await;
1009
1010                    // Unpack
1011                    let state = match prev_result {
1012                        Outcome::Next(t) => t,
1013                        other => return other.map(|_| unreachable!()),
1014                    };
1015
1016                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
1017                        &trans,
1018                        &comp,
1019                        state,
1020                        res,
1021                        bus,
1022                        &timeline_node_id,
1023                        &timeline_comp_id,
1024                        &timeline_node_label,
1025                        &transition_bus_policy,
1026                        step_idx,
1027                    )
1028                    .await
1029                })
1030            },
1031        );
1032        // 4. Register Saga Compensation if enabled
1033        {
1034            let mut registry = saga_compensation_registry.write().unwrap();
1035            let comp_fn = compensation.clone();
1036            let transition_bus_policy = bus_policy_for_registry.clone();
1037
1038            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1039                Arc::new(move |input_data, res, bus| {
1040                    let comp = comp_fn.clone();
1041                    let bus_policy = transition_bus_policy.clone();
1042                    Box::pin(async move {
1043                        let input: Out = serde_json::from_slice(&input_data).unwrap();
1044                        bus.set_access_policy(comp.label(), bus_policy);
1045                        let res = comp.run(input, res, bus).await;
1046                        bus.clear_access_policy();
1047                        res
1048                    })
1049                });
1050            registry.register(next_node_id.clone(), handler);
1051        }
1052
1053        Axon {
1054            schematic,
1055            executor: next_executor,
1056            execution_mode,
1057            persistence_store,
1058            audit_sink,
1059            dlq_sink,
1060            dlq_policy,
1061            dynamic_dlq_policy,
1062            saga_policy,
1063            dynamic_saga_policy,
1064            saga_compensation_registry,
1065            iam_handle,
1066        }
1067    }
1068
1069    /// Attach a compensation transition to the previously added node.
1070    /// This establishes a Schematic-level Saga compensation mapping.
1071    #[track_caller]
1072    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1073    where
1074        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1075    {
1076        // NOTE: This currently only updates the Schematic.
1077        // For runtime compensation behavior, use `then_compensated`.
1078        let caller = Location::caller();
1079        let comp_node_id = uuid::Uuid::new_v4().to_string();
1080
1081        let comp_node = Node {
1082            id: comp_node_id.clone(),
1083            kind: NodeKind::Atom,
1084            label: transition.label(),
1085            description: transition.description(),
1086            input_type: type_name_of::<Out>(),
1087            output_type: "void".to_string(),
1088            resource_type: type_name_of::<Res>(),
1089            metadata: Default::default(),
1090            bus_capability: None,
1091            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1092            position: transition
1093                .position()
1094                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1095            compensation_node_id: None,
1096            input_schema: None,
1097            output_schema: None,
1098        };
1099
1100        if let Some(last_node) = self.schematic.nodes.last_mut() {
1101            last_node.compensation_node_id = Some(comp_node_id.clone());
1102        }
1103
1104        self.schematic.nodes.push(comp_node);
1105        self
1106    }
1107
1108    /// Add a branch point
1109    #[track_caller]
1110    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1111        let caller = Location::caller();
1112        let branch_id_str = branch_id.into();
1113        let last_node_id = self
1114            .schematic
1115            .nodes
1116            .last()
1117            .map(|n| n.id.clone())
1118            .unwrap_or_default();
1119
1120        let branch_node = Node {
1121            id: uuid::Uuid::new_v4().to_string(),
1122            kind: NodeKind::Synapse,
1123            label: label.to_string(),
1124            description: None,
1125            input_type: type_name_of::<Out>(),
1126            output_type: type_name_of::<Out>(),
1127            resource_type: type_name_of::<Res>(),
1128            metadata: Default::default(),
1129            bus_capability: None,
1130            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1131            position: None,
1132            compensation_node_id: None,
1133            input_schema: None,
1134            output_schema: None,
1135        };
1136
1137        self.schematic.nodes.push(branch_node);
1138        self.schematic.edges.push(Edge {
1139            from: last_node_id,
1140            to: branch_id_str.clone(),
1141            kind: EdgeType::Branch(branch_id_str),
1142            label: Some("Branch".to_string()),
1143        });
1144
1145        self
1146    }
1147
1148    /// Execute the Axon with the given input and resources.
1149    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1150        if let ExecutionMode::Singleton {
1151            lock_key,
1152            ttl_ms,
1153            lock_provider,
1154        } = &self.execution_mode
1155        {
1156            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1157            let _enter = trace_span.enter();
1158            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1159                Ok(true) => {
1160                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1161                }
1162                Ok(false) => {
1163                    tracing::debug!(
1164                        "Singleton lock {} already held, aborting execution.",
1165                        lock_key
1166                    );
1167                    // Emit a specific event indicating skip
1168                    return Outcome::emit(
1169                        "execution.skipped.lock_held",
1170                        Some(serde_json::json!({
1171                            "lock_key": lock_key
1172                        })),
1173                    );
1174                }
1175                Err(e) => {
1176                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1177                    return Outcome::emit(
1178                        "execution.skipped.lock_error",
1179                        Some(serde_json::json!({
1180                            "error": e.to_string()
1181                        })),
1182                    );
1183                }
1184            }
1185        }
1186
1187        // ── IAM Boundary Check ────────────────────────────────────
1188        if let Some(iam) = &self.iam_handle {
1189            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1190
1191            if matches!(iam.policy, IamPolicy::None) {
1192                // No verification required — skip
1193            } else {
1194                let token = bus.read::<IamToken>().map(|t| t.0.clone());
1195
1196                match token {
1197                    Some(raw_token) => {
1198                        match iam.verifier.verify(&raw_token).await {
1199                            Ok(identity) => {
1200                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
1201                                    tracing::warn!(
1202                                        policy = ?iam.policy,
1203                                        subject = %identity.subject,
1204                                        "IAM policy enforcement failed: {}",
1205                                        e
1206                                    );
1207                                    return Outcome::emit(
1208                                        "iam.policy_denied",
1209                                        Some(serde_json::json!({
1210                                            "error": e.to_string(),
1211                                            "subject": identity.subject,
1212                                        })),
1213                                    );
1214                                }
1215                                // Insert verified identity into Bus for downstream access
1216                                bus.insert(identity);
1217                            }
1218                            Err(e) => {
1219                                tracing::warn!("IAM token verification failed: {}", e);
1220                                return Outcome::emit(
1221                                    "iam.verification_failed",
1222                                    Some(serde_json::json!({
1223                                        "error": e.to_string()
1224                                    })),
1225                                );
1226                            }
1227                        }
1228                    }
1229                    None => {
1230                        tracing::warn!("IAM policy requires token but none found in Bus");
1231                        return Outcome::emit("iam.missing_token", None);
1232                    }
1233                }
1234            }
1235        }
1236
1237        let trace_id = persistence_trace_id(bus);
1238        let label = self.schematic.name.clone();
1239
1240        // Inject DLQ config into Bus for step-level reporting
1241        if let Some(sink) = &self.dlq_sink {
1242            bus.insert(sink.clone());
1243        }
1244        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1245        let effective_dlq_policy = self
1246            .dynamic_dlq_policy
1247            .as_ref()
1248            .map(|d| d.current())
1249            .unwrap_or_else(|| self.dlq_policy.clone());
1250        bus.insert(effective_dlq_policy);
1251        bus.insert(self.schematic.clone());
1252        let effective_saga_policy = self
1253            .dynamic_saga_policy
1254            .as_ref()
1255            .map(|d| d.current())
1256            .unwrap_or_else(|| self.saga_policy.clone());
1257        bus.insert(effective_saga_policy.clone());
1258
1259        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1260        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1261            bus.insert(SagaStack::new());
1262        }
1263
1264        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1265        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1266        let compensation_retry_policy = compensation_retry_policy(bus);
1267        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1268        let version = self.schematic.schema_version.clone();
1269        let migration_registry = bus
1270            .read::<ranvier_core::schematic::MigrationRegistry>()
1271            .cloned();
1272
1273        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1274            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1275                load_persistence_version(handle, &trace_id).await;
1276
1277            if let Some(interv) = intervention {
1278                tracing::info!(
1279                    trace_id = %trace_id,
1280                    target_node = %interv.target_node,
1281                    "Applying manual intervention command"
1282                );
1283
1284                // Find the step index for the target node
1285                if let Some(target_idx) = self
1286                    .schematic
1287                    .nodes
1288                    .iter()
1289                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1290                {
1291                    tracing::info!(
1292                        trace_id = %trace_id,
1293                        target_node = %interv.target_node,
1294                        target_step = target_idx,
1295                        "Intervention: Jumping to target node"
1296                    );
1297                    start_step = target_idx as u64;
1298
1299                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1300                    bus.insert(ManualJump {
1301                        target_node: interv.target_node.clone(),
1302                        payload_override: interv.payload_override.clone(),
1303                    });
1304
1305                    // Log audit event for intervention application
1306                    if let Some(sink) = self.audit_sink.as_ref() {
1307                        let event = AuditEvent::new(
1308                            uuid::Uuid::new_v4().to_string(),
1309                            "System".to_string(),
1310                            "ApplyIntervention".to_string(),
1311                            trace_id.to_string(),
1312                        )
1313                        .with_metadata("target_node", interv.target_node.clone())
1314                        .with_metadata("target_step", target_idx);
1315
1316                        let _ = sink.append(&event).await;
1317                    }
1318                } else {
1319                    tracing::warn!(
1320                        trace_id = %trace_id,
1321                        target_node = %interv.target_node,
1322                        "Intervention target node not found in schematic; ignoring jump"
1323                    );
1324                }
1325            }
1326
1327            if let Some(old_version) = trace_version
1328                && old_version != version
1329            {
1330                tracing::info!(
1331                    trace_id = %trace_id,
1332                    old_version = %old_version,
1333                    current_version = %version,
1334                    "Version mismatch detected during resumption"
1335                );
1336
1337                // Try multi-hop migration path first, fall back to direct lookup
1338                let migration_path = migration_registry
1339                    .as_ref()
1340                    .and_then(|r| r.find_migration_path(&old_version, &version));
1341
1342                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1343                    if path.is_empty() {
1344                        (None, last_payload.clone())
1345                    } else {
1346                        // Apply payload mappers along the migration chain
1347                        let mut payload = last_payload.clone();
1348                        for hop in &path {
1349                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1350                            {
1351                                match mapper.map_state(p) {
1352                                    Ok(mapped) => payload = Some(mapped),
1353                                    Err(e) => {
1354                                        tracing::error!(
1355                                            trace_id = %trace_id,
1356                                            from = %hop.from_version,
1357                                            to = %hop.to_version,
1358                                            error = %e,
1359                                            "Payload migration mapper failed"
1360                                        );
1361                                        return Outcome::emit(
1362                                            "execution.resumption.payload_migration_failed",
1363                                            Some(serde_json::json!({
1364                                                "trace_id": trace_id,
1365                                                "from": hop.from_version,
1366                                                "to": hop.to_version,
1367                                                "error": e.to_string()
1368                                            })),
1369                                        );
1370                                    }
1371                                }
1372                            }
1373                        }
1374                        let hops: Vec<String> = path
1375                            .iter()
1376                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
1377                            .collect();
1378                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1379                        (path.last().copied(), payload)
1380                    }
1381                } else {
1382                    (None, last_payload.clone())
1383                };
1384
1385                // Use the final migration in the path to determine strategy
1386                let migration = final_migration.or_else(|| {
1387                    migration_registry
1388                        .as_ref()
1389                        .and_then(|r| r.find_migration(&old_version, &version))
1390                });
1391
1392                // Update last_payload with mapped version
1393                if mapped_payload.is_some() {
1394                    last_payload = mapped_payload;
1395                }
1396
1397                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1398                {
1399                    m.node_mapping
1400                        .get(node_id)
1401                        .cloned()
1402                        .unwrap_or(m.default_strategy.clone())
1403                } else {
1404                    migration
1405                        .map(|m| m.default_strategy.clone())
1406                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1407                };
1408
1409                match strategy {
1410                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1411                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1412                        start_step = 0;
1413                    }
1414                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1415                        new_node_id,
1416                        ..
1417                    } => {
1418                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1419                        if let Some(target_idx) = self
1420                            .schematic
1421                            .nodes
1422                            .iter()
1423                            .position(|n| n.id == new_node_id || n.label == new_node_id)
1424                        {
1425                            start_step = target_idx as u64;
1426                        } else {
1427                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1428                            return Outcome::emit(
1429                                "execution.resumption.migration_target_not_found",
1430                                Some(serde_json::json!({ "node_id": new_node_id })),
1431                            );
1432                        }
1433                    }
1434                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1435                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1436                        if let Some(target_idx) = self
1437                            .schematic
1438                            .nodes
1439                            .iter()
1440                            .position(|n| n.id == node_id || n.label == node_id)
1441                        {
1442                            start_step = target_idx as u64;
1443                        } else {
1444                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1445                            return Outcome::emit(
1446                                "execution.resumption.migration_target_not_found",
1447                                Some(serde_json::json!({ "node_id": node_id })),
1448                            );
1449                        }
1450                    }
1451                    ranvier_core::schematic::MigrationStrategy::Fail => {
1452                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1453                        return Outcome::emit(
1454                            "execution.resumption.version_mismatch_failed",
1455                            Some(serde_json::json!({
1456                                "trace_id": trace_id,
1457                                "old_version": old_version,
1458                                "current_version": version
1459                            })),
1460                        );
1461                    }
1462                    _ => {
1463                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1464                        return Outcome::emit(
1465                            "execution.resumption.unsupported_migration",
1466                            Some(serde_json::json!({
1467                                "trace_id": trace_id,
1468                                "strategy": format!("{:?}", strategy)
1469                            })),
1470                        );
1471                    }
1472                }
1473            }
1474
1475            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1476            persist_execution_event(
1477                handle,
1478                &trace_id,
1479                &label,
1480                &version,
1481                start_step,
1482                ingress_node_id,
1483                "Enter",
1484                None,
1485            )
1486            .await;
1487
1488            bus.insert(StartStep(start_step));
1489            if start_step > 0 {
1490                bus.insert(ResumptionState {
1491                    payload: last_payload,
1492                });
1493            }
1494
1495            Some(start_step)
1496        } else {
1497            None
1498        };
1499
1500        let should_capture = should_attach_timeline(bus);
1501        let inserted_timeline = if should_capture {
1502            ensure_timeline(bus)
1503        } else {
1504            false
1505        };
1506        let ingress_started = std::time::Instant::now();
1507        let ingress_enter_ts = now_ms();
1508        if should_capture
1509            && let (Some(timeline), Some(ingress)) =
1510                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1511        {
1512            timeline.push(TimelineEvent::NodeEnter {
1513                node_id: ingress.id.clone(),
1514                node_label: ingress.label.clone(),
1515                timestamp: ingress_enter_ts,
1516            });
1517        }
1518
1519        let circuit_span = tracing::info_span!(
1520            "Circuit",
1521            ranvier.circuit = %label,
1522            ranvier.outcome_kind = tracing::field::Empty,
1523            ranvier.outcome_target = tracing::field::Empty
1524        );
1525        let outcome = (self.executor)(input, resources, bus)
1526            .instrument(circuit_span.clone())
1527            .await;
1528        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1529        if let Some(target) = outcome_target(&outcome) {
1530            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1531        }
1532
1533        // Automated Saga Rollback (LIFO)
1534        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1535            while let Some(task) = {
1536                let mut stack = bus.read_mut::<SagaStack>();
1537                stack.as_mut().and_then(|s| s.pop())
1538            } {
1539                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1540
1541                let handler = {
1542                    let registry = self.saga_compensation_registry.read().unwrap();
1543                    registry.get(&task.node_id)
1544                };
1545                if let Some(handler) = handler {
1546                    let res = handler(task.input_snapshot, resources, bus).await;
1547                    if let Outcome::Fault(e) = res {
1548                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1549                    }
1550                } else {
1551                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1552                }
1553            }
1554            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1555        }
1556
1557        let ingress_exit_ts = now_ms();
1558        if should_capture
1559            && let (Some(timeline), Some(ingress)) =
1560                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1561        {
1562            timeline.push(TimelineEvent::NodeExit {
1563                node_id: ingress.id.clone(),
1564                outcome_type: outcome_type_name(&outcome),
1565                duration_ms: ingress_started.elapsed().as_millis() as u64,
1566                timestamp: ingress_exit_ts,
1567            });
1568        }
1569
1570        if let Some(handle) = persistence_handle.as_ref() {
1571            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1572            persist_execution_event(
1573                handle,
1574                &trace_id,
1575                &label,
1576                &version,
1577                fault_step,
1578                None, // Outcome-level events might not have a single node_id context here
1579                outcome_kind_name(&outcome),
1580                Some(outcome.to_json_value()),
1581            )
1582            .await;
1583
1584            let mut completion = completion_from_outcome(&outcome);
1585            if matches!(outcome, Outcome::Fault(_))
1586                && let Some(compensation) = compensation_handle.as_ref()
1587                && compensation_auto_trigger(bus)
1588            {
1589                let context = CompensationContext {
1590                    trace_id: trace_id.clone(),
1591                    circuit: label.clone(),
1592                    fault_kind: outcome_kind_name(&outcome).to_string(),
1593                    fault_step,
1594                    timestamp_ms: now_ms(),
1595                };
1596
1597                if run_compensation(
1598                    compensation,
1599                    context,
1600                    compensation_retry_policy,
1601                    compensation_idempotency.clone(),
1602                )
1603                .await
1604                {
1605                    persist_execution_event(
1606                        handle,
1607                        &trace_id,
1608                        &label,
1609                        &version,
1610                        fault_step.saturating_add(1),
1611                        None,
1612                        "Compensated",
1613                        None,
1614                    )
1615                    .await;
1616                    completion = CompletionState::Compensated;
1617                }
1618            }
1619
1620            if persistence_auto_complete(bus) {
1621                persist_completion(handle, &trace_id, completion).await;
1622            }
1623        }
1624
1625        if should_capture {
1626            maybe_export_timeline(bus, &outcome);
1627        }
1628        if inserted_timeline {
1629            let _ = bus.remove::<Timeline>();
1630        }
1631
1632        outcome
1633    }
1634
1635    /// Starts the Ranvier Inspector for this Axon on the specified port.
1636    /// This spawns a background task to serve the Schematic.
1637    pub fn serve_inspector(self, port: u16) -> Self {
1638        if !inspector_dev_mode_from_env() {
1639            tracing::info!("Inspector disabled because RANVIER_MODE is production");
1640            return self;
1641        }
1642        if !inspector_enabled_from_env() {
1643            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
1644            return self;
1645        }
1646
1647        let schematic = self.schematic.clone();
1648        let axon_inspector = Arc::new(self.clone());
1649        tokio::spawn(async move {
1650            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
1651                .with_projection_files_from_env()
1652                .with_mode_from_env()
1653                .with_auth_policy_from_env()
1654                .with_state_inspector(axon_inspector)
1655                .serve()
1656                .await
1657            {
1658                tracing::error!("Inspector server failed: {}", e);
1659            }
1660        });
1661        self
1662    }
1663
1664    /// Get a reference to the Schematic (structural view).
1665    pub fn schematic(&self) -> &Schematic {
1666        &self.schematic
1667    }
1668
1669    /// Consume and return the Schematic.
1670    pub fn into_schematic(self) -> Schematic {
1671        self.schematic
1672    }
1673
1674    /// Detect schematic export mode from runtime flags.
1675    ///
1676    /// Supported triggers:
1677    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
1678    /// - `--schematic`
1679    ///
1680    /// Optional output path:
1681    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
1682    /// - `--schematic-output <path>` / `--schematic-output=<path>`
1683    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
1684    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
1685        schematic_export_request_from_process()
1686    }
1687
1688    /// Export schematic and return `true` when schematic mode is active.
1689    ///
1690    /// Use this once after circuit construction and before server/custom loops:
1691    ///
1692    /// ```rust,ignore
1693    /// let axon = build_axon();
1694    /// if axon.maybe_export_and_exit()? {
1695    ///     return Ok(());
1696    /// }
1697    /// // Normal runtime path...
1698    /// ```
1699    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
1700        self.maybe_export_and_exit_with(|_| ())
1701    }
1702
1703    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
1704    ///
1705    /// This is useful when your app has custom loop/bootstrap behavior and you want
1706    /// to skip or cleanup that logic in schematic mode.
1707    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
1708    where
1709        F: FnOnce(&SchematicExportRequest),
1710    {
1711        let Some(request) = self.schematic_export_request() else {
1712            return Ok(false);
1713        };
1714        on_before_exit(&request);
1715        self.export_schematic(&request)?;
1716        Ok(true)
1717    }
1718
1719    /// Export schematic according to the provided request.
1720    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
1721        let json = serde_json::to_string_pretty(self.schematic())?;
1722        if let Some(path) = &request.output {
1723            if let Some(parent) = path.parent()
1724                && !parent.as_os_str().is_empty()
1725            {
1726                fs::create_dir_all(parent)?;
1727            }
1728            fs::write(path, json.as_bytes())?;
1729            return Ok(());
1730        }
1731        println!("{}", json);
1732        Ok(())
1733    }
1734}
1735
1736fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
1737    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
1738    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
1739    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
1740
1741    let mut i = 0;
1742    while i < args.len() {
1743        let arg = args[i].to_string_lossy();
1744
1745        if arg == "--schematic" {
1746            enabled = true;
1747            i += 1;
1748            continue;
1749        }
1750
1751        if arg == "--schematic-output" || arg == "--output" {
1752            if let Some(next) = args.get(i + 1) {
1753                output = Some(PathBuf::from(next));
1754                i += 2;
1755                continue;
1756            }
1757        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
1758            output = Some(PathBuf::from(value));
1759        } else if let Some(value) = arg.strip_prefix("--output=") {
1760            output = Some(PathBuf::from(value));
1761        }
1762
1763        i += 1;
1764    }
1765
1766    if enabled {
1767        Some(SchematicExportRequest { output })
1768    } else {
1769        None
1770    }
1771}
1772
/// Report whether the environment variable `key` holds a truthy value.
///
/// `1`, `true`, `on` and `yes` count as true, in any ASCII case, with no
/// surrounding whitespace allowed. An unset variable, a non-Unicode value,
/// or any other string yields `false`.
fn env_flag_is_true(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var(key).is_ok_and(|raw| TRUTHY.contains(&raw.to_ascii_lowercase().as_str()))
}
1779
1780fn inspector_enabled_from_env() -> bool {
1781    let raw = std::env::var("RANVIER_INSPECTOR").ok();
1782    inspector_enabled_from_value(raw.as_deref())
1783}
1784
/// Interpret a raw `RANVIER_INSPECTOR` value.
///
/// An absent value (`None`) leaves the inspector enabled. A present value
/// enables it only when it is `1`, `true`, `on` or `yes` (ASCII
/// case-insensitive). Anything else disables it, including an empty string.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    let lowered = raw.to_ascii_lowercase();
    matches!(lowered.as_str(), "1" | "true" | "on" | "yes")
}
1791
1792fn inspector_dev_mode_from_env() -> bool {
1793    let raw = std::env::var("RANVIER_MODE").ok();
1794    inspector_dev_mode_from_value(raw.as_deref())
1795}
1796
/// Interpret a raw `RANVIER_MODE` value.
///
/// Returns `false` only for `prod` or `production` (ASCII case-insensitive,
/// no trimming). An absent value or any other string counts as development mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    let is_production = value.is_some_and(|mode| {
        mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production")
    });
    !is_production
}
1803
1804fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
1805    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
1806        Ok(v) if !v.trim().is_empty() => v,
1807        _ => return,
1808    };
1809
1810    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
1811    let policy = timeline_adaptive_policy();
1812    let forced = should_force_export(outcome, &policy);
1813    let should_export = sampled || forced;
1814    if !should_export {
1815        record_sampling_stats(false, sampled, forced, "none", &policy);
1816        return;
1817    }
1818
1819    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
1820    timeline.sort();
1821
1822    let mode = std::env::var("RANVIER_TIMELINE_MODE")
1823        .unwrap_or_else(|_| "overwrite".to_string())
1824        .to_ascii_lowercase();
1825
1826    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
1827        tracing::warn!(
1828            "Failed to persist timeline file {} (mode={}): {}",
1829            path,
1830            mode,
1831            err
1832        );
1833        record_sampling_stats(false, sampled, forced, &mode, &policy);
1834    } else {
1835        record_sampling_stats(true, sampled, forced, &mode, &policy);
1836    }
1837}
1838
1839fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
1840    match outcome {
1841        Outcome::Next(_) => "Next".to_string(),
1842        Outcome::Branch(id, _) => format!("Branch:{}", id),
1843        Outcome::Jump(id, _) => format!("Jump:{}", id),
1844        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
1845        Outcome::Fault(_) => "Fault".to_string(),
1846    }
1847}
1848
1849fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
1850    match outcome {
1851        Outcome::Next(_) => "Next",
1852        Outcome::Branch(_, _) => "Branch",
1853        Outcome::Jump(_, _) => "Jump",
1854        Outcome::Emit(_, _) => "Emit",
1855        Outcome::Fault(_) => "Fault",
1856    }
1857}
1858
1859fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
1860    match outcome {
1861        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
1862        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
1863        Outcome::Emit(event_type, _) => Some(event_type.clone()),
1864        Outcome::Next(_) | Outcome::Fault(_) => None,
1865    }
1866}
1867
1868fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
1869    match outcome {
1870        Outcome::Fault(_) => CompletionState::Fault,
1871        _ => CompletionState::Success,
1872    }
1873}
1874
1875fn persistence_trace_id(bus: &Bus) -> String {
1876    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
1877        explicit.0.clone()
1878    } else {
1879        format!("{}:{}", bus.id, now_ms())
1880    }
1881}
1882
1883fn persistence_auto_complete(bus: &Bus) -> bool {
1884    bus.read::<PersistenceAutoComplete>()
1885        .map(|v| v.0)
1886        .unwrap_or(true)
1887}
1888
1889fn compensation_auto_trigger(bus: &Bus) -> bool {
1890    bus.read::<CompensationAutoTrigger>()
1891        .map(|v| v.0)
1892        .unwrap_or(true)
1893}
1894
1895fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
1896    bus.read::<CompensationRetryPolicy>()
1897        .copied()
1898        .unwrap_or_default()
1899}
1900
1901/// Unwrap the Outcome enum layer from a persisted event payload.
1902///
1903/// Events are stored with `outcome.to_json_value()` which serializes the full
1904/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
1905/// handler expects the raw inner value, so we extract it here.
1906fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
1907    payload.map(|p| {
1908        p.get("Next")
1909            .or_else(|| p.get("Branch"))
1910            .or_else(|| p.get("Jump"))
1911            .cloned()
1912            .unwrap_or_else(|| p.clone())
1913    })
1914}
1915
/// Load the persisted trace for `trace_id` and work out where execution should resume.
///
/// Returns a tuple of:
/// 1. the next step index to run,
/// 2. the trace's `schematic_version` (`None` when no trace exists),
/// 3. the last entry in `trace.interventions`, if any,
/// 4. the `node_id` of the anchor event chosen for resumption,
/// 5. the anchor event's payload with its outer Outcome variant key stripped
///    (see `unwrap_outcome_payload`).
///
/// When the trace has `resumed_from_step` set, the next step is
/// `resumed_from_step + 1`. The anchor is picked by scanning `trace.events`
/// from the back and taking the first match, in this preference order:
/// - an event at or before that step with kind `Next` and a payload
/// - an event at or before that step that is not a `Fault` and has a payload
/// - any event at or before that step with a payload
/// - the last event in the trace (which may lie *after* that step)
///
/// Without `resumed_from_step`, the anchor is simply the last event. The next
/// step is one past its `step`, or 0 for an empty trace.
///
/// A missing trace yields `(0, None, None, None, None)`. A store error is
/// logged as a warning and yields the same fallback.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // Reverse scans return the match nearest the end of `trace.events`.
                    // NOTE(review): this treats the events as appended in step order,
                    // so the match is the most recent one. TODO confirm.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        // Best anchor: a successful `Next` step with a payload.
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        // Otherwise: any non-fault event with a payload.
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        // Otherwise: any event with a payload, faults included.
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        // Last resort: the final event, ignoring the step bound.
                        .or_else(|| trace.events.last());

                    (
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // No explicit resume point: continue right after the last recorded
                    // event, whatever its kind.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        // No trace stored yet: start from the beginning.
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            // Best-effort: a store failure is logged and treated like a fresh run.
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
1993
1994#[allow(clippy::too_many_arguments)]
1995async fn run_this_step<In, Out, E, Res>(
1996    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
1997    state: In,
1998    res: &Res,
1999    bus: &mut Bus,
2000    node_id: &str,
2001    node_label: &str,
2002    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2003    step_idx: u64,
2004) -> Outcome<Out, E>
2005where
2006    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2007    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2008    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2009    Res: ranvier_core::transition::ResourceRequirement,
2010{
2011    let label = trans.label();
2012    let res_type = std::any::type_name::<Res>()
2013        .split("::")
2014        .last()
2015        .unwrap_or("unknown");
2016
2017    // Debug pausing
2018    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2019        debug.should_pause(node_id)
2020    } else {
2021        false
2022    };
2023
2024    if should_pause {
2025        let trace_id = persistence_trace_id(bus);
2026        tracing::event!(
2027            target: "ranvier.debugger",
2028            tracing::Level::INFO,
2029            trace_id = %trace_id,
2030            node_id = %node_id,
2031            "Node paused"
2032        );
2033
2034        if let Some(timeline) = bus.read_mut::<Timeline>() {
2035            timeline.push(TimelineEvent::NodePaused {
2036                node_id: node_id.to_string(),
2037                timestamp: now_ms(),
2038            });
2039        }
2040        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2041            debug.wait().await;
2042        }
2043    }
2044
2045    let enter_ts = now_ms();
2046    if let Some(timeline) = bus.read_mut::<Timeline>() {
2047        timeline.push(TimelineEvent::NodeEnter {
2048            node_id: node_id.to_string(),
2049            node_label: node_label.to_string(),
2050            timestamp: enter_ts,
2051        });
2052    }
2053
2054    // Check DLQ retry policy and pre-serialize state for potential retries
2055    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2056        if let DlqPolicy::RetryThenDlq {
2057            max_attempts,
2058            backoff_ms,
2059        } = p
2060        {
2061            Some((*max_attempts, *backoff_ms))
2062        } else {
2063            None
2064        }
2065    });
2066    let retry_state_snapshot = if dlq_retry_config.is_some() {
2067        serde_json::to_value(&state).ok()
2068    } else {
2069        None
2070    };
2071
2072    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2073    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2074        Some(serde_json::to_vec(&state).unwrap_or_default())
2075    } else {
2076        None
2077    };
2078
2079    let node_span = tracing::info_span!(
2080        "Node",
2081        ranvier.node = %label,
2082        ranvier.resource_type = %res_type,
2083        ranvier.outcome_kind = tracing::field::Empty,
2084        ranvier.outcome_target = tracing::field::Empty
2085    );
2086    let started = std::time::Instant::now();
2087    bus.set_access_policy(label.clone(), bus_policy.clone());
2088    let result = trans
2089        .run(state, res, bus)
2090        .instrument(node_span.clone())
2091        .await;
2092    bus.clear_access_policy();
2093
2094    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
2095    // retry with exponential backoff before giving up.
2096    let result = if let Outcome::Fault(_) = &result {
2097        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2098            (dlq_retry_config, &retry_state_snapshot)
2099        {
2100            let mut final_result = result;
2101            // attempt 1 already done; retry from 2..=max_attempts
2102            for attempt in 2..=max_attempts {
2103                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2104
2105                tracing::info!(
2106                    ranvier.node = %label,
2107                    attempt = attempt,
2108                    max_attempts = max_attempts,
2109                    backoff_ms = delay,
2110                    "Retrying faulted node"
2111                );
2112
2113                if let Some(timeline) = bus.read_mut::<Timeline>() {
2114                    timeline.push(TimelineEvent::NodeRetry {
2115                        node_id: node_id.to_string(),
2116                        attempt,
2117                        max_attempts,
2118                        backoff_ms: delay,
2119                        timestamp: now_ms(),
2120                    });
2121                }
2122
2123                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2124
2125                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2126                    bus.set_access_policy(label.clone(), bus_policy.clone());
2127                    let retry_result = trans
2128                        .run(retry_state, res, bus)
2129                        .instrument(tracing::info_span!(
2130                            "NodeRetry",
2131                            ranvier.node = %label,
2132                            attempt = attempt
2133                        ))
2134                        .await;
2135                    bus.clear_access_policy();
2136
2137                    match &retry_result {
2138                        Outcome::Fault(_) => {
2139                            final_result = retry_result;
2140                        }
2141                        _ => {
2142                            final_result = retry_result;
2143                            break;
2144                        }
2145                    }
2146                }
2147            }
2148            final_result
2149        } else {
2150            result
2151        }
2152    } else {
2153        result
2154    };
2155
2156    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2157    if let Some(target) = outcome_target(&result) {
2158        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2159    }
2160    let duration_ms = started.elapsed().as_millis() as u64;
2161    let exit_ts = now_ms();
2162
2163    if let Some(timeline) = bus.read_mut::<Timeline>() {
2164        timeline.push(TimelineEvent::NodeExit {
2165            node_id: node_id.to_string(),
2166            outcome_type: outcome_type_name(&result),
2167            duration_ms,
2168            timestamp: exit_ts,
2169        });
2170
2171        if let Outcome::Branch(branch_id, _) = &result {
2172            timeline.push(TimelineEvent::Branchtaken {
2173                branch_id: branch_id.clone(),
2174                timestamp: exit_ts,
2175            });
2176        }
2177    }
2178
2179    // Push to Saga Stack if Next outcome and snapshot taken
2180    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2181        && let Some(stack) = bus.read_mut::<SagaStack>()
2182    {
2183        stack.push(node_id.to_string(), label.clone(), snapshot);
2184    }
2185
2186    if let Some(handle) = bus.read::<PersistenceHandle>() {
2187        let trace_id = persistence_trace_id(bus);
2188        let circuit = bus
2189            .read::<ranvier_core::schematic::Schematic>()
2190            .map(|s| s.name.clone())
2191            .unwrap_or_default();
2192        let version = bus
2193            .read::<ranvier_core::schematic::Schematic>()
2194            .map(|s| s.schema_version.clone())
2195            .unwrap_or_default();
2196
2197        persist_execution_event(
2198            handle,
2199            &trace_id,
2200            &circuit,
2201            &version,
2202            step_idx,
2203            Some(node_id.to_string()),
2204            outcome_kind_name(&result),
2205            Some(result.to_json_value()),
2206        )
2207        .await;
2208    }
2209
2210    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
2211    // or immediately (SendToDlq). Drop policy skips entirely.
2212    if let Outcome::Fault(f) = &result {
2213        // Read policy and sink, then drop the borrows before mutable timeline access
2214        let dlq_action = {
2215            let policy = bus.read::<DlqPolicy>();
2216            let sink = bus.read::<Arc<dyn DlqSink>>();
2217            match (sink, policy) {
2218                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2219                _ => None,
2220            }
2221        };
2222
2223        if let Some(sink) = dlq_action {
2224            if let Some((max_attempts, _)) = dlq_retry_config
2225                && let Some(timeline) = bus.read_mut::<Timeline>()
2226            {
2227                timeline.push(TimelineEvent::DlqExhausted {
2228                    node_id: node_id.to_string(),
2229                    total_attempts: max_attempts,
2230                    timestamp: now_ms(),
2231                });
2232            }
2233
2234            let trace_id = persistence_trace_id(bus);
2235            let circuit = bus
2236                .read::<ranvier_core::schematic::Schematic>()
2237                .map(|s| s.name.clone())
2238                .unwrap_or_default();
2239            let _ = sink
2240                .store_dead_letter(
2241                    &trace_id,
2242                    &circuit,
2243                    node_id,
2244                    &format!("{:?}", f),
2245                    &serde_json::to_vec(&f).unwrap_or_default(),
2246                )
2247                .await;
2248        }
2249    }
2250
2251    result
2252}
2253
2254#[allow(clippy::too_many_arguments)]
2255async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2256    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2257    comp: &Comp,
2258    state: Out,
2259    res: &Res,
2260    bus: &mut Bus,
2261    node_id: &str,
2262    comp_node_id: &str,
2263    node_label: &str,
2264    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2265    step_idx: u64,
2266) -> Outcome<Next, E>
2267where
2268    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2269    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2270    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2271    Res: ranvier_core::transition::ResourceRequirement,
2272    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2273{
2274    let label = trans.label();
2275
2276    // Debug pausing
2277    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2278        debug.should_pause(node_id)
2279    } else {
2280        false
2281    };
2282
2283    if should_pause {
2284        let trace_id = persistence_trace_id(bus);
2285        tracing::event!(
2286            target: "ranvier.debugger",
2287            tracing::Level::INFO,
2288            trace_id = %trace_id,
2289            node_id = %node_id,
2290            "Node paused (compensated)"
2291        );
2292
2293        if let Some(timeline) = bus.read_mut::<Timeline>() {
2294            timeline.push(TimelineEvent::NodePaused {
2295                node_id: node_id.to_string(),
2296                timestamp: now_ms(),
2297            });
2298        }
2299        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2300            debug.wait().await;
2301        }
2302    }
2303
2304    let enter_ts = now_ms();
2305    if let Some(timeline) = bus.read_mut::<Timeline>() {
2306        timeline.push(TimelineEvent::NodeEnter {
2307            node_id: node_id.to_string(),
2308            node_label: node_label.to_string(),
2309            timestamp: enter_ts,
2310        });
2311    }
2312
2313    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2314    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2315        Some(serde_json::to_vec(&state).unwrap_or_default())
2316    } else {
2317        None
2318    };
2319
2320    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2321    bus.set_access_policy(label.clone(), bus_policy.clone());
2322    let result = trans
2323        .run(state.clone(), res, bus)
2324        .instrument(node_span)
2325        .await;
2326    bus.clear_access_policy();
2327
2328    let duration_ms = 0; // Simplified
2329    let exit_ts = now_ms();
2330
2331    if let Some(timeline) = bus.read_mut::<Timeline>() {
2332        timeline.push(TimelineEvent::NodeExit {
2333            node_id: node_id.to_string(),
2334            outcome_type: outcome_type_name(&result),
2335            duration_ms,
2336            timestamp: exit_ts,
2337        });
2338    }
2339
2340    // Automated Compensation Trigger
2341    if let Outcome::Fault(ref err) = result {
2342        if compensation_auto_trigger(bus) {
2343            tracing::info!(
2344                ranvier.node = %label,
2345                ranvier.compensation.trigger = "saga",
2346                error = ?err,
2347                "Saga compensation triggered"
2348            );
2349
2350            if let Some(timeline) = bus.read_mut::<Timeline>() {
2351                timeline.push(TimelineEvent::NodeEnter {
2352                    node_id: comp_node_id.to_string(),
2353                    node_label: format!("Compensate: {}", comp.label()),
2354                    timestamp: exit_ts,
2355                });
2356            }
2357
2358            // Run compensation
2359            let _ = comp.run(state, res, bus).await;
2360
2361            if let Some(timeline) = bus.read_mut::<Timeline>() {
2362                timeline.push(TimelineEvent::NodeExit {
2363                    node_id: comp_node_id.to_string(),
2364                    outcome_type: "Compensated".to_string(),
2365                    duration_ms: 0,
2366                    timestamp: now_ms(),
2367                });
2368            }
2369
2370            if let Some(handle) = bus.read::<PersistenceHandle>() {
2371                let trace_id = persistence_trace_id(bus);
2372                let circuit = bus
2373                    .read::<ranvier_core::schematic::Schematic>()
2374                    .map(|s| s.name.clone())
2375                    .unwrap_or_default();
2376                let version = bus
2377                    .read::<ranvier_core::schematic::Schematic>()
2378                    .map(|s| s.schema_version.clone())
2379                    .unwrap_or_default();
2380
2381                persist_execution_event(
2382                    handle,
2383                    &trace_id,
2384                    &circuit,
2385                    &version,
2386                    step_idx + 1, // Compensation node index
2387                    Some(comp_node_id.to_string()),
2388                    "Compensated",
2389                    None,
2390                )
2391                .await;
2392            }
2393        }
2394    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2395        // Push to Saga Stack if Next outcome and snapshot taken
2396        if let Some(stack) = bus.read_mut::<SagaStack>() {
2397            stack.push(node_id.to_string(), label.clone(), snapshot);
2398        }
2399
2400        if let Some(handle) = bus.read::<PersistenceHandle>() {
2401            let trace_id = persistence_trace_id(bus);
2402            let circuit = bus
2403                .read::<ranvier_core::schematic::Schematic>()
2404                .map(|s| s.name.clone())
2405                .unwrap_or_default();
2406            let version = bus
2407                .read::<ranvier_core::schematic::Schematic>()
2408                .map(|s| s.schema_version.clone())
2409                .unwrap_or_default();
2410
2411            persist_execution_event(
2412                handle,
2413                &trace_id,
2414                &circuit,
2415                &version,
2416                step_idx,
2417                Some(node_id.to_string()),
2418                outcome_kind_name(&result),
2419                Some(result.to_json_value()),
2420            )
2421            .await;
2422        }
2423    }
2424
2425    // DLQ reporting for compensated steps
2426    if let Outcome::Fault(f) = &result
2427        && let (Some(sink), Some(policy)) =
2428            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2429    {
2430        let should_dlq = !matches!(policy, DlqPolicy::Drop);
2431        if should_dlq {
2432            let trace_id = persistence_trace_id(bus);
2433            let circuit = bus
2434                .read::<ranvier_core::schematic::Schematic>()
2435                .map(|s| s.name.clone())
2436                .unwrap_or_default();
2437            let _ = sink
2438                .store_dead_letter(
2439                    &trace_id,
2440                    &circuit,
2441                    node_id,
2442                    &format!("{:?}", f),
2443                    &serde_json::to_vec(&f).unwrap_or_default(),
2444                )
2445                .await;
2446        }
2447    }
2448
2449    result
2450}
2451
2452#[allow(clippy::too_many_arguments)]
2453pub async fn persist_execution_event(
2454    handle: &PersistenceHandle,
2455    trace_id: &str,
2456    circuit: &str,
2457    schematic_version: &str,
2458    step: u64,
2459    node_id: Option<String>,
2460    outcome_kind: &str,
2461    payload: Option<serde_json::Value>,
2462) {
2463    let store = handle.store();
2464    let envelope = PersistenceEnvelope {
2465        trace_id: trace_id.to_string(),
2466        circuit: circuit.to_string(),
2467        schematic_version: schematic_version.to_string(),
2468        step,
2469        node_id,
2470        outcome_kind: outcome_kind.to_string(),
2471        timestamp_ms: now_ms(),
2472        payload_hash: None,
2473        payload,
2474    };
2475
2476    if let Err(err) = store.append(envelope).await {
2477        tracing::warn!(
2478            trace_id = %trace_id,
2479            circuit = %circuit,
2480            step,
2481            outcome_kind = %outcome_kind,
2482            error = %err,
2483            "Failed to append persistence envelope"
2484        );
2485    }
2486}
2487
2488async fn persist_completion(
2489    handle: &PersistenceHandle,
2490    trace_id: &str,
2491    completion: CompletionState,
2492) {
2493    let store = handle.store();
2494    if let Err(err) = store.complete(trace_id, completion).await {
2495        tracing::warn!(
2496            trace_id = %trace_id,
2497            error = %err,
2498            "Failed to complete persistence trace"
2499        );
2500    }
2501}
2502
2503fn compensation_idempotency_key(context: &CompensationContext) -> String {
2504    format!(
2505        "{}:{}:{}",
2506        context.trace_id, context.circuit, context.fault_kind
2507    )
2508}
2509
2510async fn run_compensation(
2511    handle: &CompensationHandle,
2512    context: CompensationContext,
2513    retry_policy: CompensationRetryPolicy,
2514    idempotency: Option<CompensationIdempotencyHandle>,
2515) -> bool {
2516    let hook = handle.hook();
2517    let key = compensation_idempotency_key(&context);
2518
2519    if let Some(store_handle) = idempotency.as_ref() {
2520        let store = store_handle.store();
2521        match store.was_compensated(&key).await {
2522            Ok(true) => {
2523                tracing::info!(
2524                    trace_id = %context.trace_id,
2525                    circuit = %context.circuit,
2526                    key = %key,
2527                    "Compensation already recorded; skipping duplicate hook execution"
2528                );
2529                return true;
2530            }
2531            Ok(false) => {}
2532            Err(err) => {
2533                tracing::warn!(
2534                    trace_id = %context.trace_id,
2535                    key = %key,
2536                    error = %err,
2537                    "Failed to check compensation idempotency state"
2538                );
2539            }
2540        }
2541    }
2542
2543    let max_attempts = retry_policy.max_attempts.max(1);
2544    for attempt in 1..=max_attempts {
2545        match hook.compensate(context.clone()).await {
2546            Ok(()) => {
2547                if let Some(store_handle) = idempotency.as_ref() {
2548                    let store = store_handle.store();
2549                    if let Err(err) = store.mark_compensated(&key).await {
2550                        tracing::warn!(
2551                            trace_id = %context.trace_id,
2552                            key = %key,
2553                            error = %err,
2554                            "Failed to mark compensation idempotency state"
2555                        );
2556                    }
2557                }
2558                return true;
2559            }
2560            Err(err) => {
2561                let is_last = attempt == max_attempts;
2562                tracing::warn!(
2563                    trace_id = %context.trace_id,
2564                    circuit = %context.circuit,
2565                    fault_kind = %context.fault_kind,
2566                    fault_step = context.fault_step,
2567                    attempt,
2568                    max_attempts,
2569                    error = %err,
2570                    "Compensation hook attempt failed"
2571                );
2572                if !is_last && retry_policy.backoff_ms > 0 {
2573                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2574                        .await;
2575                }
2576            }
2577        }
2578    }
2579    false
2580}
2581
2582fn ensure_timeline(bus: &mut Bus) -> bool {
2583    if bus.has::<Timeline>() {
2584        false
2585    } else {
2586        bus.insert(Timeline::new());
2587        true
2588    }
2589}
2590
2591fn should_attach_timeline(bus: &Bus) -> bool {
2592    // Respect explicitly provided timeline collector from caller.
2593    if bus.has::<Timeline>() {
2594        return true;
2595    }
2596
2597    // Attach timeline when runtime export path exists.
2598    has_timeline_output_path()
2599}
2600
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
fn has_timeline_output_path() -> bool {
    std::env::var("RANVIER_TIMELINE_OUTPUT").is_ok_and(|value| !value.trim().is_empty())
}
2607
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped to `[0.0, 1.0]`. A missing,
/// unparsable or NaN value falls back to `1.0` (sample everything).
///
/// NaN must be rejected explicitly: `f64::clamp` returns NaN unchanged, and a NaN
/// rate makes every comparison in `sampled_by_bus_id` false, which would silently
/// disable sampling instead of using the default.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|v| !v.is_nan())
        .map(|v| v.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
2615
2616fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
2617    if rate <= 0.0 {
2618        return false;
2619    }
2620    if rate >= 1.0 {
2621        return true;
2622    }
2623    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
2624    bucket < rate
2625}
2626
/// Reads the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is trimmed and lower-cased so that, for example, `" OFF "` selects
/// `"off"`. An unset or blank variable yields the default `"fault_branch"`.
/// Trimming matters because `should_force_export` matches exact strings; an
/// untrimmed value would otherwise fall silently into its default arm.
fn timeline_adaptive_policy() -> String {
    std::env::var("RANVIER_TIMELINE_ADAPTIVE")
        .ok()
        .map(|v| v.trim().to_ascii_lowercase())
        .filter(|v| !v.is_empty())
        .unwrap_or_else(|| "fault_branch".to_string())
}
2632
2633fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
2634    match policy {
2635        "off" => false,
2636        "fault_only" => matches!(outcome, Outcome::Fault(_)),
2637        "fault_branch_emit" => {
2638            matches!(
2639                outcome,
2640                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
2641            )
2642        }
2643        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
2644    }
2645}
2646
/// Process-wide counters describing timeline export sampling decisions.
///
/// Updated by `record_sampling_stats`.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Every decision recorded.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not export.
    skipped: u64,
    /// Exports whose `sampled` flag was set.
    sampled_exports: u64,
    /// Exports whose `forced` flag was set.
    forced_exports: u64,
    /// Mode passed with the most recent decision.
    last_mode: String,
    /// Policy passed with the most recent decision.
    last_policy: String,
    /// `now_ms()` at the most recent decision (ms since the Unix epoch).
    last_updated_ms: u64,
}

/// Lazily created storage for the global sampling stats.
static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();

/// Returns the global sampling-stats mutex, creating it (zeroed) on first use.
fn stats_cell() -> &'static Mutex<SamplingStats> {
    TIMELINE_SAMPLING_STATS.get_or_init(Mutex::default)
}
2664
2665fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
2666    let snapshot = {
2667        let mut stats = match stats_cell().lock() {
2668            Ok(guard) => guard,
2669            Err(_) => return,
2670        };
2671
2672        stats.total_decisions += 1;
2673        if exported {
2674            stats.exported += 1;
2675        } else {
2676            stats.skipped += 1;
2677        }
2678        if sampled && exported {
2679            stats.sampled_exports += 1;
2680        }
2681        if forced && exported {
2682            stats.forced_exports += 1;
2683        }
2684        stats.last_mode = mode.to_string();
2685        stats.last_policy = policy.to_string();
2686        stats.last_updated_ms = now_ms();
2687        stats.clone()
2688    };
2689
2690    tracing::debug!(
2691        ranvier.timeline.total_decisions = snapshot.total_decisions,
2692        ranvier.timeline.exported = snapshot.exported,
2693        ranvier.timeline.skipped = snapshot.skipped,
2694        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
2695        ranvier.timeline.forced_exports = snapshot.forced_exports,
2696        ranvier.timeline.mode = %snapshot.last_mode,
2697        ranvier.timeline.policy = %snapshot.last_policy,
2698        "Timeline sampling stats updated"
2699    );
2700
2701    if let Some(path) = timeline_stats_output_path() {
2702        let payload = serde_json::json!({
2703            "total_decisions": snapshot.total_decisions,
2704            "exported": snapshot.exported,
2705            "skipped": snapshot.skipped,
2706            "sampled_exports": snapshot.sampled_exports,
2707            "forced_exports": snapshot.forced_exports,
2708            "last_mode": snapshot.last_mode,
2709            "last_policy": snapshot.last_policy,
2710            "last_updated_ms": snapshot.last_updated_ms
2711        });
2712        if let Some(parent) = Path::new(&path).parent() {
2713            let _ = fs::create_dir_all(parent);
2714        }
2715        if let Err(err) = fs::write(&path, payload.to_string()) {
2716            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
2717        }
2718    }
2719}
2720
/// Returns the sampling-stats output path from `RANVIER_TIMELINE_STATS_OUTPUT`.
///
/// Yields `None` when the variable is unset or blank. A non-blank value is
/// returned exactly as set (not trimmed).
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
2726
2727fn write_timeline_with_policy(
2728    path: &str,
2729    mode: &str,
2730    mut timeline: Timeline,
2731) -> Result<(), String> {
2732    match mode {
2733        "append" => {
2734            if Path::new(path).exists() {
2735                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
2736                match serde_json::from_str::<Timeline>(&content) {
2737                    Ok(mut existing) => {
2738                        existing.events.append(&mut timeline.events);
2739                        existing.sort();
2740                        if let Some(max_events) = max_events_limit() {
2741                            truncate_timeline_events(&mut existing, max_events);
2742                        }
2743                        write_timeline_json(path, &existing)
2744                    }
2745                    Err(_) => {
2746                        // Fallback: if existing is invalid, replace with current timeline
2747                        if let Some(max_events) = max_events_limit() {
2748                            truncate_timeline_events(&mut timeline, max_events);
2749                        }
2750                        write_timeline_json(path, &timeline)
2751                    }
2752                }
2753            } else {
2754                if let Some(max_events) = max_events_limit() {
2755                    truncate_timeline_events(&mut timeline, max_events);
2756                }
2757                write_timeline_json(path, &timeline)
2758            }
2759        }
2760        "rotate" => {
2761            let rotated_path = rotated_path(path, now_ms());
2762            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
2763            if let Some(keep) = rotate_keep_limit() {
2764                cleanup_rotated_files(path, keep)?;
2765            }
2766            Ok(())
2767        }
2768        _ => write_timeline_json(path, &timeline),
2769    }
2770}
2771
2772fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
2773    if let Some(parent) = Path::new(path).parent()
2774        && !parent.as_os_str().is_empty()
2775    {
2776        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2777    }
2778    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
2779    fs::write(path, json).map_err(|e| e.to_string())
2780}
2781
/// Builds the path of a rotated timeline file, `{stem}_{suffix}.{ext}`, in the
/// same directory as `path`.
///
/// Falls back to stem `timeline` and extension `json` when `path` lacks those
/// components (or they are not valid UTF-8).
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let base = Path::new(path);
    let stem = match base.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match base.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    let file_name = format!("{stem}_{suffix}.{ext}");
    match base.parent() {
        Some(dir) => dir.join(file_name),
        None => Path::new("").join(file_name),
    }
}
2789
/// Optional cap on timeline events kept by `write_timeline_with_policy` in append
/// mode, read from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// `None` when the variable is unset, unparsable or zero.
fn max_events_limit() -> Option<usize> {
    positive_usize_from_env("RANVIER_TIMELINE_MAX_EVENTS")
}

/// Optional number of rotated timeline files to keep, read from
/// `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// `None` when the variable is unset, unparsable or zero.
fn rotate_keep_limit() -> Option<usize> {
    positive_usize_from_env("RANVIER_TIMELINE_ROTATE_KEEP")
}

/// Reads env var `name` as a strictly positive `usize`, ignoring surrounding whitespace.
fn positive_usize_from_env(name: &str) -> Option<usize> {
    std::env::var(name)
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|v| *v > 0)
}
2803
2804fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
2805    let len = timeline.events.len();
2806    if len > max_events {
2807        let keep_from = len - max_events;
2808        timeline.events = timeline.events.split_off(keep_from);
2809    }
2810}
2811
/// Deletes old rotated timeline files for `base_path`, keeping the `keep` most
/// recently modified ones.
///
/// Only files whose names have the exact shape produced by `rotated_path`,
/// `{stem}_{digits}.{ext}` in the same directory, are considered. Unrelated files
/// that merely share the `{stem}_` prefix (e.g. `timeline_stats.json`) are never
/// removed. A bare file name such as `timeline.json` is resolved against the
/// current directory. Files whose modification time cannot be read sort as
/// oldest. Individual delete failures are ignored.
///
/// # Errors
/// Returns the stringified error when the directory cannot be listed.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and `read_dir("")`
    // fails, so treat an empty parent like a missing one: the current directory.
    let parent = p
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    // Accept only `{stem}_{digits}.{ext}`: `rotated_path` always uses a numeric suffix.
    let is_rotated_name = |name: &str| -> bool {
        name.strip_prefix(prefix.as_str())
            .and_then(|rest| rest.strip_suffix(suffix.as_str()))
            .is_some_and(|stamp| !stamp.is_empty() && stamp.bytes().all(|b| b.is_ascii_digit()))
    };

    let mut files = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter(|entry| is_rotated_name(&*entry.file_name().to_string_lossy()))
        .map(|entry| {
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            (entry.path(), modified)
        })
        .collect::<Vec<_>>();

    // Newest first; everything after the first `keep` entries is removed.
    files.sort_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in files.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
2844
2845fn bus_capability_schema_from_policy(
2846    policy: Option<ranvier_core::bus::BusAccessPolicy>,
2847) -> Option<BusCapabilitySchema> {
2848    let policy = policy?;
2849
2850    let mut allow = policy
2851        .allow
2852        .unwrap_or_default()
2853        .into_iter()
2854        .map(|entry| entry.type_name.to_string())
2855        .collect::<Vec<_>>();
2856    let mut deny = policy
2857        .deny
2858        .into_iter()
2859        .map(|entry| entry.type_name.to_string())
2860        .collect::<Vec<_>>();
2861    allow.sort();
2862    deny.sort();
2863
2864    if allow.is_empty() && deny.is_empty() {
2865        return None;
2866    }
2867
2868    Some(BusCapabilitySchema { allow, deny })
2869}
2870
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
2877
2878#[cfg(test)]
2879mod tests {
2880    use super::{
2881        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
2882        should_force_export,
2883    };
2884    use crate::persistence::{
2885        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
2886        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
2887        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
2888        PersistenceHandle, PersistenceStore, PersistenceTraceId,
2889    };
2890    use anyhow::Result;
2891    use async_trait::async_trait;
2892    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
2893    use ranvier_core::event::{DlqPolicy, DlqSink};
2894    use ranvier_core::saga::SagaStack;
2895    use ranvier_core::timeline::{Timeline, TimelineEvent};
2896    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
2897    use serde::{Deserialize, Serialize};
2898    use std::sync::Arc;
2899    use tokio::sync::Mutex;
2900    use uuid::Uuid;
2901
2902    struct MockAuditSink {
2903        events: Arc<Mutex<Vec<AuditEvent>>>,
2904    }
2905
2906    #[async_trait]
2907    impl AuditSink for MockAuditSink {
2908        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
2909            self.events.lock().await.push(event.clone());
2910            Ok(())
2911        }
2912    }
2913
2914    #[tokio::test]
2915    async fn execute_logs_audit_events_for_intervention() {
2916        use ranvier_inspector::StateInspector;
2917
2918        let trace_id = "test-audit-trace";
2919        let store_impl = InMemoryPersistenceStore::new();
2920        let events = Arc::new(Mutex::new(Vec::new()));
2921        let sink = MockAuditSink {
2922            events: events.clone(),
2923        };
2924
2925        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
2926            .then(AddOne)
2927            .with_persistence_store(store_impl.clone())
2928            .with_audit_sink(sink);
2929
2930        let mut bus = Bus::new();
2931        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
2932        bus.insert(PersistenceTraceId::new(trace_id));
2933        let target_node_id = axon.schematic.nodes[0].id.clone();
2934
2935        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
2936        store_impl
2937            .append(crate::persistence::PersistenceEnvelope {
2938                trace_id: trace_id.to_string(),
2939                circuit: "AuditTest".to_string(),
2940                schematic_version: "v1.0".to_string(),
2941                step: 0,
2942                node_id: None,
2943                outcome_kind: "Next".to_string(),
2944                timestamp_ms: 0,
2945                payload_hash: None,
2946                payload: None,
2947            })
2948            .await
2949            .unwrap();
2950
2951        // 1. Trigger force_resume (should log ForceResume)
2952        axon.force_resume(trace_id, &target_node_id, None)
2953            .await
2954            .unwrap();
2955
2956        // 2. Execute (should log ApplyIntervention)
2957        axon.execute(10, &(), &mut bus).await;
2958
2959        let recorded = events.lock().await;
2960        assert_eq!(
2961            recorded.len(),
2962            2,
2963            "Should have 2 audit events: ForceResume and ApplyIntervention"
2964        );
2965        assert_eq!(recorded[0].action, "ForceResume");
2966        assert_eq!(recorded[0].target, trace_id);
2967        assert_eq!(recorded[1].action, "ApplyIntervention");
2968        assert_eq!(recorded[1].target, trace_id);
2969    }
2970
2971    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2972    pub enum TestInfallible {}
2973
2974    #[test]
2975    fn inspector_enabled_flag_matrix() {
2976        assert!(inspector_enabled_from_value(None));
2977        assert!(inspector_enabled_from_value(Some("1")));
2978        assert!(inspector_enabled_from_value(Some("true")));
2979        assert!(inspector_enabled_from_value(Some("on")));
2980        assert!(!inspector_enabled_from_value(Some("0")));
2981        assert!(!inspector_enabled_from_value(Some("false")));
2982    }
2983
2984    #[test]
2985    fn inspector_dev_mode_matrix() {
2986        assert!(inspector_dev_mode_from_value(None));
2987        assert!(inspector_dev_mode_from_value(Some("dev")));
2988        assert!(inspector_dev_mode_from_value(Some("staging")));
2989        assert!(!inspector_dev_mode_from_value(Some("prod")));
2990        assert!(!inspector_dev_mode_from_value(Some("production")));
2991    }
2992
2993    #[test]
2994    fn adaptive_policy_force_export_matrix() {
2995        let next = Outcome::<(), &'static str>::Next(());
2996        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
2997        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
2998        let fault = Outcome::<(), &'static str>::Fault("boom");
2999
3000        assert!(!should_force_export(&next, "off"));
3001        assert!(!should_force_export(&fault, "off"));
3002
3003        assert!(!should_force_export(&branch, "fault_only"));
3004        assert!(should_force_export(&fault, "fault_only"));
3005
3006        assert!(should_force_export(&branch, "fault_branch"));
3007        assert!(!should_force_export(&emit, "fault_branch"));
3008        assert!(should_force_export(&fault, "fault_branch"));
3009
3010        assert!(should_force_export(&branch, "fault_branch_emit"));
3011        assert!(should_force_export(&emit, "fault_branch_emit"));
3012        assert!(should_force_export(&fault, "fault_branch_emit"));
3013    }
3014
3015    #[test]
3016    fn sampling_and_adaptive_combination_decisions() {
3017        let bus_id = Uuid::nil();
3018        let next = Outcome::<(), &'static str>::Next(());
3019        let fault = Outcome::<(), &'static str>::Fault("boom");
3020
3021        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3022        assert!(!sampled_never);
3023        assert!(!(sampled_never || should_force_export(&next, "off")));
3024        assert!(sampled_never || should_force_export(&fault, "fault_only"));
3025
3026        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3027        assert!(sampled_always);
3028        assert!(sampled_always || should_force_export(&next, "off"));
3029        assert!(sampled_always || should_force_export(&fault, "off"));
3030    }
3031
3032    #[derive(Clone)]
3033    struct AddOne;
3034
3035    #[async_trait]
3036    impl Transition<i32, i32> for AddOne {
3037        type Error = TestInfallible;
3038        type Resources = ();
3039
3040        async fn run(
3041            &self,
3042            state: i32,
3043            _resources: &Self::Resources,
3044            _bus: &mut Bus,
3045        ) -> Outcome<i32, Self::Error> {
3046            Outcome::Next(state + 1)
3047        }
3048    }
3049
3050    #[derive(Clone)]
3051    struct AlwaysFault;
3052
3053    #[async_trait]
3054    impl Transition<i32, i32> for AlwaysFault {
3055        type Error = String;
3056        type Resources = ();
3057
3058        async fn run(
3059            &self,
3060            _state: i32,
3061            _resources: &Self::Resources,
3062            _bus: &mut Bus,
3063        ) -> Outcome<i32, Self::Error> {
3064            Outcome::Fault("boom".to_string())
3065        }
3066    }
3067
3068    #[derive(Clone)]
3069    struct CapabilityGuarded;
3070
3071    #[async_trait]
3072    impl Transition<(), ()> for CapabilityGuarded {
3073        type Error = String;
3074        type Resources = ();
3075
3076        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3077            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3078        }
3079
3080        async fn run(
3081            &self,
3082            _state: (),
3083            _resources: &Self::Resources,
3084            bus: &mut Bus,
3085        ) -> Outcome<(), Self::Error> {
3086            match bus.get::<String>() {
3087                Ok(_) => Outcome::Next(()),
3088                Err(err) => Outcome::Fault(err.to_string()),
3089            }
3090        }
3091    }
3092
3093    #[derive(Clone)]
3094    struct RecordingCompensationHook {
3095        calls: Arc<Mutex<Vec<CompensationContext>>>,
3096        should_fail: bool,
3097    }
3098
3099    #[async_trait]
3100    impl CompensationHook for RecordingCompensationHook {
3101        async fn compensate(&self, context: CompensationContext) -> Result<()> {
3102            self.calls.lock().await.push(context);
3103            if self.should_fail {
3104                return Err(anyhow::anyhow!("compensation failed"));
3105            }
3106            Ok(())
3107        }
3108    }
3109
3110    #[derive(Clone)]
3111    struct FlakyCompensationHook {
3112        calls: Arc<Mutex<u32>>,
3113        failures_remaining: Arc<Mutex<u32>>,
3114    }
3115
3116    #[async_trait]
3117    impl CompensationHook for FlakyCompensationHook {
3118        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3119            {
3120                let mut calls = self.calls.lock().await;
3121                *calls += 1;
3122            }
3123            let mut failures_remaining = self.failures_remaining.lock().await;
3124            if *failures_remaining > 0 {
3125                *failures_remaining -= 1;
3126                return Err(anyhow::anyhow!("transient compensation failure"));
3127            }
3128            Ok(())
3129        }
3130    }
3131
3132    #[derive(Clone)]
3133    struct FailingCompensationIdempotencyStore {
3134        read_calls: Arc<Mutex<u32>>,
3135        write_calls: Arc<Mutex<u32>>,
3136    }
3137
3138    #[async_trait]
3139    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3140        async fn was_compensated(&self, _key: &str) -> Result<bool> {
3141            let mut read_calls = self.read_calls.lock().await;
3142            *read_calls += 1;
3143            Err(anyhow::anyhow!("forced idempotency read failure"))
3144        }
3145
3146        async fn mark_compensated(&self, _key: &str) -> Result<()> {
3147            let mut write_calls = self.write_calls.lock().await;
3148            *write_calls += 1;
3149            Err(anyhow::anyhow!("forced idempotency write failure"))
3150        }
3151    }
3152
3153    #[tokio::test]
3154    async fn execute_persists_success_trace_when_handle_exists() {
3155        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3156        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3157
3158        let mut bus = Bus::new();
3159        bus.insert(PersistenceHandle::from_arc(store_dyn));
3160        bus.insert(PersistenceTraceId::new("trace-success"));
3161
3162        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3163        let outcome = axon.execute(41, &(), &mut bus).await;
3164        assert!(matches!(outcome, Outcome::Next(42)));
3165
3166        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3167        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3168        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3169        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
3170        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
3171        assert_eq!(persisted.completion, Some(CompletionState::Success));
3172    }
3173
3174    #[tokio::test]
3175    async fn execute_persists_fault_completion_state() {
3176        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3177        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3178
3179        let mut bus = Bus::new();
3180        bus.insert(PersistenceHandle::from_arc(store_dyn));
3181        bus.insert(PersistenceTraceId::new("trace-fault"));
3182
3183        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3184        let outcome = axon.execute(41, &(), &mut bus).await;
3185        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3186
3187        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3188        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3189        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3190        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3191        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3192    }
3193
3194    #[tokio::test]
3195    async fn execute_respects_persistence_auto_complete_off() {
3196        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3197        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3198
3199        let mut bus = Bus::new();
3200        bus.insert(PersistenceHandle::from_arc(store_dyn));
3201        bus.insert(PersistenceTraceId::new("trace-no-complete"));
3202        bus.insert(PersistenceAutoComplete(false));
3203
3204        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3205        let outcome = axon.execute(1, &(), &mut bus).await;
3206        assert!(matches!(outcome, Outcome::Next(2)));
3207
3208        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3209        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3210        assert_eq!(persisted.completion, None);
3211    }
3212
3213    #[tokio::test]
3214    async fn fault_triggers_compensation_and_marks_compensated() {
3215        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3216        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3217        let calls = Arc::new(Mutex::new(Vec::new()));
3218        let compensation = RecordingCompensationHook {
3219            calls: calls.clone(),
3220            should_fail: false,
3221        };
3222
3223        let mut bus = Bus::new();
3224        bus.insert(PersistenceHandle::from_arc(store_dyn));
3225        bus.insert(PersistenceTraceId::new("trace-compensated"));
3226        bus.insert(CompensationHandle::from_hook(compensation));
3227
3228        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3229        let outcome = axon.execute(7, &(), &mut bus).await;
3230        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3231
3232        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3233        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
3234        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3235        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3236        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3237        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3238        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3239
3240        let recorded = calls.lock().await;
3241        assert_eq!(recorded.len(), 1);
3242        assert_eq!(recorded[0].trace_id, "trace-compensated");
3243        assert_eq!(recorded[0].fault_kind, "Fault");
3244    }
3245
3246    #[tokio::test]
3247    async fn failed_compensation_keeps_fault_completion() {
3248        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3249        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3250        let calls = Arc::new(Mutex::new(Vec::new()));
3251        let compensation = RecordingCompensationHook {
3252            calls: calls.clone(),
3253            should_fail: true,
3254        };
3255
3256        let mut bus = Bus::new();
3257        bus.insert(PersistenceHandle::from_arc(store_dyn));
3258        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3259        bus.insert(CompensationHandle::from_hook(compensation));
3260
3261        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3262        let outcome = axon.execute(7, &(), &mut bus).await;
3263        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3264
3265        let persisted = store_impl
3266            .load("trace-compensation-failed")
3267            .await
3268            .unwrap()
3269            .unwrap();
3270        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3271        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3272        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3273
3274        let recorded = calls.lock().await;
3275        assert_eq!(recorded.len(), 1);
3276    }
3277
3278    #[tokio::test]
3279    async fn compensation_retry_policy_succeeds_after_retries() {
3280        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3281        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3282        let calls = Arc::new(Mutex::new(0u32));
3283        let failures_remaining = Arc::new(Mutex::new(2u32));
3284        let compensation = FlakyCompensationHook {
3285            calls: calls.clone(),
3286            failures_remaining,
3287        };
3288
3289        let mut bus = Bus::new();
3290        bus.insert(PersistenceHandle::from_arc(store_dyn));
3291        bus.insert(PersistenceTraceId::new("trace-retry-success"));
3292        bus.insert(CompensationHandle::from_hook(compensation));
3293        bus.insert(CompensationRetryPolicy {
3294            max_attempts: 3,
3295            backoff_ms: 0,
3296        });
3297
3298        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3299        let outcome = axon.execute(7, &(), &mut bus).await;
3300        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3301
3302        let persisted = store_impl
3303            .load("trace-retry-success")
3304            .await
3305            .unwrap()
3306            .unwrap();
3307        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3308        assert_eq!(
3309            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3310            Some("Compensated")
3311        );
3312
3313        let attempt_count = calls.lock().await;
3314        assert_eq!(*attempt_count, 3);
3315    }
3316
3317    #[tokio::test]
3318    async fn compensation_idempotency_skips_duplicate_hook_execution() {
3319        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3320        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3321        let calls = Arc::new(Mutex::new(Vec::new()));
3322        let compensation = RecordingCompensationHook {
3323            calls: calls.clone(),
3324            should_fail: false,
3325        };
3326        let idempotency = InMemoryCompensationIdempotencyStore::new();
3327
3328        let mut bus = Bus::new();
3329        bus.insert(PersistenceHandle::from_arc(store_dyn));
3330        bus.insert(PersistenceTraceId::new("trace-idempotent"));
3331        bus.insert(PersistenceAutoComplete(false));
3332        bus.insert(CompensationHandle::from_hook(compensation));
3333        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3334
3335        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3336
3337        let outcome1 = axon.execute(7, &(), &mut bus).await;
3338        let outcome2 = axon.execute(8, &(), &mut bus).await;
3339        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3340        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3341
3342        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3343        assert_eq!(persisted.completion, None);
3344        // Verify that "Compensated" events are present for both executions
3345        let compensated_count = persisted
3346            .events
3347            .iter()
3348            .filter(|e| e.outcome_kind == "Compensated")
3349            .count();
3350        assert_eq!(
3351            compensated_count, 2,
3352            "Should have 2 Compensated events (one per execution)"
3353        );
3354
3355        let recorded = calls.lock().await;
3356        assert_eq!(recorded.len(), 1);
3357    }
3358
3359    #[tokio::test]
3360    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3361        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3362        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3363        let calls = Arc::new(Mutex::new(Vec::new()));
3364        let read_calls = Arc::new(Mutex::new(0u32));
3365        let write_calls = Arc::new(Mutex::new(0u32));
3366        let compensation = RecordingCompensationHook {
3367            calls: calls.clone(),
3368            should_fail: false,
3369        };
3370        let idempotency = FailingCompensationIdempotencyStore {
3371            read_calls: read_calls.clone(),
3372            write_calls: write_calls.clone(),
3373        };
3374
3375        let mut bus = Bus::new();
3376        bus.insert(PersistenceHandle::from_arc(store_dyn));
3377        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3378        bus.insert(CompensationHandle::from_hook(compensation));
3379        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3380
3381        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3382        let outcome = axon.execute(9, &(), &mut bus).await;
3383        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3384
3385        let persisted = store_impl
3386            .load("trace-idempotency-store-failure")
3387            .await
3388            .unwrap()
3389            .unwrap();
3390        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3391        assert_eq!(
3392            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3393            Some("Compensated")
3394        );
3395
3396        let recorded = calls.lock().await;
3397        assert_eq!(recorded.len(), 1);
3398        assert_eq!(*read_calls.lock().await, 1);
3399        assert_eq!(*write_calls.lock().await, 1);
3400    }
3401
3402    #[tokio::test]
3403    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3404        let mut bus = Bus::new();
3405        bus.insert(1_i32);
3406        bus.insert("secret".to_string());
3407
3408        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3409        let outcome = axon.execute((), &(), &mut bus).await;
3410
3411        match outcome {
3412            Outcome::Fault(msg) => {
3413                assert!(msg.contains("Bus access denied"), "{msg}");
3414                assert!(msg.contains("CapabilityGuarded"), "{msg}");
3415                assert!(msg.contains("alloc::string::String"), "{msg}");
3416            }
3417            other => panic!("expected fault, got {other:?}"),
3418        }
3419    }
3420
3421    #[tokio::test]
3422    async fn execute_fails_on_version_mismatch_without_migration() {
3423        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3424        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3425
3426        let trace_id = "v-mismatch";
3427        // Create an existing trace with an older version
3428        let old_envelope = crate::persistence::PersistenceEnvelope {
3429            trace_id: trace_id.to_string(),
3430            circuit: "TestCircuit".to_string(),
3431            schematic_version: "0.9".to_string(),
3432            step: 0,
3433            node_id: None,
3434            outcome_kind: "Enter".to_string(),
3435            timestamp_ms: 0,
3436            payload_hash: None,
3437            payload: None,
3438        };
3439        store_impl.append(old_envelope).await.unwrap();
3440
3441        let mut bus = Bus::new();
3442        bus.insert(PersistenceHandle::from_arc(store_dyn));
3443        bus.insert(PersistenceTraceId::new(trace_id));
3444
3445        // Current axon is version 1.0
3446        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3447        let outcome = axon.execute(10, &(), &mut bus).await;
3448
3449        if let Outcome::Emit(kind, _) = outcome {
3450            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3451        } else {
3452            panic!("Expected version mismatch emission, got {:?}", outcome);
3453        }
3454    }
3455
3456    #[tokio::test]
3457    async fn execute_resumes_from_start_on_migration_strategy() {
3458        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3459        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3460
3461        let trace_id = "v-migration";
3462        // Create an existing trace with an older version at step 5
3463        let old_envelope = crate::persistence::PersistenceEnvelope {
3464            trace_id: trace_id.to_string(),
3465            circuit: "TestCircuit".to_string(),
3466            schematic_version: "0.9".to_string(),
3467            step: 5,
3468            node_id: None,
3469            outcome_kind: "Next".to_string(),
3470            timestamp_ms: 0,
3471            payload_hash: None,
3472            payload: None,
3473        };
3474        store_impl.append(old_envelope).await.unwrap();
3475
3476        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3477        registry.register(ranvier_core::schematic::SnapshotMigration {
3478            name: Some("v0.9 to v1.0".to_string()),
3479            from_version: "0.9".to_string(),
3480            to_version: "1.0".to_string(),
3481            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3482            node_mapping: std::collections::HashMap::new(),
3483            payload_mapper: None,
3484        });
3485
3486        let mut bus = Bus::new();
3487        bus.insert(PersistenceHandle::from_arc(store_dyn));
3488        bus.insert(PersistenceTraceId::new(trace_id));
3489        bus.insert(registry);
3490
3491        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3492        let outcome = axon.execute(10, &(), &mut bus).await;
3493
3494        // Should have resumed from start (step 0), resulting in 11
3495        assert!(matches!(outcome, Outcome::Next(11)));
3496
3497        // Verify new event has current version
3498        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3499        assert_eq!(persisted.schematic_version, "1.0");
3500    }
3501
3502    #[tokio::test]
3503    async fn execute_applies_manual_intervention_jump_and_payload() {
3504        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3505        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3506
3507        let trace_id = "intervention-test";
3508        // 1. Run a normal trace part-way
3509        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3510            .then(AddOne)
3511            .then(AddOne);
3512
3513        let mut bus = Bus::new();
3514        bus.insert(PersistenceHandle::from_arc(store_dyn));
3515        bus.insert(PersistenceTraceId::new(trace_id));
3516
3517        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
3518        // with a payload override of 100.
3519        // The first node is 'AddOne', the second is ALSO 'AddOne'.
3520        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
3521        let _target_node_label = "AddOne";
3522        // To be precise, let's find the ID of the second node.
3523        let target_node_id = axon.schematic.nodes[2].id.clone();
3524
3525        // Pre-seed an initial trace entry so save_intervention doesn't fail
3526        store_impl
3527            .append(crate::persistence::PersistenceEnvelope {
3528                trace_id: trace_id.to_string(),
3529                circuit: "TestCircuit".to_string(),
3530                schematic_version: "1.0".to_string(),
3531                step: 0,
3532                node_id: None,
3533                outcome_kind: "Enter".to_string(),
3534                timestamp_ms: 0,
3535                payload_hash: None,
3536                payload: None,
3537            })
3538            .await
3539            .unwrap();
3540
3541        store_impl
3542            .save_intervention(
3543                trace_id,
3544                crate::persistence::Intervention {
3545                    target_node: target_node_id.clone(),
3546                    payload_override: Some(serde_json::json!(100)),
3547                    timestamp_ms: 0,
3548                },
3549            )
3550            .await
3551            .unwrap();
3552
3553        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
3554        // Result should be 100 + 1 = 101.
3555        let outcome = axon.execute(10, &(), &mut bus).await;
3556
3557        match outcome {
3558            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3559            other => panic!("Expected Outcome::Next(101), got {:?}", other),
3560        }
3561
3562        // Verify the jump was logged in trace
3563        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3564        // The last event should be from the jump target's execution.
3565        assert_eq!(persisted.interventions.len(), 1);
3566        assert_eq!(persisted.interventions[0].target_node, target_node_id);
3567    }
3568
3569    // ── DLQ Retry Tests ──────────────────────────────────────────────
3570
3571    /// A transition that fails a configurable number of times before succeeding.
3572    #[derive(Clone)]
3573    struct FailNThenSucceed {
3574        remaining: Arc<tokio::sync::Mutex<u32>>,
3575    }
3576
3577    #[async_trait]
3578    impl Transition<i32, i32> for FailNThenSucceed {
3579        type Error = String;
3580        type Resources = ();
3581
3582        async fn run(
3583            &self,
3584            state: i32,
3585            _resources: &Self::Resources,
3586            _bus: &mut Bus,
3587        ) -> Outcome<i32, Self::Error> {
3588            let mut rem = self.remaining.lock().await;
3589            if *rem > 0 {
3590                *rem -= 1;
3591                Outcome::Fault("transient failure".to_string())
3592            } else {
3593                Outcome::Next(state + 1)
3594            }
3595        }
3596    }
3597
3598    /// A mock DLQ sink that records all dead letters.
3599    #[derive(Clone)]
3600    struct MockDlqSink {
3601        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3602    }
3603
3604    #[async_trait]
3605    impl DlqSink for MockDlqSink {
3606        async fn store_dead_letter(
3607            &self,
3608            workflow_id: &str,
3609            _circuit_label: &str,
3610            node_id: &str,
3611            error_msg: &str,
3612            _payload: &[u8],
3613        ) -> Result<(), String> {
3614            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
3615            self.letters.lock().await.push(entry);
3616            Ok(())
3617        }
3618    }
3619
3620    #[tokio::test]
3621    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
3622        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
3623        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
3624        let trans = FailNThenSucceed { remaining };
3625
3626        let dlq_sink = MockDlqSink {
3627            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3628        };
3629
3630        let mut bus = Bus::new();
3631        bus.insert(Timeline::new());
3632
3633        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
3634            .then(trans)
3635            .with_dlq_policy(DlqPolicy::RetryThenDlq {
3636                max_attempts: 5,
3637                backoff_ms: 1,
3638            })
3639            .with_dlq_sink(dlq_sink.clone());
3640        let outcome = axon.execute(10, &(), &mut bus).await;
3641
3642        // Should succeed (10 + 1 = 11)
3643        assert!(
3644            matches!(outcome, Outcome::Next(11)),
3645            "Expected Next(11), got {:?}",
3646            outcome
3647        );
3648
3649        // No dead letters since it recovered
3650        let letters = dlq_sink.letters.lock().await;
3651        assert!(
3652            letters.is_empty(),
3653            "Should have 0 dead letters, got {}",
3654            letters.len()
3655        );
3656
3657        // Timeline should contain NodeRetry events
3658        let timeline = bus.read::<Timeline>().unwrap();
3659        let retry_count = timeline
3660            .events
3661            .iter()
3662            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3663            .count();
3664        assert_eq!(retry_count, 2, "Should have 2 retry events");
3665    }
3666
3667    #[tokio::test]
3668    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
3669        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
3670        let mut bus = Bus::new();
3671        bus.insert(Timeline::new());
3672
3673        let dlq_sink = MockDlqSink {
3674            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3675        };
3676
3677        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
3678            .then(AlwaysFault)
3679            .with_dlq_policy(DlqPolicy::RetryThenDlq {
3680                max_attempts: 3,
3681                backoff_ms: 1,
3682            })
3683            .with_dlq_sink(dlq_sink.clone());
3684        let outcome = axon.execute(42, &(), &mut bus).await;
3685
3686        assert!(
3687            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
3688            "Expected Fault(boom), got {:?}",
3689            outcome
3690        );
3691
3692        // Should have exactly 1 dead letter
3693        let letters = dlq_sink.letters.lock().await;
3694        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
3695
3696        // Timeline should have 2 retry events and 1 DlqExhausted event
3697        let timeline = bus.read::<Timeline>().unwrap();
3698        let retry_count = timeline
3699            .events
3700            .iter()
3701            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3702            .count();
3703        let dlq_count = timeline
3704            .events
3705            .iter()
3706            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
3707            .count();
3708        assert_eq!(
3709            retry_count, 2,
3710            "Should have 2 retry events (attempts 2 and 3)"
3711        );
3712        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
3713    }
3714
3715    #[tokio::test]
3716    async fn send_to_dlq_policy_sends_immediately_without_retry() {
3717        let mut bus = Bus::new();
3718        bus.insert(Timeline::new());
3719
3720        let dlq_sink = MockDlqSink {
3721            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3722        };
3723
3724        let axon = Axon::<i32, i32, String>::start("SendDlq")
3725            .then(AlwaysFault)
3726            .with_dlq_policy(DlqPolicy::SendToDlq)
3727            .with_dlq_sink(dlq_sink.clone());
3728        let outcome = axon.execute(1, &(), &mut bus).await;
3729
3730        assert!(matches!(outcome, Outcome::Fault(_)));
3731
3732        // Should have exactly 1 dead letter (immediate, no retries)
3733        let letters = dlq_sink.letters.lock().await;
3734        assert_eq!(letters.len(), 1);
3735
3736        // No retry or DlqExhausted events
3737        let timeline = bus.read::<Timeline>().unwrap();
3738        let retry_count = timeline
3739            .events
3740            .iter()
3741            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3742            .count();
3743        assert_eq!(retry_count, 0);
3744    }
3745
3746    #[tokio::test]
3747    async fn drop_policy_does_not_send_to_dlq() {
3748        let mut bus = Bus::new();
3749
3750        let dlq_sink = MockDlqSink {
3751            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3752        };
3753
3754        let axon = Axon::<i32, i32, String>::start("DropDlq")
3755            .then(AlwaysFault)
3756            .with_dlq_policy(DlqPolicy::Drop)
3757            .with_dlq_sink(dlq_sink.clone());
3758        let outcome = axon.execute(1, &(), &mut bus).await;
3759
3760        assert!(matches!(outcome, Outcome::Fault(_)));
3761
3762        // No dead letters
3763        let letters = dlq_sink.letters.lock().await;
3764        assert!(letters.is_empty());
3765    }
3766
3767    #[tokio::test]
3768    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
3769        use ranvier_core::policy::DynamicPolicy;
3770
3771        // Start with Drop policy (no DLQ)
3772        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
3773        let dlq_sink = MockDlqSink {
3774            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3775        };
3776
3777        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
3778            .then(AlwaysFault)
3779            .with_dynamic_dlq_policy(dynamic)
3780            .with_dlq_sink(dlq_sink.clone());
3781
3782        // First execution: Drop policy → no dead letters
3783        let mut bus = Bus::new();
3784        let outcome = axon.execute(1, &(), &mut bus).await;
3785        assert!(matches!(outcome, Outcome::Fault(_)));
3786        assert!(
3787            dlq_sink.letters.lock().await.is_empty(),
3788            "Drop policy should produce no DLQ entries"
3789        );
3790
3791        // Hot-reload: switch to SendToDlq
3792        tx.send(DlqPolicy::SendToDlq).unwrap();
3793
3794        // Second execution: SendToDlq policy → dead letter captured
3795        let mut bus2 = Bus::new();
3796        let outcome2 = axon.execute(2, &(), &mut bus2).await;
3797        assert!(matches!(outcome2, Outcome::Fault(_)));
3798        assert_eq!(
3799            dlq_sink.letters.lock().await.len(),
3800            1,
3801            "SendToDlq policy should produce 1 DLQ entry"
3802        );
3803    }
3804
3805    #[tokio::test]
3806    async fn dynamic_saga_policy_hot_reload() {
3807        use ranvier_core::policy::DynamicPolicy;
3808        use ranvier_core::saga::SagaPolicy;
3809
3810        // Start with Disabled saga
3811        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
3812
3813        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
3814            .then(AddOne)
3815            .with_dynamic_saga_policy(dynamic);
3816
3817        // First execution: Disabled → no SagaStack in bus
3818        let mut bus = Bus::new();
3819        let _outcome = axon.execute(1, &(), &mut bus).await;
3820        assert!(
3821            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
3822            "SagaStack should be absent or empty when disabled"
3823        );
3824
3825        // Hot-reload: enable saga
3826        tx.send(SagaPolicy::Enabled).unwrap();
3827
3828        // Second execution: Enabled → SagaStack populated
3829        let mut bus2 = Bus::new();
3830        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
3831        assert!(
3832            bus2.read::<SagaStack>().is_some(),
3833            "SagaStack should exist when saga is enabled"
3834        );
3835    }
3836
3837    // ── IAM Boundary Tests ──────────────────────────────────────
3838
3839    mod iam_tests {
3840        use super::*;
3841        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
3842
3843        /// Mock IamVerifier that returns a fixed identity.
3844        #[derive(Clone)]
3845        struct MockVerifier {
3846            identity: IamIdentity,
3847            should_fail: bool,
3848        }
3849
3850        #[async_trait]
3851        impl IamVerifier for MockVerifier {
3852            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
3853                if self.should_fail {
3854                    Err(IamError::InvalidToken("mock verification failure".into()))
3855                } else {
3856                    Ok(self.identity.clone())
3857                }
3858            }
3859        }
3860
3861        #[tokio::test]
3862        async fn iam_require_identity_passes_with_valid_token() {
3863            let verifier = MockVerifier {
3864                identity: IamIdentity::new("alice").with_role("user"),
3865                should_fail: false,
3866            };
3867
3868            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
3869                .with_iam(IamPolicy::RequireIdentity, verifier)
3870                .then(AddOne);
3871
3872            let mut bus = Bus::new();
3873            bus.insert(IamToken("valid-token".to_string()));
3874            let outcome = axon.execute(10, &(), &mut bus).await;
3875
3876            assert!(matches!(outcome, Outcome::Next(11)));
3877            // Verify IamIdentity was injected into Bus
3878            let identity = bus
3879                .read::<IamIdentity>()
3880                .expect("IamIdentity should be in Bus");
3881            assert_eq!(identity.subject, "alice");
3882        }
3883
3884        #[tokio::test]
3885        async fn iam_require_identity_rejects_missing_token() {
3886            let verifier = MockVerifier {
3887                identity: IamIdentity::new("ignored"),
3888                should_fail: false,
3889            };
3890
3891            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
3892                .with_iam(IamPolicy::RequireIdentity, verifier)
3893                .then(AddOne);
3894
3895            let mut bus = Bus::new();
3896            // No IamToken inserted
3897            let outcome = axon.execute(10, &(), &mut bus).await;
3898
3899            // Should emit missing_token event
3900            match &outcome {
3901                Outcome::Emit(label, _) => {
3902                    assert_eq!(label, "iam.missing_token");
3903                }
3904                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
3905            }
3906        }
3907
3908        #[tokio::test]
3909        async fn iam_rejects_failed_verification() {
3910            let verifier = MockVerifier {
3911                identity: IamIdentity::new("ignored"),
3912                should_fail: true,
3913            };
3914
3915            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
3916                .with_iam(IamPolicy::RequireIdentity, verifier)
3917                .then(AddOne);
3918
3919            let mut bus = Bus::new();
3920            bus.insert(IamToken("bad-token".to_string()));
3921            let outcome = axon.execute(10, &(), &mut bus).await;
3922
3923            match &outcome {
3924                Outcome::Emit(label, _) => {
3925                    assert_eq!(label, "iam.verification_failed");
3926                }
3927                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
3928            }
3929        }
3930
3931        #[tokio::test]
3932        async fn iam_require_role_passes_with_matching_role() {
3933            let verifier = MockVerifier {
3934                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
3935                should_fail: false,
3936            };
3937
3938            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
3939                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3940                .then(AddOne);
3941
3942            let mut bus = Bus::new();
3943            bus.insert(IamToken("token".to_string()));
3944            let outcome = axon.execute(5, &(), &mut bus).await;
3945
3946            assert!(matches!(outcome, Outcome::Next(6)));
3947        }
3948
3949        #[tokio::test]
3950        async fn iam_require_role_denies_without_role() {
3951            let verifier = MockVerifier {
3952                identity: IamIdentity::new("carol").with_role("user"),
3953                should_fail: false,
3954            };
3955
3956            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
3957                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3958                .then(AddOne);
3959
3960            let mut bus = Bus::new();
3961            bus.insert(IamToken("token".to_string()));
3962            let outcome = axon.execute(5, &(), &mut bus).await;
3963
3964            match &outcome {
3965                Outcome::Emit(label, _) => {
3966                    assert_eq!(label, "iam.policy_denied");
3967                }
3968                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
3969            }
3970        }
3971
3972        #[tokio::test]
3973        async fn iam_policy_none_skips_verification() {
3974            let verifier = MockVerifier {
3975                identity: IamIdentity::new("ignored"),
3976                should_fail: true, // would fail if actually called
3977            };
3978
3979            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
3980                .with_iam(IamPolicy::None, verifier)
3981                .then(AddOne);
3982
3983            let mut bus = Bus::new();
3984            // No token needed when policy is None
3985            let outcome = axon.execute(10, &(), &mut bus).await;
3986
3987            assert!(matches!(outcome, Outcome::Next(11)));
3988        }
3989    }
3990
3991    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
3992
3993    #[derive(Clone)]
3994    struct SchemaTransition;
3995
3996    #[async_trait]
3997    impl Transition<String, String> for SchemaTransition {
3998        type Error = String;
3999        type Resources = ();
4000
4001        fn input_schema(&self) -> Option<serde_json::Value> {
4002            Some(serde_json::json!({
4003                "type": "object",
4004                "required": ["name"],
4005                "properties": {
4006                    "name": { "type": "string" }
4007                }
4008            }))
4009        }
4010
4011        async fn run(
4012            &self,
4013            state: String,
4014            _resources: &Self::Resources,
4015            _bus: &mut Bus,
4016        ) -> Outcome<String, Self::Error> {
4017            Outcome::Next(state)
4018        }
4019    }
4020
4021    #[test]
4022    fn then_auto_populates_input_schema_from_transition() {
4023        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4024
4025        // The last node (added by .then()) should have input_schema
4026        let last_node = axon.schematic.nodes.last().unwrap();
4027        assert!(last_node.input_schema.is_some());
4028        let schema = last_node.input_schema.as_ref().unwrap();
4029        assert_eq!(schema["type"], "object");
4030        assert_eq!(schema["required"][0], "name");
4031    }
4032
4033    #[test]
4034    fn then_leaves_input_schema_none_when_not_provided() {
4035        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4036
4037        let last_node = axon.schematic.nodes.last().unwrap();
4038        assert!(last_node.input_schema.is_none());
4039    }
4040
4041    #[test]
4042    fn with_input_schema_value_sets_on_last_node() {
4043        let schema = serde_json::json!({"type": "integer"});
4044        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4045            .then(AddOne)
4046            .with_input_schema_value(schema.clone());
4047
4048        let last_node = axon.schematic.nodes.last().unwrap();
4049        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4050    }
4051
4052    #[test]
4053    fn with_output_schema_value_sets_on_last_node() {
4054        let schema = serde_json::json!({"type": "integer"});
4055        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4056            .then(AddOne)
4057            .with_output_schema_value(schema.clone());
4058
4059        let last_node = axon.schematic.nodes.last().unwrap();
4060        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4061    }
4062
4063    #[test]
4064    fn schematic_export_includes_schema_fields() {
4065        let axon = Axon::<String, String, String>::new("ExportTest")
4066            .then(SchemaTransition)
4067            .with_output_schema_value(serde_json::json!({"type": "string"}));
4068
4069        let json = serde_json::to_value(&axon.schematic).unwrap();
4070        let nodes = json["nodes"].as_array().unwrap();
4071        // Find the SchemaTransition node (last one)
4072        let last = nodes.last().unwrap();
4073        assert!(last.get("input_schema").is_some());
4074        assert_eq!(last["input_schema"]["type"], "object");
4075        assert_eq!(last["output_schema"]["type"], "string");
4076    }
4077
4078    #[test]
4079    fn schematic_export_omits_schema_fields_when_none() {
4080        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4081
4082        let json = serde_json::to_value(&axon.schematic).unwrap();
4083        let nodes = json["nodes"].as_array().unwrap();
4084        let last = nodes.last().unwrap();
4085        let obj = last.as_object().unwrap();
4086        assert!(!obj.contains_key("input_schema"));
4087        assert!(!obj.contains_key("output_schema"));
4088    }
4089
4090    #[test]
4091    fn schematic_json_roundtrip_preserves_schemas() {
4092        let axon = Axon::<String, String, String>::new("Roundtrip")
4093            .then(SchemaTransition)
4094            .with_output_schema_value(serde_json::json!({"type": "string"}));
4095
4096        let json_str = serde_json::to_string(&axon.schematic).unwrap();
4097        let deserialized: ranvier_core::schematic::Schematic =
4098            serde_json::from_str(&json_str).unwrap();
4099
4100        let last = deserialized.nodes.last().unwrap();
4101        assert!(last.input_schema.is_some());
4102        assert!(last.output_schema.is_some());
4103        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4104        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4105    }
4106
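    // Arithmetic test transitions for the chain-execution tests; the `*String`
    // variants use `String` as their error type so they can share a chain with `AlwaysFault`.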
4108    #[derive(Clone)]
4109    struct MultiplyByTwo;
4110
4111    #[async_trait]
4112    impl Transition<i32, i32> for MultiplyByTwo {
4113        type Error = TestInfallible;
4114        type Resources = ();
4115
4116        async fn run(
4117            &self,
4118            state: i32,
4119            _resources: &Self::Resources,
4120            _bus: &mut Bus,
4121        ) -> Outcome<i32, Self::Error> {
4122            Outcome::Next(state * 2)
4123        }
4124    }
4125
4126    #[derive(Clone)]
4127    struct AddTen;
4128
4129    #[async_trait]
4130    impl Transition<i32, i32> for AddTen {
4131        type Error = TestInfallible;
4132        type Resources = ();
4133
4134        async fn run(
4135            &self,
4136            state: i32,
4137            _resources: &Self::Resources,
4138            _bus: &mut Bus,
4139        ) -> Outcome<i32, Self::Error> {
4140            Outcome::Next(state + 10)
4141        }
4142    }
4143
4144    #[derive(Clone)]
4145    struct AddOneString;
4146
4147    #[async_trait]
4148    impl Transition<i32, i32> for AddOneString {
4149        type Error = String;
4150        type Resources = ();
4151
4152        async fn run(
4153            &self,
4154            state: i32,
4155            _resources: &Self::Resources,
4156            _bus: &mut Bus,
4157        ) -> Outcome<i32, Self::Error> {
4158            Outcome::Next(state + 1)
4159        }
4160    }
4161
4162    #[derive(Clone)]
4163    struct AddTenString;
4164
4165    #[async_trait]
4166    impl Transition<i32, i32> for AddTenString {
4167        type Error = String;
4168        type Resources = ();
4169
4170        async fn run(
4171            &self,
4172            state: i32,
4173            _resources: &Self::Resources,
4174            _bus: &mut Bus,
4175        ) -> Outcome<i32, Self::Error> {
4176            Outcome::Next(state + 10)
4177        }
4178    }
4179
4180    #[tokio::test]
4181    async fn axon_single_step_chain_executes_and_returns_next() {
4182        let mut bus = Bus::new();
4183        let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4184
4185        let outcome = axon.execute(5, &(), &mut bus).await;
4186        assert!(matches!(outcome, Outcome::Next(6)));
4187    }
4188
4189    #[tokio::test]
4190    async fn axon_three_step_chain_executes_in_order() {
4191        let mut bus = Bus::new();
4192        let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4193            .then(AddOne)
4194            .then(MultiplyByTwo)
4195            .then(AddTen);
4196
4197        // Starting with 5: AddOne -> 6, MultiplyByTwo -> 12, AddTen -> 22
4198        let outcome = axon.execute(5, &(), &mut bus).await;
4199        assert!(matches!(outcome, Outcome::Next(22)));
4200    }
4201
4202    #[tokio::test]
4203    async fn axon_with_fault_in_middle_step_propagates_error() {
4204        let mut bus = Bus::new();
4205
4206        // Create a 3-step chain where the middle step faults
4207        // Step 1: AddOneString (5 -> 6)
4208        // Step 2: AlwaysFault (should fault here)
4209        // Step 3: AddTenString (never reached)
4210        let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4211            .then(AddOneString)
4212            .then(AlwaysFault)
4213            .then(AddTenString);
4214
4215        let outcome = axon.execute(5, &(), &mut bus).await;
4216        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4217    }
4218
4219    #[test]
4220    fn axon_schematic_has_correct_node_count_after_chaining() {
4221        let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4222            .then(AddOne)
4223            .then(MultiplyByTwo)
4224            .then(AddTen);
4225
4226        // Should have 4 nodes: ingress + 3 transitions
4227        assert_eq!(axon.schematic.nodes.len(), 4);
4228        assert_eq!(axon.schematic.name, "NodeCount");
4229    }
4230
4231    #[tokio::test]
4232    async fn axon_execution_records_timeline_events() {
4233        let mut bus = Bus::new();
4234        bus.insert(Timeline::new());
4235
4236        let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4237            .then(AddOne)
4238            .then(MultiplyByTwo);
4239
4240        let outcome = axon.execute(3, &(), &mut bus).await;
4241        assert!(matches!(outcome, Outcome::Next(8))); // (3 + 1) * 2 = 8
4242
4243        let timeline = bus.read::<Timeline>().unwrap();
4244
4245        // Should have NodeEnter and NodeExit events
4246        let enter_count = timeline
4247            .events
4248            .iter()
4249            .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4250            .count();
4251        let exit_count = timeline
4252            .events
4253            .iter()
4254            .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4255            .count();
4256
4257        // Expect at least 1 enter and 1 exit (ingress node)
4258        assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4259        assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4260    }
4261}