Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
//! It functions as a reusable, typed decision flow (`Axon<In, Out, E, Res>`, where `Res` defaults to `()`).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
/// Configuration for Execution Mode.
///
/// Selects how an `Axon` runs relative to other instances in a cluster. Axons created
/// with `Axon::new` / `Axon::start` use [`ExecutionMode::Local`].
#[derive(Clone)]
pub enum ExecutionMode {
    /// Normal, unpartitioned local execution.
    Local,
    /// Singleton execution, ensures only one instance runs across the entire cluster.
    Singleton {
        /// Name of the cluster-wide lock (presumably handed to `lock_provider`;
        /// NOTE(review): confirm where this is consumed).
        lock_key: String,
        /// Lock time-to-live; the `_ms` suffix indicates milliseconds
        /// (NOTE(review): confirm against the consumer of this field).
        ttl_ms: u64,
        /// Distributed lock backend used to enforce the singleton guarantee.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
57
/// Type alias for async boxed futures used in Axon execution.
///
/// A pinned, heap-allocated, `Send` future that may borrow data for `'a`. Boxing lets
/// step executors be stored as trait objects (see [`Executor`]).
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Executor type for Axon steps.
///
/// An executor receives the input state `In`, a shared reference to the resource bundle
/// `Res`, and exclusive access to the [`Bus`] for as long as the returned future lives,
/// and resolves to an `Outcome<Out, E>`.
///
/// It is stored behind an `Arc` and must be `Send + Sync`, so one executor can be
/// shared by cloned Axons and invoked from multiple threads.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
66
/// Manual intervention jump command injected into the Bus.
///
/// Steps chained with `then` / `then_compensated` read this from the [`Bus`] before
/// running. When `target_node` matches the step's node id or its label, the steps before
/// it are skipped and the step runs on the state taken from `payload_override`.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Node id (a UUID string) or node label of the step to jump to.
    pub target_node: String,
    /// JSON state for the target step, deserialized into that step's input type.
    /// If it is `None`, the step emits `execution.jump.missing_payload`. If it fails to
    /// deserialize, the step emits `execution.jump.payload_error` instead of running.
    pub payload_override: Option<serde_json::Value>,
}
73
/// Start step index for resumption, injected into the Bus.
///
/// Holds the zero-based index of a node in the Axon's `schematic.nodes`. The step whose
/// index equals this value runs directly, skipping earlier steps, but only when a
/// `ResumptionState` is also present on the Bus.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);

/// Persisted state for resumption, injected into the Bus.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// Checkpointed JSON state for the resume step. It is used as that step's input only
    /// when the fresh execution input cannot be converted (via a JSON round-trip) into
    /// the step's input type.
    payload: Option<serde_json::Value>,
}
83
/// Returns a readable name for `T` with module paths stripped.
///
/// Every path inside the name is shortened, not only the last one, so generic and
/// compound types stay well-formed:
/// `core::option::Option<alloc::string::String>` becomes `Option<String>`, and
/// `(alloc::string::String, i32)` becomes `(String, i32)`.
///
/// The exact text of [`std::any::type_name`] is not guaranteed to be stable. The result
/// is therefore meant for display (schematics, logs), not for identifying types.
fn type_name_of<T: ?Sized>() -> String {
    shorten_type_path(type_name::<T>())
}

/// Strips the module prefix from every path segment of a type name.
///
/// A path token is a run of identifier characters (alphanumerics and `_`). Whenever a
/// `::` follows a token, that token is dropped. A `::` that follows a non-identifier
/// character is kept verbatim, as in a qualified path like `<T as Trait>::Assoc` or a
/// leading `::`.
fn shorten_type_path(full: &str) -> String {
    let mut out = String::with_capacity(full.len());
    // Byte offset in `out` where the current path token begins.
    let mut token_start = 0;
    let mut rest = full;
    while let Some(c) = rest.chars().next() {
        if let Some(after) = rest.strip_prefix("::") {
            if out.len() == token_start {
                // Nothing to strip before the separator: keep it as-is.
                out.push_str("::");
                token_start = out.len();
            } else {
                // Drop the module segment that precedes `::`.
                out.truncate(token_start);
            }
            rest = after;
            continue;
        }
        out.push(c);
        if !(c.is_alphanumeric() || c == '_') {
            token_start = out.len();
        }
        rest = &rest[c.len_utf8()..];
    }
    out
}
89
/// The Axon Builder and Runtime.
///
/// `Axon` represents an executable decision tree.
/// It is reusable and thread-safe.
///
/// # Type parameters
///
/// * `In` — input type accepted by the ingress node.
/// * `Out` — output type produced by the last chained step.
/// * `E` — error type shared by all steps.
/// * `Res` — resource bundle handed to every step by reference; defaults to `()`.
///
/// `Clone` is implemented manually, so no type parameter needs to be `Clone`. Clones
/// share the same executor `Arc`.
///
/// ## Example
///
/// ```rust,ignore
/// use ranvier_core::prelude::*;
/// // ...
/// // Start with an identity Axon (In -> In)
/// let axon = Axon::<(), (), _>::new("My Axon")
///     .then(StepA)
///     .then(StepB);
///
/// // Execute multiple times
/// let res1 = axon.execute((), &mut bus1).await;
/// let res2 = axon.execute((), &mut bus2).await;
/// ```
pub struct Axon<In, Out, E, Res = ()> {
    /// The static structure (for visualization/analysis)
    pub schematic: Schematic,
    /// The runtime executor: the composition of every chained step. Private, so it is
    /// only set by code in this module (the constructors and builder methods).
    executor: Executor<In, Out, E, Res>,
    /// How this Axon is executed across the cluster
    pub execution_mode: ExecutionMode,
    /// Optional persistence store for state inspection
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Optional audit sink for tamper-evident logging of interventions
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Optional dead-letter queue sink for storing failed events
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Policy for handling event failures
    pub dlq_policy: DlqPolicy,
    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Policy for automated saga compensation
    pub saga_policy: SagaPolicy,
    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Registry for Saga compensation handlers. It sits behind an `Arc`, so clones of
    /// this Axon share one registry.
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// Optional IAM handle for identity verification at the Schematic boundary
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
136
/// Schematic export request derived from command-line args/env.
///
/// Describes the destination an exported schematic should be written to.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Optional output file path. If omitted, schematic is written to stdout.
    pub output: Option<PathBuf>,
}
143
144impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
145    fn clone(&self) -> Self {
146        Self {
147            schematic: self.schematic.clone(),
148            executor: self.executor.clone(),
149            execution_mode: self.execution_mode.clone(),
150            persistence_store: self.persistence_store.clone(),
151            audit_sink: self.audit_sink.clone(),
152            dlq_sink: self.dlq_sink.clone(),
153            dlq_policy: self.dlq_policy.clone(),
154            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
155            saga_policy: self.saga_policy.clone(),
156            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
157            saga_compensation_registry: self.saga_compensation_registry.clone(),
158            iam_handle: self.iam_handle.clone(),
159        }
160    }
161}
162
163impl<In, E, Res> Axon<In, In, E, Res>
164where
165    In: Send + Sync + Serialize + DeserializeOwned + 'static,
166    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
167    Res: ranvier_core::transition::ResourceRequirement,
168{
169    /// Create a new Axon flow with the given label.
170    /// This is the preferred entry point per Flat API guidelines.
171    #[track_caller]
172    pub fn new(label: &str) -> Self {
173        let caller = Location::caller();
174        Self::start_with_source(label, caller)
175    }
176
177    /// Start defining a new Axon flow.
178    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
179    #[track_caller]
180    pub fn start(label: &str) -> Self {
181        let caller = Location::caller();
182        Self::start_with_source(label, caller)
183    }
184
185    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
186        let node_id = uuid::Uuid::new_v4().to_string();
187        let node = Node {
188            id: node_id,
189            kind: NodeKind::Ingress,
190            label: label.to_string(),
191            description: None,
192            input_type: "void".to_string(),
193            output_type: type_name_of::<In>(),
194            resource_type: type_name_of::<Res>(),
195            metadata: Default::default(),
196            bus_capability: None,
197            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
198            position: None,
199            compensation_node_id: None,
200            input_schema: None,
201            output_schema: None,
202        };
203
204        let mut schematic = Schematic::new(label);
205        schematic.nodes.push(node);
206
207        let executor: Executor<In, In, E, Res> =
208            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
209
210        Self {
211            schematic,
212            executor,
213            execution_mode: ExecutionMode::Local,
214            persistence_store: None,
215            audit_sink: None,
216            dlq_sink: None,
217            dlq_policy: DlqPolicy::default(),
218            dynamic_dlq_policy: None,
219            saga_policy: SagaPolicy::default(),
220            dynamic_saga_policy: None,
221            saga_compensation_registry: Arc::new(std::sync::RwLock::new(
222                ranvier_core::saga::SagaCompensationRegistry::new(),
223            )),
224            iam_handle: None,
225        }
226    }
227}
228
229impl<In, Out, E, Res> Axon<In, Out, E, Res>
230where
231    In: Send + Sync + Serialize + DeserializeOwned + 'static,
232    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
233    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
234    Res: ranvier_core::transition::ResourceRequirement,
235{
236    /// Update the Execution Mode for this Axon (e.g., Local vs Singleton).
237    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
238        self.execution_mode = mode;
239        self
240    }
241
242    /// Set the schematic version for this Axon.
243    pub fn with_version(mut self, version: impl Into<String>) -> Self {
244        self.schematic.schema_version = version.into();
245        self
246    }
247
248    /// Attach a persistence store to enable state inspection via the Inspector.
249    pub fn with_persistence_store<S>(mut self, store: S) -> Self
250    where
251        S: crate::persistence::PersistenceStore + 'static,
252    {
253        self.persistence_store = Some(Arc::new(store));
254        self
255    }
256
257    /// Attach an audit sink for tamper-evident logging.
258    pub fn with_audit_sink<S>(mut self, sink: S) -> Self
259    where
260        S: AuditSink + 'static,
261    {
262        self.audit_sink = Some(Arc::new(sink));
263        self
264    }
265
266    /// Set the Dead Letter Queue sink for this Axon.
267    pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
268    where
269        S: DlqSink + 'static,
270    {
271        self.dlq_sink = Some(Arc::new(sink));
272        self
273    }
274
275    /// Set the Dead Letter Queue policy for this Axon.
276    pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
277        self.dlq_policy = policy;
278        self
279    }
280
281    /// Set the Saga compensation policy for this Axon.
282    pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
283        self.saga_policy = policy;
284        self
285    }
286
287    /// Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy's
288    /// current value is read at each execution, overriding the static `dlq_policy`.
289    pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
290        self.dynamic_dlq_policy = Some(dynamic);
291        self
292    }
293
294    /// Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy's
295    /// current value is read at each execution, overriding the static `saga_policy`.
296    pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
297        self.dynamic_saga_policy = Some(dynamic);
298        self
299    }
300
301    /// Set an IAM policy and verifier for identity verification at the Axon boundary.
302    ///
303    /// When set, `execute()` will:
304    /// 1. Read `IamToken` from the Bus (injected by the HTTP layer or test harness)
305    /// 2. Verify the token using the provided verifier
306    /// 3. Enforce the policy against the verified identity
307    /// 4. Insert the resulting `IamIdentity` into the Bus for downstream Transitions
308    pub fn with_iam(
309        mut self,
310        policy: ranvier_core::iam::IamPolicy,
311        verifier: impl ranvier_core::iam::IamVerifier + 'static,
312    ) -> Self {
313        self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
314            policy,
315            Arc::new(verifier),
316        ));
317        self
318    }
319
320    /// Attach a JSON Schema for the **last node's input type** in the schematic.
321    ///
322    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
323    ///
324    /// ```rust,ignore
325    /// let axon = Axon::new("My Circuit")
326    ///     .then(ProcessStep)
327    ///     .with_input_schema::<CreateUserRequest>()
328    ///     .with_output_schema::<UserResponse>();
329    /// ```
330    #[cfg(feature = "schema")]
331    pub fn with_input_schema<T>(mut self) -> Self
332    where
333        T: schemars::JsonSchema,
334    {
335        if let Some(last_node) = self.schematic.nodes.last_mut() {
336            let schema = schemars::schema_for!(T);
337            last_node.input_schema =
338                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
339        }
340        self
341    }
342
343    /// Attach a JSON Schema for the **last node's output type** in the schematic.
344    ///
345    /// Requires the `schema` feature and `T: schemars::JsonSchema`.
346    #[cfg(feature = "schema")]
347    pub fn with_output_schema<T>(mut self) -> Self
348    where
349        T: schemars::JsonSchema,
350    {
351        if let Some(last_node) = self.schematic.nodes.last_mut() {
352            let schema = schemars::schema_for!(T);
353            last_node.output_schema =
354                Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
355        }
356        self
357    }
358
359    /// Attach a raw JSON Schema value for the **last node's input type** in the schematic.
360    ///
361    /// Use this for pre-built schemas without requiring the `schema` feature.
362    pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
363        if let Some(last_node) = self.schematic.nodes.last_mut() {
364            last_node.input_schema = Some(schema);
365        }
366        self
367    }
368
369    /// Attach a raw JSON Schema value for the **last node's output type** in the schematic.
370    pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
371        if let Some(last_node) = self.schematic.nodes.last_mut() {
372            last_node.output_schema = Some(schema);
373        }
374        self
375    }
376}
377
378#[async_trait]
379impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
380where
381    In: Send + Sync + Serialize + DeserializeOwned + 'static,
382    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
383    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
384    Res: ranvier_core::transition::ResourceRequirement,
385{
386    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
387        let store = self.persistence_store.as_ref()?;
388        let trace = store.load(trace_id).await.ok().flatten()?;
389        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
390    }
391
392    async fn force_resume(
393        &self,
394        trace_id: &str,
395        target_node: &str,
396        payload_override: Option<Value>,
397    ) -> Result<(), String> {
398        let store = self
399            .persistence_store
400            .as_ref()
401            .ok_or("No persistence store attached to Axon")?;
402
403        let intervention = crate::persistence::Intervention {
404            target_node: target_node.to_string(),
405            payload_override,
406            timestamp_ms: now_ms(),
407        };
408
409        store
410            .save_intervention(trace_id, intervention)
411            .await
412            .map_err(|e| format!("Failed to save intervention: {}", e))?;
413
414        if let Some(sink) = self.audit_sink.as_ref() {
415            let event = AuditEvent::new(
416                uuid::Uuid::new_v4().to_string(),
417                "Inspector".to_string(),
418                "ForceResume".to_string(),
419                trace_id.to_string(),
420            )
421            .with_metadata("target_node", target_node);
422
423            let _ = sink.append(&event).await;
424        }
425
426        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
427        Ok(())
428    }
429}
430
431impl<In, Out, E, Res> Axon<In, Out, E, Res>
432where
433    In: Send + Sync + Serialize + DeserializeOwned + 'static,
434    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
435    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
436    Res: ranvier_core::transition::ResourceRequirement,
437{
    /// Chain a transition to this Axon.
    ///
    /// Requires the transition to use the SAME resource bundle as the previous steps.
    ///
    /// Appends an `Atom` node for `transition` to the schematic, plus a `Linear` edge from
    /// the previous last node. It then wraps the current executor so that the new step
    /// runs on its `Outcome::Next` value. All other configuration carries over unchanged:
    /// execution mode, stores, sinks, policies, compensation registry and IAM handle.
    ///
    /// At run time the composed step checks, in order:
    /// 1. A `ManualJump` on the Bus whose `target_node` is this node's id or label. The
    ///    earlier steps are skipped and this step runs on the deserialized
    ///    `payload_override`.
    /// 2. A `StartStep` equal to this node's index, together with a `ResumptionState`.
    ///    The earlier steps are skipped. This step runs on the fresh input if it converts
    ///    through JSON into `Out`, or otherwise on the persisted payload.
    /// 3. Otherwise, the previous executor runs first. Any non-`Next` outcome from it is
    ///    propagated without running this step.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        // Decompose self to avoid partial move issues
        let Axon {
            mut schematic,
            executor: prev_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        } = self;

        // Update Schematic
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
            position: transition
                .position()
                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
            compensation_node_id: None,
            input_schema: transition.input_schema(),
            output_schema: None,
        };

        // Id of the node this step follows; empty if the schematic has no nodes yet.
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Compose Executor
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let bus_policy_clone = bus_policy_for_exec.clone();
        // Index of the node pushed above; compared against `StartStep` on resumption.
        let current_step_idx = schematic.nodes.len() as u64 - 1;
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                // The executor is an `Fn` called once per execution, so each call clones
                // its captured state for the `async move` block to own.
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_clone.clone();
                let step_idx = current_step_idx;

                Box::pin(async move {
                    // Check for manual intervention jump
                    if let Some(jump) = bus.read::<ManualJump>()
                        && (jump.target_node == timeline_node_id
                            || jump.target_node == timeline_node_label)
                    {
                        tracing::info!(
                            node_id = %timeline_node_id,
                            node_label = %timeline_node_label,
                            "Manual jump target reached; skipping previous steps"
                        );

                        let state = if let Some(ow) = jump.payload_override.clone() {
                            match serde_json::from_value::<Out>(ow) {
                                Ok(s) => s,
                                Err(e) => {
                                    tracing::error!(
                                        "Payload override deserialization failed: {}",
                                        e
                                    );
                                    // `unreachable!` reflects the assumption that `emit` never
                                    // yields `Next`; the `map` only re-types the outcome.
                                    return Outcome::emit(
                                        "execution.jump.payload_error",
                                        Some(serde_json::json!({"error": e.to_string()})),
                                    )
                                    .map(|_: ()| unreachable!());
                                }
                            }
                        } else {
                            // No payload override: this step's input (`Out`) cannot be
                            // produced without running the skipped steps, so report a
                            // missing payload instead of running. A better implementation
                            // could load the last persisted `Out` of the previous step.
                            return Outcome::emit(
                                "execution.jump.missing_payload",
                                Some(serde_json::json!({"node_id": timeline_node_id})),
                            );
                        };

                        // Skip prev() and continue with trans.run(state, ...)
                        return run_this_step::<Out, Next, E, Res>(
                            &trans,
                            state,
                            res,
                            bus,
                            &timeline_node_id,
                            &timeline_node_label,
                            &transition_bus_policy,
                            step_idx,
                        )
                        .await;
                    }

                    // Handle resumption skip
                    if let Some(start) = bus.read::<StartStep>()
                        && step_idx == start.0
                        && bus.read::<ResumptionState>().is_some()
                    {
                        // Prefer fresh input (In → Out via JSON round-trip).
                        // The caller provides updated state (e.g., corrected data after a fault).
                        // Falls back to persisted checkpoint state when types are incompatible.
                        let fresh_state = serde_json::to_value(&input)
                            .ok()
                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
                        let persisted_state = bus
                            .read::<ResumptionState>()
                            .and_then(|r| r.payload.clone())
                            .and_then(|p| serde_json::from_value::<Out>(p).ok());

                        if let Some(s) = fresh_state.or(persisted_state) {
                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
                            return run_this_step::<Out, Next, E, Res>(
                                &trans,
                                s,
                                res,
                                bus,
                                &timeline_node_id,
                                &timeline_node_label,
                                &transition_bus_policy,
                                step_idx,
                            )
                            .await;
                        }

                        // Neither the fresh input nor the checkpoint could be turned into `Out`.
                        return Outcome::emit(
                            "execution.resumption.payload_error",
                            Some(serde_json::json!({"error": "no compatible resumption state"})),
                        )
                        .map(|_: ()| unreachable!());
                    }

                    // Run previous step
                    let prev_result = prev(input, res, bus).await;

                    // Unpack
                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        // `Next` was matched above, so this `map` only changes the
                        // outcome's success type before propagating it.
                        other => return other.map(|_| unreachable!()),
                    };

                    run_this_step::<Out, Next, E, Res>(
                        &trans,
                        state,
                        res,
                        bus,
                        &timeline_node_id,
                        &timeline_node_label,
                        &transition_bus_policy,
                        step_idx,
                    )
                    .await
                })
            },
        );
        Axon {
            schematic,
            executor: next_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        }
    }
641
642    /// Chain a transition to this Axon with a Saga compensation step.
643    ///
644    /// If the transition fails, the compensation transition will be executed
645    /// automatically if `CompensationAutoTrigger` is enabled in the Bus.
646    #[track_caller]
647    pub fn then_compensated<Next, Trans, Comp>(
648        self,
649        transition: Trans,
650        compensation: Comp,
651    ) -> Axon<In, Next, E, Res>
652    where
653        Out: Clone,
654        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
655        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
656        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
657    {
658        let caller = Location::caller();
659        let Axon {
660            mut schematic,
661            executor: prev_executor,
662            execution_mode,
663            persistence_store,
664            audit_sink,
665            dlq_sink,
666            dlq_policy,
667            dynamic_dlq_policy,
668            saga_policy,
669            dynamic_saga_policy,
670            saga_compensation_registry,
671            iam_handle,
672        } = self;
673
674        // 1. Add Primary Node
675        let next_node_id = uuid::Uuid::new_v4().to_string();
676        let comp_node_id = uuid::Uuid::new_v4().to_string();
677
678        let next_node = Node {
679            id: next_node_id.clone(),
680            kind: NodeKind::Atom,
681            label: transition.label(),
682            description: transition.description(),
683            input_type: type_name_of::<Out>(),
684            output_type: type_name_of::<Next>(),
685            resource_type: type_name_of::<Res>(),
686            metadata: Default::default(),
687            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
688            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
689            position: transition
690                .position()
691                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
692            compensation_node_id: Some(comp_node_id.clone()),
693            input_schema: None,
694            output_schema: None,
695        };
696
697        // 2. Add Compensation Node
698        let comp_node = Node {
699            id: comp_node_id.clone(),
700            kind: NodeKind::Atom,
701            label: format!("Compensate: {}", compensation.label()),
702            description: compensation.description(),
703            input_type: type_name_of::<Out>(),
704            output_type: "void".to_string(),
705            resource_type: type_name_of::<Res>(),
706            metadata: Default::default(),
707            bus_capability: None,
708            source_location: None,
709            position: compensation
710                .position()
711                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
712            compensation_node_id: None,
713            input_schema: None,
714            output_schema: None,
715        };
716
717        let last_node_id = schematic
718            .nodes
719            .last()
720            .map(|n| n.id.clone())
721            .unwrap_or_default();
722
723        schematic.nodes.push(next_node);
724        schematic.nodes.push(comp_node);
725        schematic.edges.push(Edge {
726            from: last_node_id,
727            to: next_node_id.clone(),
728            kind: EdgeType::Linear,
729            label: Some("Next".to_string()),
730        });
731
732        // 3. Compose Executor with Compensation Logic
733        let node_id_for_exec = next_node_id.clone();
734        let comp_id_for_exec = comp_node_id.clone();
735        let node_label_for_exec = transition.label();
736        let bus_policy_for_exec = transition.bus_access_policy();
737        let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
738        let comp_for_exec = compensation.clone();
739        let bus_policy_for_executor = bus_policy_for_exec.clone();
740        let bus_policy_for_registry = bus_policy_for_exec.clone();
741        let next_executor: Executor<In, Next, E, Res> = Arc::new(
742            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
743                let prev = prev_executor.clone();
744                let trans = transition.clone();
745                let comp = comp_for_exec.clone();
746                let timeline_node_id = node_id_for_exec.clone();
747                let timeline_comp_id = comp_id_for_exec.clone();
748                let timeline_node_label = node_label_for_exec.clone();
749                let transition_bus_policy = bus_policy_for_executor.clone();
750                let step_idx = step_idx_for_exec;
751
752                Box::pin(async move {
753                    // Check for manual intervention jump
754                    if let Some(jump) = bus.read::<ManualJump>()
755                        && (jump.target_node == timeline_node_id
756                            || jump.target_node == timeline_node_label)
757                    {
758                        tracing::info!(
759                            node_id = %timeline_node_id,
760                            node_label = %timeline_node_label,
761                            "Manual jump target reached (compensated); skipping previous steps"
762                        );
763
764                        let state = if let Some(ow) = jump.payload_override.clone() {
765                            match serde_json::from_value::<Out>(ow) {
766                                Ok(s) => s,
767                                Err(e) => {
768                                    tracing::error!(
769                                        "Payload override deserialization failed: {}",
770                                        e
771                                    );
772                                    return Outcome::emit(
773                                        "execution.jump.payload_error",
774                                        Some(serde_json::json!({"error": e.to_string()})),
775                                    )
776                                    .map(|_: ()| unreachable!());
777                                }
778                            }
779                        } else {
780                            return Outcome::emit(
781                                "execution.jump.missing_payload",
782                                Some(serde_json::json!({"node_id": timeline_node_id})),
783                            )
784                            .map(|_: ()| unreachable!());
785                        };
786
787                        // Skip prev() and continue with trans.run(state, ...)
788                        return run_this_compensated_step::<Out, Next, E, Res, Comp>(
789                            &trans,
790                            &comp,
791                            state,
792                            res,
793                            bus,
794                            &timeline_node_id,
795                            &timeline_comp_id,
796                            &timeline_node_label,
797                            &transition_bus_policy,
798                            step_idx,
799                        )
800                        .await;
801                    }
802
803                    // Handle resumption skip
804                    if let Some(start) = bus.read::<StartStep>()
805                        && step_idx == start.0
806                        && bus.read::<ResumptionState>().is_some()
807                    {
808                        let fresh_state = serde_json::to_value(&input)
809                            .ok()
810                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
811                        let persisted_state = bus
812                            .read::<ResumptionState>()
813                            .and_then(|r| r.payload.clone())
814                            .and_then(|p| serde_json::from_value::<Out>(p).ok());
815
816                        if let Some(s) = fresh_state.or(persisted_state) {
817                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
818                            return run_this_compensated_step::<Out, Next, E, Res, Comp>(
819                                &trans,
820                                &comp,
821                                s,
822                                res,
823                                bus,
824                                &timeline_node_id,
825                                &timeline_comp_id,
826                                &timeline_node_label,
827                                &transition_bus_policy,
828                                step_idx,
829                            )
830                            .await;
831                        }
832
833                        return Outcome::emit(
834                            "execution.resumption.payload_error",
835                            Some(serde_json::json!({"error": "no compatible resumption state"})),
836                        )
837                        .map(|_: ()| unreachable!());
838                    }
839
840                    // Run previous step
841                    let prev_result = prev(input, res, bus).await;
842
843                    // Unpack
844                    let state = match prev_result {
845                        Outcome::Next(t) => t,
846                        other => return other.map(|_| unreachable!()),
847                    };
848
849                    run_this_compensated_step::<Out, Next, E, Res, Comp>(
850                        &trans,
851                        &comp,
852                        state,
853                        res,
854                        bus,
855                        &timeline_node_id,
856                        &timeline_comp_id,
857                        &timeline_node_label,
858                        &transition_bus_policy,
859                        step_idx,
860                    )
861                    .await
862                })
863            },
864        );
865        // 4. Register Saga Compensation if enabled
866        {
867            let mut registry = saga_compensation_registry.write().unwrap();
868            let comp_fn = compensation.clone();
869            let transition_bus_policy = bus_policy_for_registry.clone();
870
871            let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
872                Arc::new(move |input_data, res, bus| {
873                    let comp = comp_fn.clone();
874                    let bus_policy = transition_bus_policy.clone();
875                    Box::pin(async move {
876                        let input: Out = serde_json::from_slice(&input_data).unwrap();
877                        bus.set_access_policy(comp.label(), bus_policy);
878                        let res = comp.run(input, res, bus).await;
879                        bus.clear_access_policy();
880                        res
881                    })
882                });
883            registry.register(next_node_id.clone(), handler);
884        }
885
886        Axon {
887            schematic,
888            executor: next_executor,
889            execution_mode,
890            persistence_store,
891            audit_sink,
892            dlq_sink,
893            dlq_policy,
894            dynamic_dlq_policy,
895            saga_policy,
896            dynamic_saga_policy,
897            saga_compensation_registry,
898            iam_handle,
899        }
900    }
901
902    /// Attach a compensation transition to the previously added node.
903    /// This establishes a Schematic-level Saga compensation mapping.
904    #[track_caller]
905    pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
906    where
907        Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
908    {
909        // NOTE: This currently only updates the Schematic.
910        // For runtime compensation behavior, use `then_compensated`.
911        let caller = Location::caller();
912        let comp_node_id = uuid::Uuid::new_v4().to_string();
913
914        let comp_node = Node {
915            id: comp_node_id.clone(),
916            kind: NodeKind::Atom,
917            label: transition.label(),
918            description: transition.description(),
919            input_type: type_name_of::<Out>(),
920            output_type: "void".to_string(),
921            resource_type: type_name_of::<Res>(),
922            metadata: Default::default(),
923            bus_capability: None,
924            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
925            position: transition
926                .position()
927                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
928            compensation_node_id: None,
929            input_schema: None,
930            output_schema: None,
931        };
932
933        if let Some(last_node) = self.schematic.nodes.last_mut() {
934            last_node.compensation_node_id = Some(comp_node_id.clone());
935        }
936
937        self.schematic.nodes.push(comp_node);
938        self
939    }
940
941    /// Add a branch point
942    #[track_caller]
943    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
944        let caller = Location::caller();
945        let branch_id_str = branch_id.into();
946        let last_node_id = self
947            .schematic
948            .nodes
949            .last()
950            .map(|n| n.id.clone())
951            .unwrap_or_default();
952
953        let branch_node = Node {
954            id: uuid::Uuid::new_v4().to_string(),
955            kind: NodeKind::Synapse,
956            label: label.to_string(),
957            description: None,
958            input_type: type_name_of::<Out>(),
959            output_type: type_name_of::<Out>(),
960            resource_type: type_name_of::<Res>(),
961            metadata: Default::default(),
962            bus_capability: None,
963            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
964            position: None,
965            compensation_node_id: None,
966            input_schema: None,
967            output_schema: None,
968        };
969
970        self.schematic.nodes.push(branch_node);
971        self.schematic.edges.push(Edge {
972            from: last_node_id,
973            to: branch_id_str.clone(),
974            kind: EdgeType::Branch(branch_id_str),
975            label: Some("Branch".to_string()),
976        });
977
978        self
979    }
980
981    /// Execute the Axon with the given input and resources.
982    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
983        if let ExecutionMode::Singleton {
984            lock_key,
985            ttl_ms,
986            lock_provider,
987        } = &self.execution_mode
988        {
989            let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
990            let _enter = trace_span.enter();
991            match lock_provider.try_acquire(lock_key, *ttl_ms).await {
992                Ok(true) => {
993                    tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
994                }
995                Ok(false) => {
996                    tracing::debug!(
997                        "Singleton lock {} already held, aborting execution.",
998                        lock_key
999                    );
1000                    // Emit a specific event indicating skip
1001                    return Outcome::emit(
1002                        "execution.skipped.lock_held",
1003                        Some(serde_json::json!({
1004                            "lock_key": lock_key
1005                        })),
1006                    );
1007                }
1008                Err(e) => {
1009                    tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1010                    return Outcome::emit(
1011                        "execution.skipped.lock_error",
1012                        Some(serde_json::json!({
1013                            "error": e.to_string()
1014                        })),
1015                    );
1016                }
1017            }
1018        }
1019
1020        // ── IAM Boundary Check ────────────────────────────────────
1021        if let Some(iam) = &self.iam_handle {
1022            use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1023
1024            if matches!(iam.policy, IamPolicy::None) {
1025                // No verification required — skip
1026            } else {
1027                let token = bus.read::<IamToken>().map(|t| t.0.clone());
1028
1029                match token {
1030                    Some(raw_token) => {
1031                        match iam.verifier.verify(&raw_token).await {
1032                            Ok(identity) => {
1033                                if let Err(e) = enforce_policy(&iam.policy, &identity) {
1034                                    tracing::warn!(
1035                                        policy = ?iam.policy,
1036                                        subject = %identity.subject,
1037                                        "IAM policy enforcement failed: {}",
1038                                        e
1039                                    );
1040                                    return Outcome::emit(
1041                                        "iam.policy_denied",
1042                                        Some(serde_json::json!({
1043                                            "error": e.to_string(),
1044                                            "subject": identity.subject,
1045                                        })),
1046                                    );
1047                                }
1048                                // Insert verified identity into Bus for downstream access
1049                                bus.insert(identity);
1050                            }
1051                            Err(e) => {
1052                                tracing::warn!("IAM token verification failed: {}", e);
1053                                return Outcome::emit(
1054                                    "iam.verification_failed",
1055                                    Some(serde_json::json!({
1056                                        "error": e.to_string()
1057                                    })),
1058                                );
1059                            }
1060                        }
1061                    }
1062                    None => {
1063                        tracing::warn!("IAM policy requires token but none found in Bus");
1064                        return Outcome::emit("iam.missing_token", None);
1065                    }
1066                }
1067            }
1068        }
1069
1070        let trace_id = persistence_trace_id(bus);
1071        let label = self.schematic.name.clone();
1072
1073        // Inject DLQ config into Bus for step-level reporting
1074        if let Some(sink) = &self.dlq_sink {
1075            bus.insert(sink.clone());
1076        }
1077        // Prefer dynamic (hot-reloadable) policy if available, otherwise use static
1078        let effective_dlq_policy = self
1079            .dynamic_dlq_policy
1080            .as_ref()
1081            .map(|d| d.current())
1082            .unwrap_or_else(|| self.dlq_policy.clone());
1083        bus.insert(effective_dlq_policy);
1084        bus.insert(self.schematic.clone());
1085        let effective_saga_policy = self
1086            .dynamic_saga_policy
1087            .as_ref()
1088            .map(|d| d.current())
1089            .unwrap_or_else(|| self.saga_policy.clone());
1090        bus.insert(effective_saga_policy.clone());
1091
1092        // Initialize Saga stack if enabled and not already present (e.g. from resumption)
1093        if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1094            bus.insert(SagaStack::new());
1095        }
1096
1097        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1098        let compensation_handle = bus.read::<CompensationHandle>().cloned();
1099        let compensation_retry_policy = compensation_retry_policy(bus);
1100        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1101        let version = self.schematic.schema_version.clone();
1102        let migration_registry = bus
1103            .read::<ranvier_core::schematic::MigrationRegistry>()
1104            .cloned();
1105
1106        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1107            let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1108                load_persistence_version(handle, &trace_id).await;
1109
1110            if let Some(interv) = intervention {
1111                tracing::info!(
1112                    trace_id = %trace_id,
1113                    target_node = %interv.target_node,
1114                    "Applying manual intervention command"
1115                );
1116
1117                // Find the step index for the target node
1118                if let Some(target_idx) = self
1119                    .schematic
1120                    .nodes
1121                    .iter()
1122                    .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1123                {
1124                    tracing::info!(
1125                        trace_id = %trace_id,
1126                        target_node = %interv.target_node,
1127                        target_step = target_idx,
1128                        "Intervention: Jumping to target node"
1129                    );
1130                    start_step = target_idx as u64;
1131
1132                    // Inject ManualJump into the bus so executors can handle skipping/payload overrides
1133                    bus.insert(ManualJump {
1134                        target_node: interv.target_node.clone(),
1135                        payload_override: interv.payload_override.clone(),
1136                    });
1137
1138                    // Log audit event for intervention application
1139                    if let Some(sink) = self.audit_sink.as_ref() {
1140                        let event = AuditEvent::new(
1141                            uuid::Uuid::new_v4().to_string(),
1142                            "System".to_string(),
1143                            "ApplyIntervention".to_string(),
1144                            trace_id.to_string(),
1145                        )
1146                        .with_metadata("target_node", interv.target_node.clone())
1147                        .with_metadata("target_step", target_idx);
1148
1149                        let _ = sink.append(&event).await;
1150                    }
1151                } else {
1152                    tracing::warn!(
1153                        trace_id = %trace_id,
1154                        target_node = %interv.target_node,
1155                        "Intervention target node not found in schematic; ignoring jump"
1156                    );
1157                }
1158            }
1159
1160            if let Some(old_version) = trace_version
1161                && old_version != version
1162            {
1163                tracing::info!(
1164                    trace_id = %trace_id,
1165                    old_version = %old_version,
1166                    current_version = %version,
1167                    "Version mismatch detected during resumption"
1168                );
1169
1170                // Try multi-hop migration path first, fall back to direct lookup
1171                let migration_path = migration_registry
1172                    .as_ref()
1173                    .and_then(|r| r.find_migration_path(&old_version, &version));
1174
1175                let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1176                    if path.is_empty() {
1177                        (None, last_payload.clone())
1178                    } else {
1179                        // Apply payload mappers along the migration chain
1180                        let mut payload = last_payload.clone();
1181                        for hop in &path {
1182                            if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1183                            {
1184                                match mapper.map_state(p) {
1185                                    Ok(mapped) => payload = Some(mapped),
1186                                    Err(e) => {
1187                                        tracing::error!(
1188                                            trace_id = %trace_id,
1189                                            from = %hop.from_version,
1190                                            to = %hop.to_version,
1191                                            error = %e,
1192                                            "Payload migration mapper failed"
1193                                        );
1194                                        return Outcome::emit(
1195                                            "execution.resumption.payload_migration_failed",
1196                                            Some(serde_json::json!({
1197                                                "trace_id": trace_id,
1198                                                "from": hop.from_version,
1199                                                "to": hop.to_version,
1200                                                "error": e.to_string()
1201                                            })),
1202                                        );
1203                                    }
1204                                }
1205                            }
1206                        }
1207                        let hops: Vec<String> = path
1208                            .iter()
1209                            .map(|h| format!("{}->{}", h.from_version, h.to_version))
1210                            .collect();
1211                        tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1212                        (path.last().copied(), payload)
1213                    }
1214                } else {
1215                    (None, last_payload.clone())
1216                };
1217
1218                // Use the final migration in the path to determine strategy
1219                let migration = final_migration.or_else(|| {
1220                    migration_registry
1221                        .as_ref()
1222                        .and_then(|r| r.find_migration(&old_version, &version))
1223                });
1224
1225                // Update last_payload with mapped version
1226                if mapped_payload.is_some() {
1227                    last_payload = mapped_payload;
1228                }
1229
1230                let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1231                {
1232                    m.node_mapping
1233                        .get(node_id)
1234                        .cloned()
1235                        .unwrap_or(m.default_strategy.clone())
1236                } else {
1237                    migration
1238                        .map(|m| m.default_strategy.clone())
1239                        .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1240                };
1241
1242                match strategy {
1243                    ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1244                        tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1245                        start_step = 0;
1246                    }
1247                    ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1248                        new_node_id,
1249                        ..
1250                    } => {
1251                        tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1252                        if let Some(target_idx) = self
1253                            .schematic
1254                            .nodes
1255                            .iter()
1256                            .position(|n| n.id == new_node_id || n.label == new_node_id)
1257                        {
1258                            start_step = target_idx as u64;
1259                        } else {
1260                            tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1261                            return Outcome::emit(
1262                                "execution.resumption.migration_target_not_found",
1263                                Some(serde_json::json!({ "node_id": new_node_id })),
1264                            );
1265                        }
1266                    }
1267                    ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1268                        tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1269                        if let Some(target_idx) = self
1270                            .schematic
1271                            .nodes
1272                            .iter()
1273                            .position(|n| n.id == node_id || n.label == node_id)
1274                        {
1275                            start_step = target_idx as u64;
1276                        } else {
1277                            tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1278                            return Outcome::emit(
1279                                "execution.resumption.migration_target_not_found",
1280                                Some(serde_json::json!({ "node_id": node_id })),
1281                            );
1282                        }
1283                    }
1284                    ranvier_core::schematic::MigrationStrategy::Fail => {
1285                        tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1286                        return Outcome::emit(
1287                            "execution.resumption.version_mismatch_failed",
1288                            Some(serde_json::json!({
1289                                "trace_id": trace_id,
1290                                "old_version": old_version,
1291                                "current_version": version
1292                            })),
1293                        );
1294                    }
1295                    _ => {
1296                        tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1297                        return Outcome::emit(
1298                            "execution.resumption.unsupported_migration",
1299                            Some(serde_json::json!({
1300                                "trace_id": trace_id,
1301                                "strategy": format!("{:?}", strategy)
1302                            })),
1303                        );
1304                    }
1305                }
1306            }
1307
1308            let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1309            persist_execution_event(
1310                handle,
1311                &trace_id,
1312                &label,
1313                &version,
1314                start_step,
1315                ingress_node_id,
1316                "Enter",
1317                None,
1318            )
1319            .await;
1320
1321            bus.insert(StartStep(start_step));
1322            if start_step > 0 {
1323                bus.insert(ResumptionState {
1324                    payload: last_payload,
1325                });
1326            }
1327
1328            Some(start_step)
1329        } else {
1330            None
1331        };
1332
1333        let should_capture = should_attach_timeline(bus);
1334        let inserted_timeline = if should_capture {
1335            ensure_timeline(bus)
1336        } else {
1337            false
1338        };
1339        let ingress_started = std::time::Instant::now();
1340        let ingress_enter_ts = now_ms();
1341        if should_capture
1342            && let (Some(timeline), Some(ingress)) =
1343                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1344        {
1345            timeline.push(TimelineEvent::NodeEnter {
1346                node_id: ingress.id.clone(),
1347                node_label: ingress.label.clone(),
1348                timestamp: ingress_enter_ts,
1349            });
1350        }
1351
1352        let circuit_span = tracing::info_span!(
1353            "Circuit",
1354            ranvier.circuit = %label,
1355            ranvier.outcome_kind = tracing::field::Empty,
1356            ranvier.outcome_target = tracing::field::Empty
1357        );
1358        let outcome = (self.executor)(input, resources, bus)
1359            .instrument(circuit_span.clone())
1360            .await;
1361        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1362        if let Some(target) = outcome_target(&outcome) {
1363            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1364        }
1365
1366        // Automated Saga Rollback (LIFO)
1367        if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1368            while let Some(task) = {
1369                let mut stack = bus.read_mut::<SagaStack>();
1370                stack.as_mut().and_then(|s| s.pop())
1371            } {
1372                tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1373
1374                let handler = {
1375                    let registry = self.saga_compensation_registry.read().unwrap();
1376                    registry.get(&task.node_id)
1377                };
1378                if let Some(handler) = handler {
1379                    let res = handler(task.input_snapshot, resources, bus).await;
1380                    if let Outcome::Fault(e) = res {
1381                        tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1382                    }
1383                } else {
1384                    tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1385                }
1386            }
1387            tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1388        }
1389
1390        let ingress_exit_ts = now_ms();
1391        if should_capture
1392            && let (Some(timeline), Some(ingress)) =
1393                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1394        {
1395            timeline.push(TimelineEvent::NodeExit {
1396                node_id: ingress.id.clone(),
1397                outcome_type: outcome_type_name(&outcome),
1398                duration_ms: ingress_started.elapsed().as_millis() as u64,
1399                timestamp: ingress_exit_ts,
1400            });
1401        }
1402
1403        if let Some(handle) = persistence_handle.as_ref() {
1404            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1405            persist_execution_event(
1406                handle,
1407                &trace_id,
1408                &label,
1409                &version,
1410                fault_step,
1411                None, // Outcome-level events might not have a single node_id context here
1412                outcome_kind_name(&outcome),
1413                Some(outcome.to_json_value()),
1414            )
1415            .await;
1416
1417            let mut completion = completion_from_outcome(&outcome);
1418            if matches!(outcome, Outcome::Fault(_))
1419                && let Some(compensation) = compensation_handle.as_ref()
1420                && compensation_auto_trigger(bus)
1421            {
1422                let context = CompensationContext {
1423                    trace_id: trace_id.clone(),
1424                    circuit: label.clone(),
1425                    fault_kind: outcome_kind_name(&outcome).to_string(),
1426                    fault_step,
1427                    timestamp_ms: now_ms(),
1428                };
1429
1430                if run_compensation(
1431                    compensation,
1432                    context,
1433                    compensation_retry_policy,
1434                    compensation_idempotency.clone(),
1435                )
1436                .await
1437                {
1438                    persist_execution_event(
1439                        handle,
1440                        &trace_id,
1441                        &label,
1442                        &version,
1443                        fault_step.saturating_add(1),
1444                        None,
1445                        "Compensated",
1446                        None,
1447                    )
1448                    .await;
1449                    completion = CompletionState::Compensated;
1450                }
1451            }
1452
1453            if persistence_auto_complete(bus) {
1454                persist_completion(handle, &trace_id, completion).await;
1455            }
1456        }
1457
1458        if should_capture {
1459            maybe_export_timeline(bus, &outcome);
1460        }
1461        if inserted_timeline {
1462            let _ = bus.remove::<Timeline>();
1463        }
1464
1465        outcome
1466    }
1467
1468    /// Starts the Ranvier Inspector for this Axon on the specified port.
1469    /// This spawns a background task to serve the Schematic.
1470    pub fn serve_inspector(self, port: u16) -> Self {
1471        if !inspector_dev_mode_from_env() {
1472            tracing::info!("Inspector disabled because RANVIER_MODE is production");
1473            return self;
1474        }
1475        if !inspector_enabled_from_env() {
1476            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
1477            return self;
1478        }
1479
1480        let schematic = self.schematic.clone();
1481        let axon_inspector = Arc::new(self.clone());
1482        tokio::spawn(async move {
1483            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
1484                .with_projection_files_from_env()
1485                .with_mode_from_env()
1486                .with_auth_policy_from_env()
1487                .with_state_inspector(axon_inspector)
1488                .serve()
1489                .await
1490            {
1491                tracing::error!("Inspector server failed: {}", e);
1492            }
1493        });
1494        self
1495    }
1496
1497    /// Get a reference to the Schematic (structural view).
1498    pub fn schematic(&self) -> &Schematic {
1499        &self.schematic
1500    }
1501
1502    /// Consume and return the Schematic.
1503    pub fn into_schematic(self) -> Schematic {
1504        self.schematic
1505    }
1506
1507    /// Detect schematic export mode from runtime flags.
1508    ///
1509    /// Supported triggers:
1510    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
1511    /// - `--schematic`
1512    ///
1513    /// Optional output path:
1514    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
1515    /// - `--schematic-output <path>` / `--schematic-output=<path>`
1516    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
1517    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
1518        schematic_export_request_from_process()
1519    }
1520
1521    /// Export schematic and return `true` when schematic mode is active.
1522    ///
1523    /// Use this once after circuit construction and before server/custom loops:
1524    ///
1525    /// ```rust,ignore
1526    /// let axon = build_axon();
1527    /// if axon.maybe_export_and_exit()? {
1528    ///     return Ok(());
1529    /// }
1530    /// // Normal runtime path...
1531    /// ```
1532    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
1533        self.maybe_export_and_exit_with(|_| ())
1534    }
1535
1536    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
1537    ///
1538    /// This is useful when your app has custom loop/bootstrap behavior and you want
1539    /// to skip or cleanup that logic in schematic mode.
1540    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
1541    where
1542        F: FnOnce(&SchematicExportRequest),
1543    {
1544        let Some(request) = self.schematic_export_request() else {
1545            return Ok(false);
1546        };
1547        on_before_exit(&request);
1548        self.export_schematic(&request)?;
1549        Ok(true)
1550    }
1551
1552    /// Export schematic according to the provided request.
1553    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
1554        let json = serde_json::to_string_pretty(self.schematic())?;
1555        if let Some(path) = &request.output {
1556            if let Some(parent) = path.parent()
1557                && !parent.as_os_str().is_empty()
1558            {
1559                fs::create_dir_all(parent)?;
1560            }
1561            fs::write(path, json.as_bytes())?;
1562            return Ok(());
1563        }
1564        println!("{}", json);
1565        Ok(())
1566    }
1567}
1568
1569fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
1570    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
1571    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
1572    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
1573
1574    let mut i = 0;
1575    while i < args.len() {
1576        let arg = args[i].to_string_lossy();
1577
1578        if arg == "--schematic" {
1579            enabled = true;
1580            i += 1;
1581            continue;
1582        }
1583
1584        if arg == "--schematic-output" || arg == "--output" {
1585            if let Some(next) = args.get(i + 1) {
1586                output = Some(PathBuf::from(next));
1587                i += 2;
1588                continue;
1589            }
1590        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
1591            output = Some(PathBuf::from(value));
1592        } else if let Some(value) = arg.strip_prefix("--output=") {
1593            output = Some(PathBuf::from(value));
1594        }
1595
1596        i += 1;
1597    }
1598
1599    if enabled {
1600        Some(SchematicExportRequest { output })
1601    } else {
1602        None
1603    }
1604}
1605
/// Returns `true` when env var `key` is `1`, `true`, `on` or `yes` (ASCII
/// case-insensitive). Unset or non-Unicode values count as `false`.
fn env_flag_is_true(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var(key)
        .map(|raw| TRUTHY.contains(&raw.to_ascii_lowercase().as_str()))
        .unwrap_or(false)
}
1612
1613fn inspector_enabled_from_env() -> bool {
1614    let raw = std::env::var("RANVIER_INSPECTOR").ok();
1615    inspector_enabled_from_value(raw.as_deref())
1616}
1617
/// Inspector on/off switch. An absent value enables it. A present value enables it
/// only when it is `1`, `true`, `on` or `yes` (ASCII case-insensitive).
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes")
}
1624
1625fn inspector_dev_mode_from_env() -> bool {
1626    let raw = std::env::var("RANVIER_MODE").ok();
1627    inspector_dev_mode_from_value(raw.as_deref())
1628}
1629
/// Everything except `prod` / `production` (ASCII case-insensitive) counts as dev
/// mode, including an absent value.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    let is_production = value.is_some_and(|mode| {
        mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production")
    });
    !is_production
}
1636
1637fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
1638    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
1639        Ok(v) if !v.trim().is_empty() => v,
1640        _ => return,
1641    };
1642
1643    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
1644    let policy = timeline_adaptive_policy();
1645    let forced = should_force_export(outcome, &policy);
1646    let should_export = sampled || forced;
1647    if !should_export {
1648        record_sampling_stats(false, sampled, forced, "none", &policy);
1649        return;
1650    }
1651
1652    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
1653    timeline.sort();
1654
1655    let mode = std::env::var("RANVIER_TIMELINE_MODE")
1656        .unwrap_or_else(|_| "overwrite".to_string())
1657        .to_ascii_lowercase();
1658
1659    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
1660        tracing::warn!(
1661            "Failed to persist timeline file {} (mode={}): {}",
1662            path,
1663            mode,
1664            err
1665        );
1666        record_sampling_stats(false, sampled, forced, &mode, &policy);
1667    } else {
1668        record_sampling_stats(true, sampled, forced, &mode, &policy);
1669    }
1670}
1671
1672fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
1673    match outcome {
1674        Outcome::Next(_) => "Next".to_string(),
1675        Outcome::Branch(id, _) => format!("Branch:{}", id),
1676        Outcome::Jump(id, _) => format!("Jump:{}", id),
1677        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
1678        Outcome::Fault(_) => "Fault".to_string(),
1679    }
1680}
1681
1682fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
1683    match outcome {
1684        Outcome::Next(_) => "Next",
1685        Outcome::Branch(_, _) => "Branch",
1686        Outcome::Jump(_, _) => "Jump",
1687        Outcome::Emit(_, _) => "Emit",
1688        Outcome::Fault(_) => "Fault",
1689    }
1690}
1691
1692fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
1693    match outcome {
1694        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
1695        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
1696        Outcome::Emit(event_type, _) => Some(event_type.clone()),
1697        Outcome::Next(_) | Outcome::Fault(_) => None,
1698    }
1699}
1700
1701fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
1702    match outcome {
1703        Outcome::Fault(_) => CompletionState::Fault,
1704        _ => CompletionState::Success,
1705    }
1706}
1707
1708fn persistence_trace_id(bus: &Bus) -> String {
1709    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
1710        explicit.0.clone()
1711    } else {
1712        format!("{}:{}", bus.id, now_ms())
1713    }
1714}
1715
1716fn persistence_auto_complete(bus: &Bus) -> bool {
1717    bus.read::<PersistenceAutoComplete>()
1718        .map(|v| v.0)
1719        .unwrap_or(true)
1720}
1721
1722fn compensation_auto_trigger(bus: &Bus) -> bool {
1723    bus.read::<CompensationAutoTrigger>()
1724        .map(|v| v.0)
1725        .unwrap_or(true)
1726}
1727
1728fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
1729    bus.read::<CompensationRetryPolicy>()
1730        .copied()
1731        .unwrap_or_default()
1732}
1733
1734/// Unwrap the Outcome enum layer from a persisted event payload.
1735///
1736/// Events are stored with `outcome.to_json_value()` which serializes the full
1737/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
1738/// handler expects the raw inner value, so we extract it here.
1739fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
1740    payload.map(|p| {
1741        p.get("Next")
1742            .or_else(|| p.get("Branch"))
1743            .or_else(|| p.get("Jump"))
1744            .cloned()
1745            .unwrap_or_else(|| p.clone())
1746    })
1747}
1748
/// Load the resume position for `trace_id` from the persistence store.
///
/// Returns `(next_step, schematic_version, last_intervention, anchor_node_id, anchor_payload)`:
/// - `next_step`: step index at which execution should continue. With
///   `trace.resumed_from_step` set, this is `resumed_from_step + 1` (saturating).
///   Otherwise it is one past the step of the last entry in `trace.events`, or `0`
///   when there are no events.
/// - `schematic_version`: the version stored on the loaded trace.
/// - `last_intervention`: the last entry of `trace.interventions`, if any.
/// - `anchor_node_id` / `anchor_payload`: node id and unwrapped outcome payload (see
///   `unwrap_outcome_payload`) of the chosen anchor event.
///
/// Anchor choice when resuming from a step: scanning `trace.events` from the end,
/// among events with `step <= resumed_from_step`, prefer a `Next` event with a
/// payload, then any non-`Fault` event with a payload, then any event with a
/// payload. If none match, fall back to the last event overall. Without
/// `resumed_from_step` the anchor is simply the last event.
///
/// A missing trace and a store error both yield `(0, None, None, None, None)`.
/// Errors are logged at warn level.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // Progressively relax the anchor criteria; each fallback only runs
                    // when the stricter search found nothing.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        // Last resort: ignore the step bound and the payload requirement.
                        .or_else(|| trace.events.last());

                    (
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // Not an explicit resume: continue right after the last recorded event.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
1826
1827#[allow(clippy::too_many_arguments)]
1828async fn run_this_step<In, Out, E, Res>(
1829    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
1830    state: In,
1831    res: &Res,
1832    bus: &mut Bus,
1833    node_id: &str,
1834    node_label: &str,
1835    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
1836    step_idx: u64,
1837) -> Outcome<Out, E>
1838where
1839    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1840    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1841    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
1842    Res: ranvier_core::transition::ResourceRequirement,
1843{
1844    let label = trans.label();
1845    let res_type = std::any::type_name::<Res>()
1846        .split("::")
1847        .last()
1848        .unwrap_or("unknown");
1849
1850    // Debug pausing
1851    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
1852        debug.should_pause(node_id)
1853    } else {
1854        false
1855    };
1856
1857    if should_pause {
1858        let trace_id = persistence_trace_id(bus);
1859        tracing::event!(
1860            target: "ranvier.debugger",
1861            tracing::Level::INFO,
1862            trace_id = %trace_id,
1863            node_id = %node_id,
1864            "Node paused"
1865        );
1866
1867        if let Some(timeline) = bus.read_mut::<Timeline>() {
1868            timeline.push(TimelineEvent::NodePaused {
1869                node_id: node_id.to_string(),
1870                timestamp: now_ms(),
1871            });
1872        }
1873        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
1874            debug.wait().await;
1875        }
1876    }
1877
1878    let enter_ts = now_ms();
1879    if let Some(timeline) = bus.read_mut::<Timeline>() {
1880        timeline.push(TimelineEvent::NodeEnter {
1881            node_id: node_id.to_string(),
1882            node_label: node_label.to_string(),
1883            timestamp: enter_ts,
1884        });
1885    }
1886
1887    // Check DLQ retry policy and pre-serialize state for potential retries
1888    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
1889        if let DlqPolicy::RetryThenDlq {
1890            max_attempts,
1891            backoff_ms,
1892        } = p
1893        {
1894            Some((*max_attempts, *backoff_ms))
1895        } else {
1896            None
1897        }
1898    });
1899    let retry_state_snapshot = if dlq_retry_config.is_some() {
1900        serde_json::to_value(&state).ok()
1901    } else {
1902        None
1903    };
1904
1905    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
1906    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
1907        Some(serde_json::to_vec(&state).unwrap_or_default())
1908    } else {
1909        None
1910    };
1911
1912    let node_span = tracing::info_span!(
1913        "Node",
1914        ranvier.node = %label,
1915        ranvier.resource_type = %res_type,
1916        ranvier.outcome_kind = tracing::field::Empty,
1917        ranvier.outcome_target = tracing::field::Empty
1918    );
1919    let started = std::time::Instant::now();
1920    bus.set_access_policy(label.clone(), bus_policy.clone());
1921    let result = trans
1922        .run(state, res, bus)
1923        .instrument(node_span.clone())
1924        .await;
1925    bus.clear_access_policy();
1926
1927    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
1928    // retry with exponential backoff before giving up.
1929    let result = if let Outcome::Fault(_) = &result {
1930        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
1931            (dlq_retry_config, &retry_state_snapshot)
1932        {
1933            let mut final_result = result;
1934            // attempt 1 already done; retry from 2..=max_attempts
1935            for attempt in 2..=max_attempts {
1936                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
1937
1938                tracing::info!(
1939                    ranvier.node = %label,
1940                    attempt = attempt,
1941                    max_attempts = max_attempts,
1942                    backoff_ms = delay,
1943                    "Retrying faulted node"
1944                );
1945
1946                if let Some(timeline) = bus.read_mut::<Timeline>() {
1947                    timeline.push(TimelineEvent::NodeRetry {
1948                        node_id: node_id.to_string(),
1949                        attempt,
1950                        max_attempts,
1951                        backoff_ms: delay,
1952                        timestamp: now_ms(),
1953                    });
1954                }
1955
1956                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
1957
1958                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
1959                    bus.set_access_policy(label.clone(), bus_policy.clone());
1960                    let retry_result = trans
1961                        .run(retry_state, res, bus)
1962                        .instrument(tracing::info_span!(
1963                            "NodeRetry",
1964                            ranvier.node = %label,
1965                            attempt = attempt
1966                        ))
1967                        .await;
1968                    bus.clear_access_policy();
1969
1970                    match &retry_result {
1971                        Outcome::Fault(_) => {
1972                            final_result = retry_result;
1973                        }
1974                        _ => {
1975                            final_result = retry_result;
1976                            break;
1977                        }
1978                    }
1979                }
1980            }
1981            final_result
1982        } else {
1983            result
1984        }
1985    } else {
1986        result
1987    };
1988
1989    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
1990    if let Some(target) = outcome_target(&result) {
1991        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
1992    }
1993    let duration_ms = started.elapsed().as_millis() as u64;
1994    let exit_ts = now_ms();
1995
1996    if let Some(timeline) = bus.read_mut::<Timeline>() {
1997        timeline.push(TimelineEvent::NodeExit {
1998            node_id: node_id.to_string(),
1999            outcome_type: outcome_type_name(&result),
2000            duration_ms,
2001            timestamp: exit_ts,
2002        });
2003
2004        if let Outcome::Branch(branch_id, _) = &result {
2005            timeline.push(TimelineEvent::Branchtaken {
2006                branch_id: branch_id.clone(),
2007                timestamp: exit_ts,
2008            });
2009        }
2010    }
2011
2012    // Push to Saga Stack if Next outcome and snapshot taken
2013    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2014        && let Some(stack) = bus.read_mut::<SagaStack>()
2015    {
2016        stack.push(node_id.to_string(), label.clone(), snapshot);
2017    }
2018
2019    if let Some(handle) = bus.read::<PersistenceHandle>() {
2020        let trace_id = persistence_trace_id(bus);
2021        let circuit = bus
2022            .read::<ranvier_core::schematic::Schematic>()
2023            .map(|s| s.name.clone())
2024            .unwrap_or_default();
2025        let version = bus
2026            .read::<ranvier_core::schematic::Schematic>()
2027            .map(|s| s.schema_version.clone())
2028            .unwrap_or_default();
2029
2030        persist_execution_event(
2031            handle,
2032            &trace_id,
2033            &circuit,
2034            &version,
2035            step_idx,
2036            Some(node_id.to_string()),
2037            outcome_kind_name(&result),
2038            Some(result.to_json_value()),
2039        )
2040        .await;
2041    }
2042
2043    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
2044    // or immediately (SendToDlq). Drop policy skips entirely.
2045    if let Outcome::Fault(f) = &result {
2046        // Read policy and sink, then drop the borrows before mutable timeline access
2047        let dlq_action = {
2048            let policy = bus.read::<DlqPolicy>();
2049            let sink = bus.read::<Arc<dyn DlqSink>>();
2050            match (sink, policy) {
2051                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2052                _ => None,
2053            }
2054        };
2055
2056        if let Some(sink) = dlq_action {
2057            if let Some((max_attempts, _)) = dlq_retry_config
2058                && let Some(timeline) = bus.read_mut::<Timeline>()
2059            {
2060                timeline.push(TimelineEvent::DlqExhausted {
2061                    node_id: node_id.to_string(),
2062                    total_attempts: max_attempts,
2063                    timestamp: now_ms(),
2064                });
2065            }
2066
2067            let trace_id = persistence_trace_id(bus);
2068            let circuit = bus
2069                .read::<ranvier_core::schematic::Schematic>()
2070                .map(|s| s.name.clone())
2071                .unwrap_or_default();
2072            let _ = sink
2073                .store_dead_letter(
2074                    &trace_id,
2075                    &circuit,
2076                    node_id,
2077                    &format!("{:?}", f),
2078                    &serde_json::to_vec(&f).unwrap_or_default(),
2079                )
2080                .await;
2081        }
2082    }
2083
2084    result
2085}
2086
2087#[allow(clippy::too_many_arguments)]
2088async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2089    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2090    comp: &Comp,
2091    state: Out,
2092    res: &Res,
2093    bus: &mut Bus,
2094    node_id: &str,
2095    comp_node_id: &str,
2096    node_label: &str,
2097    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2098    step_idx: u64,
2099) -> Outcome<Next, E>
2100where
2101    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2102    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2103    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2104    Res: ranvier_core::transition::ResourceRequirement,
2105    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2106{
2107    let label = trans.label();
2108
2109    // Debug pausing
2110    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2111        debug.should_pause(node_id)
2112    } else {
2113        false
2114    };
2115
2116    if should_pause {
2117        let trace_id = persistence_trace_id(bus);
2118        tracing::event!(
2119            target: "ranvier.debugger",
2120            tracing::Level::INFO,
2121            trace_id = %trace_id,
2122            node_id = %node_id,
2123            "Node paused (compensated)"
2124        );
2125
2126        if let Some(timeline) = bus.read_mut::<Timeline>() {
2127            timeline.push(TimelineEvent::NodePaused {
2128                node_id: node_id.to_string(),
2129                timestamp: now_ms(),
2130            });
2131        }
2132        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2133            debug.wait().await;
2134        }
2135    }
2136
2137    let enter_ts = now_ms();
2138    if let Some(timeline) = bus.read_mut::<Timeline>() {
2139        timeline.push(TimelineEvent::NodeEnter {
2140            node_id: node_id.to_string(),
2141            node_label: node_label.to_string(),
2142            timestamp: enter_ts,
2143        });
2144    }
2145
2146    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
2147    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2148        Some(serde_json::to_vec(&state).unwrap_or_default())
2149    } else {
2150        None
2151    };
2152
2153    let node_span = tracing::info_span!("Node", ranvier.node = %label);
2154    bus.set_access_policy(label.clone(), bus_policy.clone());
2155    let result = trans
2156        .run(state.clone(), res, bus)
2157        .instrument(node_span)
2158        .await;
2159    bus.clear_access_policy();
2160
2161    let duration_ms = 0; // Simplified
2162    let exit_ts = now_ms();
2163
2164    if let Some(timeline) = bus.read_mut::<Timeline>() {
2165        timeline.push(TimelineEvent::NodeExit {
2166            node_id: node_id.to_string(),
2167            outcome_type: outcome_type_name(&result),
2168            duration_ms,
2169            timestamp: exit_ts,
2170        });
2171    }
2172
2173    // Automated Compensation Trigger
2174    if let Outcome::Fault(ref err) = result {
2175        if compensation_auto_trigger(bus) {
2176            tracing::info!(
2177                ranvier.node = %label,
2178                ranvier.compensation.trigger = "saga",
2179                error = ?err,
2180                "Saga compensation triggered"
2181            );
2182
2183            if let Some(timeline) = bus.read_mut::<Timeline>() {
2184                timeline.push(TimelineEvent::NodeEnter {
2185                    node_id: comp_node_id.to_string(),
2186                    node_label: format!("Compensate: {}", comp.label()),
2187                    timestamp: exit_ts,
2188                });
2189            }
2190
2191            // Run compensation
2192            let _ = comp.run(state, res, bus).await;
2193
2194            if let Some(timeline) = bus.read_mut::<Timeline>() {
2195                timeline.push(TimelineEvent::NodeExit {
2196                    node_id: comp_node_id.to_string(),
2197                    outcome_type: "Compensated".to_string(),
2198                    duration_ms: 0,
2199                    timestamp: now_ms(),
2200                });
2201            }
2202
2203            if let Some(handle) = bus.read::<PersistenceHandle>() {
2204                let trace_id = persistence_trace_id(bus);
2205                let circuit = bus
2206                    .read::<ranvier_core::schematic::Schematic>()
2207                    .map(|s| s.name.clone())
2208                    .unwrap_or_default();
2209                let version = bus
2210                    .read::<ranvier_core::schematic::Schematic>()
2211                    .map(|s| s.schema_version.clone())
2212                    .unwrap_or_default();
2213
2214                persist_execution_event(
2215                    handle,
2216                    &trace_id,
2217                    &circuit,
2218                    &version,
2219                    step_idx + 1, // Compensation node index
2220                    Some(comp_node_id.to_string()),
2221                    "Compensated",
2222                    None,
2223                )
2224                .await;
2225            }
2226        }
2227    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2228        // Push to Saga Stack if Next outcome and snapshot taken
2229        if let Some(stack) = bus.read_mut::<SagaStack>() {
2230            stack.push(node_id.to_string(), label.clone(), snapshot);
2231        }
2232
2233        if let Some(handle) = bus.read::<PersistenceHandle>() {
2234            let trace_id = persistence_trace_id(bus);
2235            let circuit = bus
2236                .read::<ranvier_core::schematic::Schematic>()
2237                .map(|s| s.name.clone())
2238                .unwrap_or_default();
2239            let version = bus
2240                .read::<ranvier_core::schematic::Schematic>()
2241                .map(|s| s.schema_version.clone())
2242                .unwrap_or_default();
2243
2244            persist_execution_event(
2245                handle,
2246                &trace_id,
2247                &circuit,
2248                &version,
2249                step_idx,
2250                Some(node_id.to_string()),
2251                outcome_kind_name(&result),
2252                Some(result.to_json_value()),
2253            )
2254            .await;
2255        }
2256    }
2257
2258    // DLQ reporting for compensated steps
2259    if let Outcome::Fault(f) = &result
2260        && let (Some(sink), Some(policy)) =
2261            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2262    {
2263        let should_dlq = !matches!(policy, DlqPolicy::Drop);
2264        if should_dlq {
2265            let trace_id = persistence_trace_id(bus);
2266            let circuit = bus
2267                .read::<ranvier_core::schematic::Schematic>()
2268                .map(|s| s.name.clone())
2269                .unwrap_or_default();
2270            let _ = sink
2271                .store_dead_letter(
2272                    &trace_id,
2273                    &circuit,
2274                    node_id,
2275                    &format!("{:?}", f),
2276                    &serde_json::to_vec(&f).unwrap_or_default(),
2277                )
2278                .await;
2279        }
2280    }
2281
2282    result
2283}
2284
2285#[allow(clippy::too_many_arguments)]
2286pub async fn persist_execution_event(
2287    handle: &PersistenceHandle,
2288    trace_id: &str,
2289    circuit: &str,
2290    schematic_version: &str,
2291    step: u64,
2292    node_id: Option<String>,
2293    outcome_kind: &str,
2294    payload: Option<serde_json::Value>,
2295) {
2296    let store = handle.store();
2297    let envelope = PersistenceEnvelope {
2298        trace_id: trace_id.to_string(),
2299        circuit: circuit.to_string(),
2300        schematic_version: schematic_version.to_string(),
2301        step,
2302        node_id,
2303        outcome_kind: outcome_kind.to_string(),
2304        timestamp_ms: now_ms(),
2305        payload_hash: None,
2306        payload,
2307    };
2308
2309    if let Err(err) = store.append(envelope).await {
2310        tracing::warn!(
2311            trace_id = %trace_id,
2312            circuit = %circuit,
2313            step,
2314            outcome_kind = %outcome_kind,
2315            error = %err,
2316            "Failed to append persistence envelope"
2317        );
2318    }
2319}
2320
2321async fn persist_completion(
2322    handle: &PersistenceHandle,
2323    trace_id: &str,
2324    completion: CompletionState,
2325) {
2326    let store = handle.store();
2327    if let Err(err) = store.complete(trace_id, completion).await {
2328        tracing::warn!(
2329            trace_id = %trace_id,
2330            error = %err,
2331            "Failed to complete persistence trace"
2332        );
2333    }
2334}
2335
2336fn compensation_idempotency_key(context: &CompensationContext) -> String {
2337    format!(
2338        "{}:{}:{}",
2339        context.trace_id, context.circuit, context.fault_kind
2340    )
2341}
2342
2343async fn run_compensation(
2344    handle: &CompensationHandle,
2345    context: CompensationContext,
2346    retry_policy: CompensationRetryPolicy,
2347    idempotency: Option<CompensationIdempotencyHandle>,
2348) -> bool {
2349    let hook = handle.hook();
2350    let key = compensation_idempotency_key(&context);
2351
2352    if let Some(store_handle) = idempotency.as_ref() {
2353        let store = store_handle.store();
2354        match store.was_compensated(&key).await {
2355            Ok(true) => {
2356                tracing::info!(
2357                    trace_id = %context.trace_id,
2358                    circuit = %context.circuit,
2359                    key = %key,
2360                    "Compensation already recorded; skipping duplicate hook execution"
2361                );
2362                return true;
2363            }
2364            Ok(false) => {}
2365            Err(err) => {
2366                tracing::warn!(
2367                    trace_id = %context.trace_id,
2368                    key = %key,
2369                    error = %err,
2370                    "Failed to check compensation idempotency state"
2371                );
2372            }
2373        }
2374    }
2375
2376    let max_attempts = retry_policy.max_attempts.max(1);
2377    for attempt in 1..=max_attempts {
2378        match hook.compensate(context.clone()).await {
2379            Ok(()) => {
2380                if let Some(store_handle) = idempotency.as_ref() {
2381                    let store = store_handle.store();
2382                    if let Err(err) = store.mark_compensated(&key).await {
2383                        tracing::warn!(
2384                            trace_id = %context.trace_id,
2385                            key = %key,
2386                            error = %err,
2387                            "Failed to mark compensation idempotency state"
2388                        );
2389                    }
2390                }
2391                return true;
2392            }
2393            Err(err) => {
2394                let is_last = attempt == max_attempts;
2395                tracing::warn!(
2396                    trace_id = %context.trace_id,
2397                    circuit = %context.circuit,
2398                    fault_kind = %context.fault_kind,
2399                    fault_step = context.fault_step,
2400                    attempt,
2401                    max_attempts,
2402                    error = %err,
2403                    "Compensation hook attempt failed"
2404                );
2405                if !is_last && retry_policy.backoff_ms > 0 {
2406                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2407                        .await;
2408                }
2409            }
2410        }
2411    }
2412    false
2413}
2414
2415fn ensure_timeline(bus: &mut Bus) -> bool {
2416    if bus.has::<Timeline>() {
2417        false
2418    } else {
2419        bus.insert(Timeline::new());
2420        true
2421    }
2422}
2423
2424fn should_attach_timeline(bus: &Bus) -> bool {
2425    // Respect explicitly provided timeline collector from caller.
2426    if bus.has::<Timeline>() {
2427        return true;
2428    }
2429
2430    // Attach timeline when runtime export path exists.
2431    has_timeline_output_path()
2432}
2433
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a non-blank, valid-Unicode value.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
2440
/// Returns the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`, in `[0.0, 1.0]`.
///
/// See [`timeline_sample_rate_from_value`] for parsing rules.
fn timeline_sample_rate() -> f64 {
    let raw = std::env::var("RANVIER_TIMELINE_SAMPLE_RATE").ok();
    timeline_sample_rate_from_value(raw.as_deref())
}

/// Parses a raw sampling-rate value. `None` means the variable is absent.
///
/// Surrounding whitespace is ignored, and finite or infinite numbers are clamped to
/// `[0.0, 1.0]`. Anything unparsable falls back to `1.0` (always sample), and so does `NaN`:
/// `f64::clamp` propagates NaN, and a NaN rate would otherwise silently disable sampling.
fn timeline_sample_rate_from_value(value: Option<&str>) -> f64 {
    value
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
2448
2449fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
2450    if rate <= 0.0 {
2451        return false;
2452    }
2453    if rate >= 1.0 {
2454        return true;
2455    }
2456    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
2457    bucket < rate
2458}
2459
/// Returns the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, lower-cased.
///
/// See [`timeline_adaptive_policy_from_value`] for normalization rules.
fn timeline_adaptive_policy() -> String {
    let raw = std::env::var("RANVIER_TIMELINE_ADAPTIVE").ok();
    timeline_adaptive_policy_from_value(raw.as_deref())
}

/// Normalizes a raw adaptive-policy value. `None` means the variable is absent or not
/// valid Unicode.
///
/// Surrounding whitespace is trimmed, so values such as `"off\n"` from env files still match
/// `"off"`. Absent or blank values yield the default `"fault_branch"`.
fn timeline_adaptive_policy_from_value(value: Option<&str>) -> String {
    value
        .map(str::trim)
        .filter(|policy| !policy.is_empty())
        .unwrap_or("fault_branch")
        .to_ascii_lowercase()
}
2465
2466fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
2467    match policy {
2468        "off" => false,
2469        "fault_only" => matches!(outcome, Outcome::Fault(_)),
2470        "fault_branch_emit" => {
2471            matches!(
2472                outcome,
2473                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
2474            )
2475        }
2476        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
2477    }
2478}
2479
/// Process-wide counters describing timeline export decisions.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Every decision recorded, whether exported or not.
    total_decisions: u64,
    /// Decisions that resulted in a timeline export.
    exported: u64,
    /// Decisions that did not export.
    skipped: u64,
    /// Exports for which the caller reported `sampled == true`.
    sampled_exports: u64,
    /// Exports for which the caller reported `forced == true`.
    forced_exports: u64,
    /// Write mode passed with the most recent decision.
    last_mode: String,
    /// Adaptive policy passed with the most recent decision.
    last_policy: String,
    /// Unix-epoch milliseconds of the most recent update (0 until the first one).
    last_updated_ms: u64,
}
2491
2492static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
2493
2494fn stats_cell() -> &'static Mutex<SamplingStats> {
2495    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
2496}
2497
2498fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
2499    let snapshot = {
2500        let mut stats = match stats_cell().lock() {
2501            Ok(guard) => guard,
2502            Err(_) => return,
2503        };
2504
2505        stats.total_decisions += 1;
2506        if exported {
2507            stats.exported += 1;
2508        } else {
2509            stats.skipped += 1;
2510        }
2511        if sampled && exported {
2512            stats.sampled_exports += 1;
2513        }
2514        if forced && exported {
2515            stats.forced_exports += 1;
2516        }
2517        stats.last_mode = mode.to_string();
2518        stats.last_policy = policy.to_string();
2519        stats.last_updated_ms = now_ms();
2520        stats.clone()
2521    };
2522
2523    tracing::debug!(
2524        ranvier.timeline.total_decisions = snapshot.total_decisions,
2525        ranvier.timeline.exported = snapshot.exported,
2526        ranvier.timeline.skipped = snapshot.skipped,
2527        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
2528        ranvier.timeline.forced_exports = snapshot.forced_exports,
2529        ranvier.timeline.mode = %snapshot.last_mode,
2530        ranvier.timeline.policy = %snapshot.last_policy,
2531        "Timeline sampling stats updated"
2532    );
2533
2534    if let Some(path) = timeline_stats_output_path() {
2535        let payload = serde_json::json!({
2536            "total_decisions": snapshot.total_decisions,
2537            "exported": snapshot.exported,
2538            "skipped": snapshot.skipped,
2539            "sampled_exports": snapshot.sampled_exports,
2540            "forced_exports": snapshot.forced_exports,
2541            "last_mode": snapshot.last_mode,
2542            "last_policy": snapshot.last_policy,
2543            "last_updated_ms": snapshot.last_updated_ms
2544        });
2545        if let Some(parent) = Path::new(&path).parent() {
2546            let _ = fs::create_dir_all(parent);
2547        }
2548        if let Err(err) = fs::write(&path, payload.to_string()) {
2549            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
2550        }
2551    }
2552}
2553
/// Returns the `RANVIER_TIMELINE_STATS_OUTPUT` path.
///
/// Returns `None` when the variable is unset, not valid Unicode, or blank.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    }
}
2559
2560fn write_timeline_with_policy(
2561    path: &str,
2562    mode: &str,
2563    mut timeline: Timeline,
2564) -> Result<(), String> {
2565    match mode {
2566        "append" => {
2567            if Path::new(path).exists() {
2568                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
2569                match serde_json::from_str::<Timeline>(&content) {
2570                    Ok(mut existing) => {
2571                        existing.events.append(&mut timeline.events);
2572                        existing.sort();
2573                        if let Some(max_events) = max_events_limit() {
2574                            truncate_timeline_events(&mut existing, max_events);
2575                        }
2576                        write_timeline_json(path, &existing)
2577                    }
2578                    Err(_) => {
2579                        // Fallback: if existing is invalid, replace with current timeline
2580                        if let Some(max_events) = max_events_limit() {
2581                            truncate_timeline_events(&mut timeline, max_events);
2582                        }
2583                        write_timeline_json(path, &timeline)
2584                    }
2585                }
2586            } else {
2587                if let Some(max_events) = max_events_limit() {
2588                    truncate_timeline_events(&mut timeline, max_events);
2589                }
2590                write_timeline_json(path, &timeline)
2591            }
2592        }
2593        "rotate" => {
2594            let rotated_path = rotated_path(path, now_ms());
2595            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
2596            if let Some(keep) = rotate_keep_limit() {
2597                cleanup_rotated_files(path, keep)?;
2598            }
2599            Ok(())
2600        }
2601        _ => write_timeline_json(path, &timeline),
2602    }
2603}
2604
2605fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
2606    if let Some(parent) = Path::new(path).parent()
2607        && !parent.as_os_str().is_empty()
2608    {
2609        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2610    }
2611    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
2612    fs::write(path, json).map_err(|e| e.to_string())
2613}
2614
/// Builds the rotated sibling of `path`: `{stem}_{suffix}.{ext}` in the same directory.
///
/// Falls back to stem `"timeline"` and extension `"json"` when either is missing or not
/// valid UTF-8.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let base = Path::new(path);
    let name_part = base
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or("timeline");
    let ext_part = base
        .extension()
        .and_then(|ext| ext.to_str())
        .unwrap_or("json");
    let file_name = format!("{name_part}_{suffix}.{ext_part}");
    match base.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
2622
/// Maximum events kept in an appended timeline, from `RANVIER_TIMELINE_MAX_EVENTS`.
fn max_events_limit() -> Option<usize> {
    timeline_limit_from_env("RANVIER_TIMELINE_MAX_EVENTS")
}

/// Number of rotated timeline files to keep, from `RANVIER_TIMELINE_ROTATE_KEEP`.
fn rotate_keep_limit() -> Option<usize> {
    timeline_limit_from_env("RANVIER_TIMELINE_ROTATE_KEEP")
}

/// Reads a positive integer limit from the environment variable `key`.
fn timeline_limit_from_env(key: &str) -> Option<usize> {
    let raw = std::env::var(key).ok();
    timeline_limit_from_value(raw.as_deref())
}

/// Parses a raw limit value, ignoring surrounding whitespace.
///
/// Returns `None` (no limit) when the value is absent, unparsable, or zero.
fn timeline_limit_from_value(value: Option<&str>) -> Option<usize> {
    value?.trim().parse::<usize>().ok().filter(|limit| *limit > 0)
}
2636
2637fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
2638    let len = timeline.events.len();
2639    if len > max_events {
2640        let keep_from = len - max_events;
2641        timeline.events = timeline.events.split_off(keep_from);
2642    }
2643}
2644
/// Deletes all but the `keep` newest rotated files that belong to `base_path`.
///
/// Only names of the exact shape produced by `rotated_path`, `{stem}_{digits}.{ext}`, are
/// considered, so unrelated siblings such as `timeline_stats.json` are never touched.
/// Files are ordered by the millisecond stamp in their name rather than by mtime, because
/// coarse filesystem timestamps can tie and misorder recent rotations.
///
/// Returns an error only if the directory cannot be listed. Individual deletions are
/// best-effort.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and `read_dir("")` fails,
    // so fall back to the current directory in that case too.
    let dir = match base.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{stem}_");
    let suffix = format!(".{ext}");

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let stamp = file_name
                .to_str()?
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // Require a pure digit run: `u64::from_str` would also accept a leading '+'.
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            Some((stamp, entry.path()))
        })
        .collect::<Vec<_>>();

    // Newest first, so everything past `keep` is surplus.
    rotated.sort_unstable_by(|a, b| b.0.cmp(&a.0));
    for (_, stale) in rotated.into_iter().skip(keep) {
        // Best-effort: a file that fails to delete is retried on the next rotation.
        let _ = fs::remove_file(stale);
    }
    Ok(())
}
2677
2678fn bus_capability_schema_from_policy(
2679    policy: Option<ranvier_core::bus::BusAccessPolicy>,
2680) -> Option<BusCapabilitySchema> {
2681    let policy = policy?;
2682
2683    let mut allow = policy
2684        .allow
2685        .unwrap_or_default()
2686        .into_iter()
2687        .map(|entry| entry.type_name.to_string())
2688        .collect::<Vec<_>>();
2689    let mut deny = policy
2690        .deny
2691        .into_iter()
2692        .map(|entry| entry.type_name.to_string())
2693        .collect::<Vec<_>>();
2694    allow.sort();
2695    deny.sort();
2696
2697    if allow.is_empty() && deny.is_empty() {
2698        return None;
2699    }
2700
2701    Some(BusCapabilitySchema { allow, deny })
2702}
2703
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
2710
2711#[cfg(test)]
2712mod tests {
2713    use super::{
2714        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
2715        should_force_export,
2716    };
2717    use crate::persistence::{
2718        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
2719        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
2720        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
2721        PersistenceHandle, PersistenceStore, PersistenceTraceId,
2722    };
2723    use anyhow::Result;
2724    use async_trait::async_trait;
2725    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
2726    use ranvier_core::event::{DlqPolicy, DlqSink};
2727    use ranvier_core::saga::SagaStack;
2728    use ranvier_core::timeline::{Timeline, TimelineEvent};
2729    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
2730    use serde::{Deserialize, Serialize};
2731    use std::sync::Arc;
2732    use tokio::sync::Mutex;
2733    use uuid::Uuid;
2734
2735    struct MockAuditSink {
2736        events: Arc<Mutex<Vec<AuditEvent>>>,
2737    }
2738
2739    #[async_trait]
2740    impl AuditSink for MockAuditSink {
2741        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
2742            self.events.lock().await.push(event.clone());
2743            Ok(())
2744        }
2745    }
2746
2747    #[tokio::test]
2748    async fn execute_logs_audit_events_for_intervention() {
2749        use ranvier_inspector::StateInspector;
2750
2751        let trace_id = "test-audit-trace";
2752        let store_impl = InMemoryPersistenceStore::new();
2753        let events = Arc::new(Mutex::new(Vec::new()));
2754        let sink = MockAuditSink {
2755            events: events.clone(),
2756        };
2757
2758        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
2759            .then(AddOne)
2760            .with_persistence_store(store_impl.clone())
2761            .with_audit_sink(sink);
2762
2763        let mut bus = Bus::new();
2764        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
2765        bus.insert(PersistenceTraceId::new(trace_id));
2766        let target_node_id = axon.schematic.nodes[0].id.clone();
2767
2768        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
2769        store_impl
2770            .append(crate::persistence::PersistenceEnvelope {
2771                trace_id: trace_id.to_string(),
2772                circuit: "AuditTest".to_string(),
2773                schematic_version: "v1.0".to_string(),
2774                step: 0,
2775                node_id: None,
2776                outcome_kind: "Next".to_string(),
2777                timestamp_ms: 0,
2778                payload_hash: None,
2779                payload: None,
2780            })
2781            .await
2782            .unwrap();
2783
2784        // 1. Trigger force_resume (should log ForceResume)
2785        axon.force_resume(trace_id, &target_node_id, None)
2786            .await
2787            .unwrap();
2788
2789        // 2. Execute (should log ApplyIntervention)
2790        axon.execute(10, &(), &mut bus).await;
2791
2792        let recorded = events.lock().await;
2793        assert_eq!(
2794            recorded.len(),
2795            2,
2796            "Should have 2 audit events: ForceResume and ApplyIntervention"
2797        );
2798        assert_eq!(recorded[0].action, "ForceResume");
2799        assert_eq!(recorded[0].target, trace_id);
2800        assert_eq!(recorded[1].action, "ApplyIntervention");
2801        assert_eq!(recorded[1].target, trace_id);
2802    }
2803
2804    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2805    pub enum TestInfallible {}
2806
2807    #[test]
2808    fn inspector_enabled_flag_matrix() {
2809        assert!(inspector_enabled_from_value(None));
2810        assert!(inspector_enabled_from_value(Some("1")));
2811        assert!(inspector_enabled_from_value(Some("true")));
2812        assert!(inspector_enabled_from_value(Some("on")));
2813        assert!(!inspector_enabled_from_value(Some("0")));
2814        assert!(!inspector_enabled_from_value(Some("false")));
2815    }
2816
2817    #[test]
2818    fn inspector_dev_mode_matrix() {
2819        assert!(inspector_dev_mode_from_value(None));
2820        assert!(inspector_dev_mode_from_value(Some("dev")));
2821        assert!(inspector_dev_mode_from_value(Some("staging")));
2822        assert!(!inspector_dev_mode_from_value(Some("prod")));
2823        assert!(!inspector_dev_mode_from_value(Some("production")));
2824    }
2825
2826    #[test]
2827    fn adaptive_policy_force_export_matrix() {
2828        let next = Outcome::<(), &'static str>::Next(());
2829        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
2830        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
2831        let fault = Outcome::<(), &'static str>::Fault("boom");
2832
2833        assert!(!should_force_export(&next, "off"));
2834        assert!(!should_force_export(&fault, "off"));
2835
2836        assert!(!should_force_export(&branch, "fault_only"));
2837        assert!(should_force_export(&fault, "fault_only"));
2838
2839        assert!(should_force_export(&branch, "fault_branch"));
2840        assert!(!should_force_export(&emit, "fault_branch"));
2841        assert!(should_force_export(&fault, "fault_branch"));
2842
2843        assert!(should_force_export(&branch, "fault_branch_emit"));
2844        assert!(should_force_export(&emit, "fault_branch_emit"));
2845        assert!(should_force_export(&fault, "fault_branch_emit"));
2846    }
2847
2848    #[test]
2849    fn sampling_and_adaptive_combination_decisions() {
2850        let bus_id = Uuid::nil();
2851        let next = Outcome::<(), &'static str>::Next(());
2852        let fault = Outcome::<(), &'static str>::Fault("boom");
2853
2854        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
2855        assert!(!sampled_never);
2856        assert!(!(sampled_never || should_force_export(&next, "off")));
2857        assert!(sampled_never || should_force_export(&fault, "fault_only"));
2858
2859        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
2860        assert!(sampled_always);
2861        assert!(sampled_always || should_force_export(&next, "off"));
2862        assert!(sampled_always || should_force_export(&fault, "off"));
2863    }
2864
2865    #[derive(Clone)]
2866    struct AddOne;
2867
2868    #[async_trait]
2869    impl Transition<i32, i32> for AddOne {
2870        type Error = TestInfallible;
2871        type Resources = ();
2872
2873        async fn run(
2874            &self,
2875            state: i32,
2876            _resources: &Self::Resources,
2877            _bus: &mut Bus,
2878        ) -> Outcome<i32, Self::Error> {
2879            Outcome::Next(state + 1)
2880        }
2881    }
2882
2883    #[derive(Clone)]
2884    struct AlwaysFault;
2885
2886    #[async_trait]
2887    impl Transition<i32, i32> for AlwaysFault {
2888        type Error = String;
2889        type Resources = ();
2890
2891        async fn run(
2892            &self,
2893            _state: i32,
2894            _resources: &Self::Resources,
2895            _bus: &mut Bus,
2896        ) -> Outcome<i32, Self::Error> {
2897            Outcome::Fault("boom".to_string())
2898        }
2899    }
2900
2901    #[derive(Clone)]
2902    struct CapabilityGuarded;
2903
2904    #[async_trait]
2905    impl Transition<(), ()> for CapabilityGuarded {
2906        type Error = String;
2907        type Resources = ();
2908
2909        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
2910            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
2911        }
2912
2913        async fn run(
2914            &self,
2915            _state: (),
2916            _resources: &Self::Resources,
2917            bus: &mut Bus,
2918        ) -> Outcome<(), Self::Error> {
2919            match bus.get::<String>() {
2920                Ok(_) => Outcome::Next(()),
2921                Err(err) => Outcome::Fault(err.to_string()),
2922            }
2923        }
2924    }
2925
2926    #[derive(Clone)]
2927    struct RecordingCompensationHook {
2928        calls: Arc<Mutex<Vec<CompensationContext>>>,
2929        should_fail: bool,
2930    }
2931
2932    #[async_trait]
2933    impl CompensationHook for RecordingCompensationHook {
2934        async fn compensate(&self, context: CompensationContext) -> Result<()> {
2935            self.calls.lock().await.push(context);
2936            if self.should_fail {
2937                return Err(anyhow::anyhow!("compensation failed"));
2938            }
2939            Ok(())
2940        }
2941    }
2942
2943    #[derive(Clone)]
2944    struct FlakyCompensationHook {
2945        calls: Arc<Mutex<u32>>,
2946        failures_remaining: Arc<Mutex<u32>>,
2947    }
2948
2949    #[async_trait]
2950    impl CompensationHook for FlakyCompensationHook {
2951        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
2952            {
2953                let mut calls = self.calls.lock().await;
2954                *calls += 1;
2955            }
2956            let mut failures_remaining = self.failures_remaining.lock().await;
2957            if *failures_remaining > 0 {
2958                *failures_remaining -= 1;
2959                return Err(anyhow::anyhow!("transient compensation failure"));
2960            }
2961            Ok(())
2962        }
2963    }
2964
2965    #[derive(Clone)]
2966    struct FailingCompensationIdempotencyStore {
2967        read_calls: Arc<Mutex<u32>>,
2968        write_calls: Arc<Mutex<u32>>,
2969    }
2970
2971    #[async_trait]
2972    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
2973        async fn was_compensated(&self, _key: &str) -> Result<bool> {
2974            let mut read_calls = self.read_calls.lock().await;
2975            *read_calls += 1;
2976            Err(anyhow::anyhow!("forced idempotency read failure"))
2977        }
2978
2979        async fn mark_compensated(&self, _key: &str) -> Result<()> {
2980            let mut write_calls = self.write_calls.lock().await;
2981            *write_calls += 1;
2982            Err(anyhow::anyhow!("forced idempotency write failure"))
2983        }
2984    }
2985
2986    #[tokio::test]
2987    async fn execute_persists_success_trace_when_handle_exists() {
2988        let store_impl = Arc::new(InMemoryPersistenceStore::new());
2989        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2990
2991        let mut bus = Bus::new();
2992        bus.insert(PersistenceHandle::from_arc(store_dyn));
2993        bus.insert(PersistenceTraceId::new("trace-success"));
2994
2995        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
2996        let outcome = axon.execute(41, &(), &mut bus).await;
2997        assert!(matches!(outcome, Outcome::Next(42)));
2998
2999        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3000        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3001        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3002        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
3003        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
3004        assert_eq!(persisted.completion, Some(CompletionState::Success));
3005    }
3006
3007    #[tokio::test]
3008    async fn execute_persists_fault_completion_state() {
3009        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3010        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3011
3012        let mut bus = Bus::new();
3013        bus.insert(PersistenceHandle::from_arc(store_dyn));
3014        bus.insert(PersistenceTraceId::new("trace-fault"));
3015
3016        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3017        let outcome = axon.execute(41, &(), &mut bus).await;
3018        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3019
3020        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3021        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3022        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3023        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3024        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3025    }
3026
3027    #[tokio::test]
3028    async fn execute_respects_persistence_auto_complete_off() {
3029        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3030        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3031
3032        let mut bus = Bus::new();
3033        bus.insert(PersistenceHandle::from_arc(store_dyn));
3034        bus.insert(PersistenceTraceId::new("trace-no-complete"));
3035        bus.insert(PersistenceAutoComplete(false));
3036
3037        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3038        let outcome = axon.execute(1, &(), &mut bus).await;
3039        assert!(matches!(outcome, Outcome::Next(2)));
3040
3041        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3042        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
3043        assert_eq!(persisted.completion, None);
3044    }
3045
3046    #[tokio::test]
3047    async fn fault_triggers_compensation_and_marks_compensated() {
3048        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3049        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3050        let calls = Arc::new(Mutex::new(Vec::new()));
3051        let compensation = RecordingCompensationHook {
3052            calls: calls.clone(),
3053            should_fail: false,
3054        };
3055
3056        let mut bus = Bus::new();
3057        bus.insert(PersistenceHandle::from_arc(store_dyn));
3058        bus.insert(PersistenceTraceId::new("trace-compensated"));
3059        bus.insert(CompensationHandle::from_hook(compensation));
3060
3061        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3062        let outcome = axon.execute(7, &(), &mut bus).await;
3063        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3064
3065        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3066        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
3067        assert_eq!(persisted.events[0].outcome_kind, "Enter");
3068        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
3069        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3070        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3071        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3072
3073        let recorded = calls.lock().await;
3074        assert_eq!(recorded.len(), 1);
3075        assert_eq!(recorded[0].trace_id, "trace-compensated");
3076        assert_eq!(recorded[0].fault_kind, "Fault");
3077    }
3078
3079    #[tokio::test]
3080    async fn failed_compensation_keeps_fault_completion() {
3081        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3082        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3083        let calls = Arc::new(Mutex::new(Vec::new()));
3084        let compensation = RecordingCompensationHook {
3085            calls: calls.clone(),
3086            should_fail: true,
3087        };
3088
3089        let mut bus = Bus::new();
3090        bus.insert(PersistenceHandle::from_arc(store_dyn));
3091        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3092        bus.insert(CompensationHandle::from_hook(compensation));
3093
3094        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3095        let outcome = axon.execute(7, &(), &mut bus).await;
3096        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3097
3098        let persisted = store_impl
3099            .load("trace-compensation-failed")
3100            .await
3101            .unwrap()
3102            .unwrap();
3103        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
3104        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
3105        assert_eq!(persisted.completion, Some(CompletionState::Fault));
3106
3107        let recorded = calls.lock().await;
3108        assert_eq!(recorded.len(), 1);
3109    }
3110
3111    #[tokio::test]
3112    async fn compensation_retry_policy_succeeds_after_retries() {
3113        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3114        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3115        let calls = Arc::new(Mutex::new(0u32));
3116        let failures_remaining = Arc::new(Mutex::new(2u32));
3117        let compensation = FlakyCompensationHook {
3118            calls: calls.clone(),
3119            failures_remaining,
3120        };
3121
3122        let mut bus = Bus::new();
3123        bus.insert(PersistenceHandle::from_arc(store_dyn));
3124        bus.insert(PersistenceTraceId::new("trace-retry-success"));
3125        bus.insert(CompensationHandle::from_hook(compensation));
3126        bus.insert(CompensationRetryPolicy {
3127            max_attempts: 3,
3128            backoff_ms: 0,
3129        });
3130
3131        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3132        let outcome = axon.execute(7, &(), &mut bus).await;
3133        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3134
3135        let persisted = store_impl
3136            .load("trace-retry-success")
3137            .await
3138            .unwrap()
3139            .unwrap();
3140        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3141        assert_eq!(
3142            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3143            Some("Compensated")
3144        );
3145
3146        let attempt_count = calls.lock().await;
3147        assert_eq!(*attempt_count, 3);
3148    }
3149
3150    #[tokio::test]
3151    async fn compensation_idempotency_skips_duplicate_hook_execution() {
3152        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3153        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3154        let calls = Arc::new(Mutex::new(Vec::new()));
3155        let compensation = RecordingCompensationHook {
3156            calls: calls.clone(),
3157            should_fail: false,
3158        };
3159        let idempotency = InMemoryCompensationIdempotencyStore::new();
3160
3161        let mut bus = Bus::new();
3162        bus.insert(PersistenceHandle::from_arc(store_dyn));
3163        bus.insert(PersistenceTraceId::new("trace-idempotent"));
3164        bus.insert(PersistenceAutoComplete(false));
3165        bus.insert(CompensationHandle::from_hook(compensation));
3166        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3167
3168        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3169
3170        let outcome1 = axon.execute(7, &(), &mut bus).await;
3171        let outcome2 = axon.execute(8, &(), &mut bus).await;
3172        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3173        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3174
3175        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3176        assert_eq!(persisted.completion, None);
3177        // Verify that "Compensated" events are present for both executions
3178        let compensated_count = persisted
3179            .events
3180            .iter()
3181            .filter(|e| e.outcome_kind == "Compensated")
3182            .count();
3183        assert_eq!(
3184            compensated_count, 2,
3185            "Should have 2 Compensated events (one per execution)"
3186        );
3187
3188        let recorded = calls.lock().await;
3189        assert_eq!(recorded.len(), 1);
3190    }
3191
3192    #[tokio::test]
3193    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3194        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3195        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3196        let calls = Arc::new(Mutex::new(Vec::new()));
3197        let read_calls = Arc::new(Mutex::new(0u32));
3198        let write_calls = Arc::new(Mutex::new(0u32));
3199        let compensation = RecordingCompensationHook {
3200            calls: calls.clone(),
3201            should_fail: false,
3202        };
3203        let idempotency = FailingCompensationIdempotencyStore {
3204            read_calls: read_calls.clone(),
3205            write_calls: write_calls.clone(),
3206        };
3207
3208        let mut bus = Bus::new();
3209        bus.insert(PersistenceHandle::from_arc(store_dyn));
3210        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3211        bus.insert(CompensationHandle::from_hook(compensation));
3212        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3213
3214        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3215        let outcome = axon.execute(9, &(), &mut bus).await;
3216        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3217
3218        let persisted = store_impl
3219            .load("trace-idempotency-store-failure")
3220            .await
3221            .unwrap()
3222            .unwrap();
3223        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3224        assert_eq!(
3225            persisted.events.last().map(|e| e.outcome_kind.as_str()),
3226            Some("Compensated")
3227        );
3228
3229        let recorded = calls.lock().await;
3230        assert_eq!(recorded.len(), 1);
3231        assert_eq!(*read_calls.lock().await, 1);
3232        assert_eq!(*write_calls.lock().await, 1);
3233    }
3234
3235    #[tokio::test]
3236    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3237        let mut bus = Bus::new();
3238        bus.insert(1_i32);
3239        bus.insert("secret".to_string());
3240
3241        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3242        let outcome = axon.execute((), &(), &mut bus).await;
3243
3244        match outcome {
3245            Outcome::Fault(msg) => {
3246                assert!(msg.contains("Bus access denied"), "{msg}");
3247                assert!(msg.contains("CapabilityGuarded"), "{msg}");
3248                assert!(msg.contains("alloc::string::String"), "{msg}");
3249            }
3250            other => panic!("expected fault, got {other:?}"),
3251        }
3252    }
3253
3254    #[tokio::test]
3255    async fn execute_fails_on_version_mismatch_without_migration() {
3256        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3257        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3258
3259        let trace_id = "v-mismatch";
3260        // Create an existing trace with an older version
3261        let old_envelope = crate::persistence::PersistenceEnvelope {
3262            trace_id: trace_id.to_string(),
3263            circuit: "TestCircuit".to_string(),
3264            schematic_version: "0.9".to_string(),
3265            step: 0,
3266            node_id: None,
3267            outcome_kind: "Enter".to_string(),
3268            timestamp_ms: 0,
3269            payload_hash: None,
3270            payload: None,
3271        };
3272        store_impl.append(old_envelope).await.unwrap();
3273
3274        let mut bus = Bus::new();
3275        bus.insert(PersistenceHandle::from_arc(store_dyn));
3276        bus.insert(PersistenceTraceId::new(trace_id));
3277
3278        // Current axon is version 1.0
3279        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3280        let outcome = axon.execute(10, &(), &mut bus).await;
3281
3282        if let Outcome::Emit(kind, _) = outcome {
3283            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3284        } else {
3285            panic!("Expected version mismatch emission, got {:?}", outcome);
3286        }
3287    }
3288
3289    #[tokio::test]
3290    async fn execute_resumes_from_start_on_migration_strategy() {
3291        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3292        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3293
3294        let trace_id = "v-migration";
3295        // Create an existing trace with an older version at step 5
3296        let old_envelope = crate::persistence::PersistenceEnvelope {
3297            trace_id: trace_id.to_string(),
3298            circuit: "TestCircuit".to_string(),
3299            schematic_version: "0.9".to_string(),
3300            step: 5,
3301            node_id: None,
3302            outcome_kind: "Next".to_string(),
3303            timestamp_ms: 0,
3304            payload_hash: None,
3305            payload: None,
3306        };
3307        store_impl.append(old_envelope).await.unwrap();
3308
3309        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3310        registry.register(ranvier_core::schematic::SnapshotMigration {
3311            name: Some("v0.9 to v1.0".to_string()),
3312            from_version: "0.9".to_string(),
3313            to_version: "1.0".to_string(),
3314            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3315            node_mapping: std::collections::HashMap::new(),
3316            payload_mapper: None,
3317        });
3318
3319        let mut bus = Bus::new();
3320        bus.insert(PersistenceHandle::from_arc(store_dyn));
3321        bus.insert(PersistenceTraceId::new(trace_id));
3322        bus.insert(registry);
3323
3324        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3325        let outcome = axon.execute(10, &(), &mut bus).await;
3326
3327        // Should have resumed from start (step 0), resulting in 11
3328        assert!(matches!(outcome, Outcome::Next(11)));
3329
3330        // Verify new event has current version
3331        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3332        assert_eq!(persisted.schematic_version, "1.0");
3333    }
3334
3335    #[tokio::test]
3336    async fn execute_applies_manual_intervention_jump_and_payload() {
3337        let store_impl = Arc::new(InMemoryPersistenceStore::new());
3338        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3339
3340        let trace_id = "intervention-test";
3341        // 1. Run a normal trace part-way
3342        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3343            .then(AddOne)
3344            .then(AddOne);
3345
3346        let mut bus = Bus::new();
3347        bus.insert(PersistenceHandle::from_arc(store_dyn));
3348        bus.insert(PersistenceTraceId::new(trace_id));
3349
3350        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
3351        // with a payload override of 100.
3352        // The first node is 'AddOne', the second is ALSO 'AddOne'.
3353        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
3354        let _target_node_label = "AddOne";
3355        // To be precise, let's find the ID of the second node.
3356        let target_node_id = axon.schematic.nodes[2].id.clone();
3357
3358        // Pre-seed an initial trace entry so save_intervention doesn't fail
3359        store_impl
3360            .append(crate::persistence::PersistenceEnvelope {
3361                trace_id: trace_id.to_string(),
3362                circuit: "TestCircuit".to_string(),
3363                schematic_version: "1.0".to_string(),
3364                step: 0,
3365                node_id: None,
3366                outcome_kind: "Enter".to_string(),
3367                timestamp_ms: 0,
3368                payload_hash: None,
3369                payload: None,
3370            })
3371            .await
3372            .unwrap();
3373
3374        store_impl
3375            .save_intervention(
3376                trace_id,
3377                crate::persistence::Intervention {
3378                    target_node: target_node_id.clone(),
3379                    payload_override: Some(serde_json::json!(100)),
3380                    timestamp_ms: 0,
3381                },
3382            )
3383            .await
3384            .unwrap();
3385
3386        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
3387        // Result should be 100 + 1 = 101.
3388        let outcome = axon.execute(10, &(), &mut bus).await;
3389
3390        match outcome {
3391            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3392            other => panic!("Expected Outcome::Next(101), got {:?}", other),
3393        }
3394
3395        // Verify the jump was logged in trace
3396        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3397        // The last event should be from the jump target's execution.
3398        assert_eq!(persisted.interventions.len(), 1);
3399        assert_eq!(persisted.interventions[0].target_node, target_node_id);
3400    }
3401
3402    // ── DLQ Retry Tests ──────────────────────────────────────────────
3403
3404    /// A transition that fails a configurable number of times before succeeding.
3405    #[derive(Clone)]
3406    struct FailNThenSucceed {
3407        remaining: Arc<tokio::sync::Mutex<u32>>,
3408    }
3409
3410    #[async_trait]
3411    impl Transition<i32, i32> for FailNThenSucceed {
3412        type Error = String;
3413        type Resources = ();
3414
3415        async fn run(
3416            &self,
3417            state: i32,
3418            _resources: &Self::Resources,
3419            _bus: &mut Bus,
3420        ) -> Outcome<i32, Self::Error> {
3421            let mut rem = self.remaining.lock().await;
3422            if *rem > 0 {
3423                *rem -= 1;
3424                Outcome::Fault("transient failure".to_string())
3425            } else {
3426                Outcome::Next(state + 1)
3427            }
3428        }
3429    }
3430
3431    /// A mock DLQ sink that records all dead letters.
3432    #[derive(Clone)]
3433    struct MockDlqSink {
3434        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3435    }
3436
3437    #[async_trait]
3438    impl DlqSink for MockDlqSink {
3439        async fn store_dead_letter(
3440            &self,
3441            workflow_id: &str,
3442            _circuit_label: &str,
3443            node_id: &str,
3444            error_msg: &str,
3445            _payload: &[u8],
3446        ) -> Result<(), String> {
3447            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
3448            self.letters.lock().await.push(entry);
3449            Ok(())
3450        }
3451    }
3452
3453    #[tokio::test]
3454    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
3455        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
3456        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
3457        let trans = FailNThenSucceed { remaining };
3458
3459        let dlq_sink = MockDlqSink {
3460            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3461        };
3462
3463        let mut bus = Bus::new();
3464        bus.insert(Timeline::new());
3465
3466        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
3467            .then(trans)
3468            .with_dlq_policy(DlqPolicy::RetryThenDlq {
3469                max_attempts: 5,
3470                backoff_ms: 1,
3471            })
3472            .with_dlq_sink(dlq_sink.clone());
3473        let outcome = axon.execute(10, &(), &mut bus).await;
3474
3475        // Should succeed (10 + 1 = 11)
3476        assert!(
3477            matches!(outcome, Outcome::Next(11)),
3478            "Expected Next(11), got {:?}",
3479            outcome
3480        );
3481
3482        // No dead letters since it recovered
3483        let letters = dlq_sink.letters.lock().await;
3484        assert!(
3485            letters.is_empty(),
3486            "Should have 0 dead letters, got {}",
3487            letters.len()
3488        );
3489
3490        // Timeline should contain NodeRetry events
3491        let timeline = bus.read::<Timeline>().unwrap();
3492        let retry_count = timeline
3493            .events
3494            .iter()
3495            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3496            .count();
3497        assert_eq!(retry_count, 2, "Should have 2 retry events");
3498    }
3499
3500    #[tokio::test]
3501    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
3502        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
3503        let mut bus = Bus::new();
3504        bus.insert(Timeline::new());
3505
3506        let dlq_sink = MockDlqSink {
3507            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3508        };
3509
3510        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
3511            .then(AlwaysFault)
3512            .with_dlq_policy(DlqPolicy::RetryThenDlq {
3513                max_attempts: 3,
3514                backoff_ms: 1,
3515            })
3516            .with_dlq_sink(dlq_sink.clone());
3517        let outcome = axon.execute(42, &(), &mut bus).await;
3518
3519        assert!(
3520            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
3521            "Expected Fault(boom), got {:?}",
3522            outcome
3523        );
3524
3525        // Should have exactly 1 dead letter
3526        let letters = dlq_sink.letters.lock().await;
3527        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
3528
3529        // Timeline should have 2 retry events and 1 DlqExhausted event
3530        let timeline = bus.read::<Timeline>().unwrap();
3531        let retry_count = timeline
3532            .events
3533            .iter()
3534            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3535            .count();
3536        let dlq_count = timeline
3537            .events
3538            .iter()
3539            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
3540            .count();
3541        assert_eq!(
3542            retry_count, 2,
3543            "Should have 2 retry events (attempts 2 and 3)"
3544        );
3545        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
3546    }
3547
3548    #[tokio::test]
3549    async fn send_to_dlq_policy_sends_immediately_without_retry() {
3550        let mut bus = Bus::new();
3551        bus.insert(Timeline::new());
3552
3553        let dlq_sink = MockDlqSink {
3554            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3555        };
3556
3557        let axon = Axon::<i32, i32, String>::start("SendDlq")
3558            .then(AlwaysFault)
3559            .with_dlq_policy(DlqPolicy::SendToDlq)
3560            .with_dlq_sink(dlq_sink.clone());
3561        let outcome = axon.execute(1, &(), &mut bus).await;
3562
3563        assert!(matches!(outcome, Outcome::Fault(_)));
3564
3565        // Should have exactly 1 dead letter (immediate, no retries)
3566        let letters = dlq_sink.letters.lock().await;
3567        assert_eq!(letters.len(), 1);
3568
3569        // No retry or DlqExhausted events
3570        let timeline = bus.read::<Timeline>().unwrap();
3571        let retry_count = timeline
3572            .events
3573            .iter()
3574            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3575            .count();
3576        assert_eq!(retry_count, 0);
3577    }
3578
3579    #[tokio::test]
3580    async fn drop_policy_does_not_send_to_dlq() {
3581        let mut bus = Bus::new();
3582
3583        let dlq_sink = MockDlqSink {
3584            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3585        };
3586
3587        let axon = Axon::<i32, i32, String>::start("DropDlq")
3588            .then(AlwaysFault)
3589            .with_dlq_policy(DlqPolicy::Drop)
3590            .with_dlq_sink(dlq_sink.clone());
3591        let outcome = axon.execute(1, &(), &mut bus).await;
3592
3593        assert!(matches!(outcome, Outcome::Fault(_)));
3594
3595        // No dead letters
3596        let letters = dlq_sink.letters.lock().await;
3597        assert!(letters.is_empty());
3598    }
3599
3600    #[tokio::test]
3601    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
3602        use ranvier_core::policy::DynamicPolicy;
3603
3604        // Start with Drop policy (no DLQ)
3605        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
3606        let dlq_sink = MockDlqSink {
3607            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3608        };
3609
3610        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
3611            .then(AlwaysFault)
3612            .with_dynamic_dlq_policy(dynamic)
3613            .with_dlq_sink(dlq_sink.clone());
3614
3615        // First execution: Drop policy → no dead letters
3616        let mut bus = Bus::new();
3617        let outcome = axon.execute(1, &(), &mut bus).await;
3618        assert!(matches!(outcome, Outcome::Fault(_)));
3619        assert!(
3620            dlq_sink.letters.lock().await.is_empty(),
3621            "Drop policy should produce no DLQ entries"
3622        );
3623
3624        // Hot-reload: switch to SendToDlq
3625        tx.send(DlqPolicy::SendToDlq).unwrap();
3626
3627        // Second execution: SendToDlq policy → dead letter captured
3628        let mut bus2 = Bus::new();
3629        let outcome2 = axon.execute(2, &(), &mut bus2).await;
3630        assert!(matches!(outcome2, Outcome::Fault(_)));
3631        assert_eq!(
3632            dlq_sink.letters.lock().await.len(),
3633            1,
3634            "SendToDlq policy should produce 1 DLQ entry"
3635        );
3636    }
3637
3638    #[tokio::test]
3639    async fn dynamic_saga_policy_hot_reload() {
3640        use ranvier_core::policy::DynamicPolicy;
3641        use ranvier_core::saga::SagaPolicy;
3642
3643        // Start with Disabled saga
3644        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
3645
3646        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
3647            .then(AddOne)
3648            .with_dynamic_saga_policy(dynamic);
3649
3650        // First execution: Disabled → no SagaStack in bus
3651        let mut bus = Bus::new();
3652        let _outcome = axon.execute(1, &(), &mut bus).await;
3653        assert!(
3654            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
3655            "SagaStack should be absent or empty when disabled"
3656        );
3657
3658        // Hot-reload: enable saga
3659        tx.send(SagaPolicy::Enabled).unwrap();
3660
3661        // Second execution: Enabled → SagaStack populated
3662        let mut bus2 = Bus::new();
3663        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
3664        assert!(
3665            bus2.read::<SagaStack>().is_some(),
3666            "SagaStack should exist when saga is enabled"
3667        );
3668    }
3669
3670    // ── IAM Boundary Tests ──────────────────────────────────────
3671
3672    mod iam_tests {
3673        use super::*;
3674        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
3675
3676        /// Mock IamVerifier that returns a fixed identity.
3677        #[derive(Clone)]
3678        struct MockVerifier {
3679            identity: IamIdentity,
3680            should_fail: bool,
3681        }
3682
3683        #[async_trait]
3684        impl IamVerifier for MockVerifier {
3685            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
3686                if self.should_fail {
3687                    Err(IamError::InvalidToken("mock verification failure".into()))
3688                } else {
3689                    Ok(self.identity.clone())
3690                }
3691            }
3692        }
3693
3694        #[tokio::test]
3695        async fn iam_require_identity_passes_with_valid_token() {
3696            let verifier = MockVerifier {
3697                identity: IamIdentity::new("alice").with_role("user"),
3698                should_fail: false,
3699            };
3700
3701            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
3702                .with_iam(IamPolicy::RequireIdentity, verifier)
3703                .then(AddOne);
3704
3705            let mut bus = Bus::new();
3706            bus.insert(IamToken("valid-token".to_string()));
3707            let outcome = axon.execute(10, &(), &mut bus).await;
3708
3709            assert!(matches!(outcome, Outcome::Next(11)));
3710            // Verify IamIdentity was injected into Bus
3711            let identity = bus
3712                .read::<IamIdentity>()
3713                .expect("IamIdentity should be in Bus");
3714            assert_eq!(identity.subject, "alice");
3715        }
3716
3717        #[tokio::test]
3718        async fn iam_require_identity_rejects_missing_token() {
3719            let verifier = MockVerifier {
3720                identity: IamIdentity::new("ignored"),
3721                should_fail: false,
3722            };
3723
3724            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
3725                .with_iam(IamPolicy::RequireIdentity, verifier)
3726                .then(AddOne);
3727
3728            let mut bus = Bus::new();
3729            // No IamToken inserted
3730            let outcome = axon.execute(10, &(), &mut bus).await;
3731
3732            // Should emit missing_token event
3733            match &outcome {
3734                Outcome::Emit(label, _) => {
3735                    assert_eq!(label, "iam.missing_token");
3736                }
3737                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
3738            }
3739        }
3740
3741        #[tokio::test]
3742        async fn iam_rejects_failed_verification() {
3743            let verifier = MockVerifier {
3744                identity: IamIdentity::new("ignored"),
3745                should_fail: true,
3746            };
3747
3748            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
3749                .with_iam(IamPolicy::RequireIdentity, verifier)
3750                .then(AddOne);
3751
3752            let mut bus = Bus::new();
3753            bus.insert(IamToken("bad-token".to_string()));
3754            let outcome = axon.execute(10, &(), &mut bus).await;
3755
3756            match &outcome {
3757                Outcome::Emit(label, _) => {
3758                    assert_eq!(label, "iam.verification_failed");
3759                }
3760                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
3761            }
3762        }
3763
3764        #[tokio::test]
3765        async fn iam_require_role_passes_with_matching_role() {
3766            let verifier = MockVerifier {
3767                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
3768                should_fail: false,
3769            };
3770
3771            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
3772                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3773                .then(AddOne);
3774
3775            let mut bus = Bus::new();
3776            bus.insert(IamToken("token".to_string()));
3777            let outcome = axon.execute(5, &(), &mut bus).await;
3778
3779            assert!(matches!(outcome, Outcome::Next(6)));
3780        }
3781
3782        #[tokio::test]
3783        async fn iam_require_role_denies_without_role() {
3784            let verifier = MockVerifier {
3785                identity: IamIdentity::new("carol").with_role("user"),
3786                should_fail: false,
3787            };
3788
3789            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
3790                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3791                .then(AddOne);
3792
3793            let mut bus = Bus::new();
3794            bus.insert(IamToken("token".to_string()));
3795            let outcome = axon.execute(5, &(), &mut bus).await;
3796
3797            match &outcome {
3798                Outcome::Emit(label, _) => {
3799                    assert_eq!(label, "iam.policy_denied");
3800                }
3801                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
3802            }
3803        }
3804
3805        #[tokio::test]
3806        async fn iam_policy_none_skips_verification() {
3807            let verifier = MockVerifier {
3808                identity: IamIdentity::new("ignored"),
3809                should_fail: true, // would fail if actually called
3810            };
3811
3812            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
3813                .with_iam(IamPolicy::None, verifier)
3814                .then(AddOne);
3815
3816            let mut bus = Bus::new();
3817            // No token needed when policy is None
3818            let outcome = axon.execute(10, &(), &mut bus).await;
3819
3820            assert!(matches!(outcome, Outcome::Next(11)));
3821        }
3822    }
3823
3824    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
3825
3826    #[derive(Clone)]
3827    struct SchemaTransition;
3828
3829    #[async_trait]
3830    impl Transition<String, String> for SchemaTransition {
3831        type Error = String;
3832        type Resources = ();
3833
3834        fn input_schema(&self) -> Option<serde_json::Value> {
3835            Some(serde_json::json!({
3836                "type": "object",
3837                "required": ["name"],
3838                "properties": {
3839                    "name": { "type": "string" }
3840                }
3841            }))
3842        }
3843
3844        async fn run(
3845            &self,
3846            state: String,
3847            _resources: &Self::Resources,
3848            _bus: &mut Bus,
3849        ) -> Outcome<String, Self::Error> {
3850            Outcome::Next(state)
3851        }
3852    }
3853
3854    #[test]
3855    fn then_auto_populates_input_schema_from_transition() {
3856        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
3857
3858        // The last node (added by .then()) should have input_schema
3859        let last_node = axon.schematic.nodes.last().unwrap();
3860        assert!(last_node.input_schema.is_some());
3861        let schema = last_node.input_schema.as_ref().unwrap();
3862        assert_eq!(schema["type"], "object");
3863        assert_eq!(schema["required"][0], "name");
3864    }
3865
3866    #[test]
3867    fn then_leaves_input_schema_none_when_not_provided() {
3868        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
3869
3870        let last_node = axon.schematic.nodes.last().unwrap();
3871        assert!(last_node.input_schema.is_none());
3872    }
3873
3874    #[test]
3875    fn with_input_schema_value_sets_on_last_node() {
3876        let schema = serde_json::json!({"type": "integer"});
3877        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
3878            .then(AddOne)
3879            .with_input_schema_value(schema.clone());
3880
3881        let last_node = axon.schematic.nodes.last().unwrap();
3882        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
3883    }
3884
3885    #[test]
3886    fn with_output_schema_value_sets_on_last_node() {
3887        let schema = serde_json::json!({"type": "integer"});
3888        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
3889            .then(AddOne)
3890            .with_output_schema_value(schema.clone());
3891
3892        let last_node = axon.schematic.nodes.last().unwrap();
3893        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
3894    }
3895
3896    #[test]
3897    fn schematic_export_includes_schema_fields() {
3898        let axon = Axon::<String, String, String>::new("ExportTest")
3899            .then(SchemaTransition)
3900            .with_output_schema_value(serde_json::json!({"type": "string"}));
3901
3902        let json = serde_json::to_value(&axon.schematic).unwrap();
3903        let nodes = json["nodes"].as_array().unwrap();
3904        // Find the SchemaTransition node (last one)
3905        let last = nodes.last().unwrap();
3906        assert!(last.get("input_schema").is_some());
3907        assert_eq!(last["input_schema"]["type"], "object");
3908        assert_eq!(last["output_schema"]["type"], "string");
3909    }
3910
3911    #[test]
3912    fn schematic_export_omits_schema_fields_when_none() {
3913        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
3914
3915        let json = serde_json::to_value(&axon.schematic).unwrap();
3916        let nodes = json["nodes"].as_array().unwrap();
3917        let last = nodes.last().unwrap();
3918        let obj = last.as_object().unwrap();
3919        assert!(!obj.contains_key("input_schema"));
3920        assert!(!obj.contains_key("output_schema"));
3921    }
3922
3923    #[test]
3924    fn schematic_json_roundtrip_preserves_schemas() {
3925        let axon = Axon::<String, String, String>::new("Roundtrip")
3926            .then(SchemaTransition)
3927            .with_output_schema_value(serde_json::json!({"type": "string"}));
3928
3929        let json_str = serde_json::to_string(&axon.schematic).unwrap();
3930        let deserialized: ranvier_core::schematic::Schematic =
3931            serde_json::from_str(&json_str).unwrap();
3932
3933        let last = deserialized.nodes.last().unwrap();
3934        assert!(last.input_schema.is_some());
3935        assert!(last.output_schema.is_some());
3936        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
3937        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
3938    }
3939}