Skip to main content

ranvier_runtime/axon/
mod.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
//! It functions as a reusable Typed Decision flow (`Axon<In, Out, E, Res>`, where `Res` defaults to `()`).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28    BusCapabilitySchema, Schematic,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32
33use serde::{Serialize, de::DeserializeOwned};
34use serde_json::Value;
35use std::any::type_name;
36use std::ffi::OsString;
37use std::fs;
38use std::future::Future;
39
40use std::path::{Path, PathBuf};
41use std::pin::Pin;
42use std::sync::{Arc, Mutex, OnceLock};
43use std::time::{SystemTime, UNIX_EPOCH};
44use tracing::Instrument;
45
/// Configuration for Execution Mode.
///
/// Stored on every [`Axon`] (`execution_mode` field) and cloned together with it.
#[derive(Clone)]
pub enum ExecutionMode {
    /// Normal, unpartitioned local execution.
    Local,
    /// Singleton execution, ensures only one instance runs across the entire cluster.
    Singleton {
        /// Name of the lock (presumably the key handed to `lock_provider` — confirm at the use site).
        lock_key: String,
        /// Time-to-live in milliseconds (presumably of the acquired lock — confirm at the use site).
        ttl_ms: u64,
        /// Distributed lock implementation; held behind `Arc` so cloning the mode shares it.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
58
/// Strategy for parallel step execution.
///
/// Controls how the Axon handles faults during parallel branch execution.
/// The type is `Copy`, so it can be passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelStrategy {
    /// All parallel steps must succeed; if any faults, return the first fault.
    AllMustSucceed,
    /// Continue even if some steps fail; return first successful result.
    /// If all branches fault, returns the first fault.
    AnyCanFail,
}
70
/// Type alias for async boxed futures used in Axon execution.
///
/// A pinned, heap-allocated, `Send` future yielding `T` that may borrow data
/// for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
73
/// Executor type for Axon steps.
///
/// Takes an input state `In`, a shared reference to the resource bundle `Res`,
/// and a mutable reference to the [`Bus`]; returns a boxed future resolving to
/// an `Outcome<Out, E>`. The returned future may borrow from both references
/// (they share the lifetime `'a`).
/// Must be Send + Sync to be reusable across threads and clones; it is wrapped
/// in `Arc` so clones share the same closure.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
79
/// Manual intervention jump command injected into the Bus.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Name of the node to jump to (presumably a schematic node id — confirm against the consumer).
    pub target_node: String,
    /// Optional JSON payload; presumably replaces the state handed to the
    /// target node when present — confirm against the consumer.
    pub payload_override: Option<serde_json::Value>,
}
86
/// Start step index for resumption, injected into the Bus.
///
/// Newtype over `u64`, private to this module tree.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
90
/// Persisted state for resumption, injected into the Bus.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// JSON payload to resume with; `None` when no payload is available.
    payload: Option<serde_json::Value>,
}
96
/// Helper to extract a readable type name from a type.
///
/// Strips the module path from every path segment of `std::any::type_name`,
/// keeping generic structure intact. For example
/// `core::option::Option<alloc::string::String>` becomes `Option<String>`.
/// (Splitting the whole string on `::` and taking the last piece would yield
/// `String>` for that input.)
///
/// The exact text returned by `type_name` is not guaranteed to be stable
/// across compiler versions, so the result is for display/diagnostics only.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path currently being read begins.
    let mut path_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // `::` separator: everything collected for this path so far was a
            // module prefix, so discard it and keep reading.
            chars.next();
            short.truncate(path_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation (`<`, `,`, space, `&`, `[`, ...) ends a path; the
                // next path starts right after it.
                path_start = short.len();
            }
        }
    }

    short
}
102
/// The Axon Builder and Runtime.
///
/// `Axon` represents an executable decision tree.
/// It is reusable and thread-safe.
///
/// Type parameters: `In` is the input state, `Out` the output state, `E` the
/// fault type carried by `Outcome`, and `Res` the resource bundle passed to
/// every step (defaults to `()`).
///
/// ## Example
///
/// ```rust,ignore
/// use ranvier_core::prelude::*;
/// // ...
/// // Start with an identity Axon (In -> In)
/// let axon = Axon::<(), (), _>::new("My Axon")
///     .then(StepA)
///     .then(StepB);
///
/// // Execute multiple times
/// let res1 = axon.execute((), &mut bus1).await;
/// let res2 = axon.execute((), &mut bus2).await;
/// ```
pub struct Axon<In, Out, E, Res = ()> {
    /// The static structure (for visualization/analysis)
    pub schematic: Schematic,
    /// The runtime executor (an `Arc`-wrapped closure, so clones share it)
    pub(crate) executor: Executor<In, Out, E, Res>,
    /// How this Axon is executed across the cluster
    pub execution_mode: ExecutionMode,
    /// Optional persistence store for state inspection
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Optional audit sink for tamper-evident logging of interventions
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Optional dead-letter queue sink for storing failed events
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Policy for handling event failures
    pub dlq_policy: DlqPolicy,
    /// Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static `dlq_policy`
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Policy for automated saga compensation
    pub saga_policy: SagaPolicy,
    /// Optional dynamic (hot-reloadable) Saga policy — takes precedence over static `saga_policy`
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Registry for Saga compensation handlers (shared between clones via `Arc<RwLock<..>>`)
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// Optional IAM handle for identity verification at the Schematic boundary
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
149
/// Schematic export request derived from command-line args/env.
///
/// A value of this type exists only when export was requested; the single
/// field says where the schematic should go.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Optional output file path. If omitted, schematic is written to stdout.
    pub output: Option<PathBuf>,
}
156
157impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
158    fn clone(&self) -> Self {
159        Self {
160            schematic: self.schematic.clone(),
161            executor: self.executor.clone(),
162            execution_mode: self.execution_mode.clone(),
163            persistence_store: self.persistence_store.clone(),
164            audit_sink: self.audit_sink.clone(),
165            dlq_sink: self.dlq_sink.clone(),
166            dlq_policy: self.dlq_policy.clone(),
167            dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
168            saga_policy: self.saga_policy.clone(),
169            dynamic_saga_policy: self.dynamic_saga_policy.clone(),
170            saga_compensation_registry: self.saga_compensation_registry.clone(),
171            iam_handle: self.iam_handle.clone(),
172        }
173    }
174}
175
176mod builder;
177mod executor;
178mod parallel;
179
180#[async_trait]
181impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
182where
183    In: Send + Sync + Serialize + DeserializeOwned + 'static,
184    Out: Send + Sync + Serialize + DeserializeOwned + 'static,
185    E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
186    Res: ranvier_core::transition::ResourceRequirement,
187{
188    async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
189        let store = self.persistence_store.as_ref()?;
190        let trace = store.load(trace_id).await.ok().flatten()?;
191        Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
192    }
193
194    async fn force_resume(
195        &self,
196        trace_id: &str,
197        target_node: &str,
198        payload_override: Option<Value>,
199    ) -> Result<(), String> {
200        let store = self
201            .persistence_store
202            .as_ref()
203            .ok_or("No persistence store attached to Axon")?;
204
205        let intervention = crate::persistence::Intervention {
206            target_node: target_node.to_string(),
207            payload_override,
208            timestamp_ms: now_ms(),
209        };
210
211        store
212            .save_intervention(trace_id, intervention)
213            .await
214            .map_err(|e| format!("Failed to save intervention: {}", e))?;
215
216        if let Some(sink) = self.audit_sink.as_ref() {
217            let event = AuditEvent::new(
218                uuid::Uuid::new_v4().to_string(),
219                "Inspector".to_string(),
220                "ForceResume".to_string(),
221                trace_id.to_string(),
222            )
223            .with_metadata("target_node", target_node);
224
225            let _ = sink.append(&event).await;
226        }
227
228        tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
229        Ok(())
230    }
231}
232
233fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
234    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
235    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
236    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
237
238    let mut i = 0;
239    while i < args.len() {
240        let arg = args[i].to_string_lossy();
241
242        if arg == "--schematic" {
243            enabled = true;
244            i += 1;
245            continue;
246        }
247
248        if arg == "--schematic-output" || arg == "--output" {
249            if let Some(next) = args.get(i + 1) {
250                output = Some(PathBuf::from(next));
251                i += 2;
252                continue;
253            }
254        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
255            output = Some(PathBuf::from(value));
256        } else if let Some(value) = arg.strip_prefix("--output=") {
257            output = Some(PathBuf::from(value));
258        }
259
260        i += 1;
261    }
262
263    if enabled {
264        Some(SchematicExportRequest { output })
265    } else {
266        None
267    }
268}
269
/// Returns `true` when the environment variable `key` is set to `1`, `true`,
/// `on` or `yes` (ASCII case-insensitive). Unset or non-UTF-8 values, and any
/// other text, count as `false`.
fn env_flag_is_true(key: &str) -> bool {
    let Ok(raw) = std::env::var(key) else {
        return false;
    };
    ["1", "true", "on", "yes"].contains(&raw.to_ascii_lowercase().as_str())
}
276
277fn inspector_enabled_from_env() -> bool {
278    let raw = std::env::var("RANVIER_INSPECTOR").ok();
279    inspector_enabled_from_value(raw.as_deref())
280}
281
/// Decides whether the inspector is enabled from an optional setting value.
///
/// `None` means enabled. A present value enables the inspector only when it
/// is `1`, `true`, `on` or `yes` (ASCII case-insensitive); anything else
/// disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    value.map_or(true, |raw| {
        TRUTHY.contains(&raw.to_ascii_lowercase().as_str())
    })
}
288
289fn inspector_dev_mode_from_env() -> bool {
290    let raw = std::env::var("RANVIER_MODE").ok();
291    inspector_dev_mode_from_value(raw.as_deref())
292}
293
/// Decides whether the inspector runs in dev mode from an optional mode value.
///
/// Dev mode is on unless the value is `prod` or `production`
/// (ASCII case-insensitive). `None` therefore means dev mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        Some(mode) => {
            let is_production = mode.eq_ignore_ascii_case("prod")
                || mode.eq_ignore_ascii_case("production");
            !is_production
        }
        None => true,
    }
}
300
301fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
302    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
303        Ok(v) if !v.trim().is_empty() => v,
304        _ => return,
305    };
306
307    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
308    let policy = timeline_adaptive_policy();
309    let forced = should_force_export(outcome, &policy);
310    let should_export = sampled || forced;
311    if !should_export {
312        record_sampling_stats(false, sampled, forced, "none", &policy);
313        return;
314    }
315
316    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
317    timeline.sort();
318
319    let mode = std::env::var("RANVIER_TIMELINE_MODE")
320        .unwrap_or_else(|_| "overwrite".to_string())
321        .to_ascii_lowercase();
322
323    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
324        tracing::warn!(
325            "Failed to persist timeline file {} (mode={}): {}",
326            path,
327            mode,
328            err
329        );
330        record_sampling_stats(false, sampled, forced, &mode, &policy);
331    } else {
332        record_sampling_stats(true, sampled, forced, &mode, &policy);
333    }
334}
335
/// Extracts a human-readable message from a panic payload.
///
/// Payloads holding a `&str` or a `String` yield that text; any other payload
/// type yields `"unknown panic"`.
fn extract_panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic".to_string())
}
345
346fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
347    match outcome {
348        Outcome::Next(_) => "Next".to_string(),
349        Outcome::Branch(id, _) => format!("Branch:{}", id),
350        Outcome::Jump(id, _) => format!("Jump:{}", id),
351        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
352        Outcome::Fault(_) => "Fault".to_string(),
353    }
354}
355
356fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
357    match outcome {
358        Outcome::Next(_) => "Next",
359        Outcome::Branch(_, _) => "Branch",
360        Outcome::Jump(_, _) => "Jump",
361        Outcome::Emit(_, _) => "Emit",
362        Outcome::Fault(_) => "Fault",
363    }
364}
365
366fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
367    match outcome {
368        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
369        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
370        Outcome::Emit(event_type, _) => Some(event_type.clone()),
371        Outcome::Next(_) | Outcome::Fault(_) => None,
372    }
373}
374
375fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
376    match outcome {
377        Outcome::Fault(_) => CompletionState::Fault,
378        _ => CompletionState::Success,
379    }
380}
381
382fn persistence_trace_id(bus: &Bus) -> String {
383    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
384        explicit.0.clone()
385    } else {
386        format!("{}:{}", bus.id, now_ms())
387    }
388}
389
390fn persistence_auto_complete(bus: &Bus) -> bool {
391    bus.read::<PersistenceAutoComplete>()
392        .map(|v| v.0)
393        .unwrap_or(true)
394}
395
396fn compensation_auto_trigger(bus: &Bus) -> bool {
397    bus.read::<CompensationAutoTrigger>()
398        .map(|v| v.0)
399        .unwrap_or(true)
400}
401
402fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
403    bus.read::<CompensationRetryPolicy>()
404        .copied()
405        .unwrap_or_default()
406}
407
408/// Unwrap the Outcome enum layer from a persisted event payload.
409///
410/// Events are stored with `outcome.to_json_value()` which serializes the full
411/// Outcome enum, e.g. `{"Next": {"order_id": "1001", ...}}`. The resumption
412/// handler expects the raw inner value, so we extract it here.
413fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
414    payload.map(|p| {
415        p.get("Next")
416            .or_else(|| p.get("Branch"))
417            .or_else(|| p.get("Jump"))
418            .cloned()
419            .unwrap_or_else(|| p.clone())
420    })
421}
422
/// Loads the persisted trace for `trace_id` and derives where execution
/// should continue.
///
/// Returns a tuple of, in order:
/// 1. the next step index to run,
/// 2. the trace's schematic version (if a trace exists),
/// 3. the most recent intervention recorded on the trace (if any),
/// 4. the node id of the event chosen as the resume anchor (if any),
/// 5. that event's payload with the `Outcome` wrapper removed (if any).
///
/// When no trace exists, or loading fails (a warning is logged), the result
/// is `(0, None, None, None, None)`.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // Pick the anchor event, searching newest-first among
                    // events at or before the resume step. Preference order:
                    //   1. a "Next" event with a payload,
                    //   2. any non-"Fault" event with a payload,
                    //   3. any event with a payload,
                    //   4. the very last event of the trace, regardless of step.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        .or_else(|| trace.events.last());

                    (
                        // Continue with the step after the recorded resume point.
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // No explicit resume point: continue after the last
                    // recorded event, or start at step 0 for an empty trace.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
500
/// Runs a single transition as one step of an Axon and performs all the
/// bookkeeping around it.
///
/// In order, this function:
/// 1. waits on the bus's `DebugControl` if it asks to pause at `node_id`,
/// 2. records a `NodeEnter` timeline event,
/// 3. snapshots `state` as JSON when the bus carries a `RetryThenDlq` policy
///    and/or an enabled saga policy (the state is consumed by the run),
/// 4. runs the transition under `bus_policy`, retrying faulted runs with
///    exponential backoff when `RetryThenDlq` is configured,
/// 5. on fault, logs it and inserts a `TransitionErrorContext` into the bus,
/// 6. records `NodeExit` (and `Branchtaken`) timeline events,
/// 7. pushes the saga snapshot onto the `SagaStack` for `Next` outcomes,
/// 8. persists the execution event when a `PersistenceHandle` is on the bus,
/// 9. on fault, stores a dead letter when a DLQ sink and a non-`Drop` policy
///    are on the bus.
///
/// Parameters: `trans` is the transition to run; `state` its input; `res` the
/// resource bundle; `bus` the execution bus; `node_id` / `node_label` identify
/// the node in timeline, persistence and DLQ records; `bus_policy` is the
/// access policy applied while the transition runs; `step_idx` is the step
/// index recorded in persistence and error context.
///
/// Returns the final `Outcome` (of the last attempt, if retries happened).
#[allow(clippy::too_many_arguments)]
async fn run_this_step<In, Out, E, Res>(
    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
    state: In,
    res: &Res,
    bus: &mut Bus,
    node_id: &str,
    node_label: &str,
    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
    step_idx: u64,
) -> Outcome<Out, E>
where
    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
    Res: ranvier_core::transition::ResourceRequirement,
{
    let label = trans.label();
    // Last `::`-separated piece of the resource type name, used as a span field.
    let res_type = std::any::type_name::<Res>()
        .split("::")
        .last()
        .unwrap_or("unknown");

    // Debug pausing
    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
        debug.should_pause(node_id)
    } else {
        false
    };

    if should_pause {
        let trace_id = persistence_trace_id(bus);
        tracing::event!(
            target: "ranvier.debugger",
            tracing::Level::INFO,
            trace_id = %trace_id,
            node_id = %node_id,
            "Node paused"
        );

        if let Some(timeline) = bus.read_mut::<Timeline>() {
            timeline.push(TimelineEvent::NodePaused {
                node_id: node_id.to_string(),
                timestamp: now_ms(),
            });
        }
        // Re-read the control: the earlier borrow ended before the mutable
        // timeline access above.
        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
            debug.wait().await;
        }
    }

    let enter_ts = now_ms();
    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeEnter {
            node_id: node_id.to_string(),
            node_label: node_label.to_string(),
            timestamp: enter_ts,
        });
    }

    // Check DLQ retry policy and pre-serialize state for potential retries
    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
        if let DlqPolicy::RetryThenDlq {
            max_attempts,
            backoff_ms,
        } = p
        {
            Some((*max_attempts, *backoff_ms))
        } else {
            None
        }
    });
    // `None` here (no retry policy, or the state failed to serialize) disables
    // the retry loop below.
    let retry_state_snapshot = if dlq_retry_config.is_some() {
        serde_json::to_value(&state).ok()
    } else {
        None
    };

    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
    // A serialization failure yields an empty byte vector rather than an error.
    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
        Some(serde_json::to_vec(&state).unwrap_or_default())
    } else {
        None
    };

    // Outcome fields start empty and are recorded once the final result is known.
    let node_span = tracing::info_span!(
        "Node",
        ranvier.node = %label,
        ranvier.resource_type = %res_type,
        ranvier.outcome_kind = tracing::field::Empty,
        ranvier.outcome_target = tracing::field::Empty
    );
    let started = std::time::Instant::now();
    // The access policy is set only for the duration of the transition run.
    bus.set_access_policy(label.clone(), bus_policy.clone());
    let result = trans
        .run(state, res, bus)
        .instrument(node_span.clone())
        .await;
    bus.clear_access_policy();

    // DLQ Retry loop: if first attempt faulted and RetryThenDlq is configured,
    // retry with exponential backoff before giving up.
    let result = if let Outcome::Fault(_) = &result {
        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
            (dlq_retry_config, &retry_state_snapshot)
        {
            let mut final_result = result;
            // attempt 1 already done; retry from 2..=max_attempts
            for attempt in 2..=max_attempts {
                // backoff_ms * 2^(attempt - 2), saturating instead of overflowing.
                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));

                tracing::info!(
                    ranvier.node = %label,
                    attempt = attempt,
                    max_attempts = max_attempts,
                    backoff_ms = delay,
                    "Retrying faulted node"
                );

                if let Some(timeline) = bus.read_mut::<Timeline>() {
                    timeline.push(TimelineEvent::NodeRetry {
                        node_id: node_id.to_string(),
                        attempt,
                        max_attempts,
                        backoff_ms: delay,
                        timestamp: now_ms(),
                    });
                }

                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                // If the snapshot cannot be turned back into `In`, this attempt
                // is skipped (after the sleep) and the loop moves on.
                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
                    bus.set_access_policy(label.clone(), bus_policy.clone());
                    let retry_result = trans
                        .run(retry_state, res, bus)
                        .instrument(tracing::info_span!(
                            "NodeRetry",
                            ranvier.node = %label,
                            attempt = attempt
                        ))
                        .await;
                    bus.clear_access_policy();

                    match &retry_result {
                        Outcome::Fault(_) => {
                            final_result = retry_result;
                        }
                        _ => {
                            // First non-fault outcome ends the retry loop.
                            final_result = retry_result;
                            break;
                        }
                    }
                }
            }
            final_result
        } else {
            result
        }
    } else {
        result
    };

    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
    if let Some(target) = outcome_target(&result) {
        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
    }

    // Inject TransitionErrorContext on fault
    if let Outcome::Fault(ref err) = result {
        let pipeline_name = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let ctx = ranvier_core::error::TransitionErrorContext {
            pipeline_name: pipeline_name.clone(),
            transition_name: label.clone(),
            step_index: step_idx as usize,
        };
        tracing::error!(
            pipeline = %pipeline_name,
            transition = %label,
            step = step_idx,
            error = ?err,
            "Transition fault"
        );
        bus.insert(ctx);
    }

    // Measured from before the first attempt, so retries and backoff sleeps
    // are included.
    let duration_ms = started.elapsed().as_millis() as u64;
    let exit_ts = now_ms();

    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeExit {
            node_id: node_id.to_string(),
            outcome_type: outcome_type_name(&result),
            duration_ms,
            timestamp: exit_ts,
        });

        if let Outcome::Branch(branch_id, _) = &result {
            timeline.push(TimelineEvent::Branchtaken {
                branch_id: branch_id.clone(),
                timestamp: exit_ts,
            });
        }
    }

    // Push to Saga Stack if Next outcome and snapshot taken
    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
        && let Some(stack) = bus.read_mut::<SagaStack>()
    {
        stack.push(node_id.to_string(), label.clone(), snapshot);
    }

    if let Some(handle) = bus.read::<PersistenceHandle>() {
        let trace_id = persistence_trace_id(bus);
        // Circuit name and version come from the Schematic on the bus; both
        // fall back to their default value when it is absent.
        let circuit = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let version = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.schema_version.clone())
            .unwrap_or_default();

        persist_execution_event(
            handle,
            &trace_id,
            &circuit,
            &version,
            step_idx,
            Some(node_id.to_string()),
            outcome_kind_name(&result),
            Some(result.to_json_value()),
        )
        .await;
    }

    // DLQ reporting — only fires after all retries are exhausted (RetryThenDlq)
    // or immediately (SendToDlq). Drop policy skips entirely.
    if let Outcome::Fault(f) = &result {
        // Read policy and sink, then drop the borrows before mutable timeline access
        let dlq_action = {
            let policy = bus.read::<DlqPolicy>();
            let sink = bus.read::<Arc<dyn DlqSink>>();
            match (sink, policy) {
                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
                _ => None,
            }
        };

        if let Some(sink) = dlq_action {
            // Only a RetryThenDlq policy produces a DlqExhausted event.
            if let Some((max_attempts, _)) = dlq_retry_config
                && let Some(timeline) = bus.read_mut::<Timeline>()
            {
                timeline.push(TimelineEvent::DlqExhausted {
                    node_id: node_id.to_string(),
                    total_attempts: max_attempts,
                    timestamp: now_ms(),
                });
            }

            let trace_id = persistence_trace_id(bus);
            let circuit = bus
                .read::<ranvier_core::schematic::Schematic>()
                .map(|s| s.name.clone())
                .unwrap_or_default();
            // Best-effort: a failure to store the dead letter is ignored.
            let _ = sink
                .store_dead_letter(
                    &trace_id,
                    &circuit,
                    node_id,
                    &format!("{:?}", f),
                    &serde_json::to_vec(&f).unwrap_or_default(),
                )
                .await;
        }
    }

    result
}
782
783#[allow(clippy::too_many_arguments)]
784async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
785    trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
786    comp: &Comp,
787    state: Out,
788    res: &Res,
789    bus: &mut Bus,
790    node_id: &str,
791    comp_node_id: &str,
792    node_label: &str,
793    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
794    step_idx: u64,
795) -> Outcome<Next, E>
796where
797    Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
798    Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
799    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
800    Res: ranvier_core::transition::ResourceRequirement,
801    Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
802{
803    let label = trans.label();
804
805    // Debug pausing
806    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
807        debug.should_pause(node_id)
808    } else {
809        false
810    };
811
812    if should_pause {
813        let trace_id = persistence_trace_id(bus);
814        tracing::event!(
815            target: "ranvier.debugger",
816            tracing::Level::INFO,
817            trace_id = %trace_id,
818            node_id = %node_id,
819            "Node paused (compensated)"
820        );
821
822        if let Some(timeline) = bus.read_mut::<Timeline>() {
823            timeline.push(TimelineEvent::NodePaused {
824                node_id: node_id.to_string(),
825                timestamp: now_ms(),
826            });
827        }
828        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
829            debug.wait().await;
830        }
831    }
832
833    let enter_ts = now_ms();
834    if let Some(timeline) = bus.read_mut::<Timeline>() {
835        timeline.push(TimelineEvent::NodeEnter {
836            node_id: node_id.to_string(),
837            node_label: node_label.to_string(),
838            timestamp: enter_ts,
839        });
840    }
841
842    // State capture for Saga - SERIALIZE BEFORE CONSUMPTION
843    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
844        Some(serde_json::to_vec(&state).unwrap_or_default())
845    } else {
846        None
847    };
848
849    let node_span = tracing::info_span!("Node", ranvier.node = %label);
850    bus.set_access_policy(label.clone(), bus_policy.clone());
851    let result = trans
852        .run(state.clone(), res, bus)
853        .instrument(node_span)
854        .await;
855    bus.clear_access_policy();
856
857    let duration_ms = 0; // Simplified
858    let exit_ts = now_ms();
859
860    if let Some(timeline) = bus.read_mut::<Timeline>() {
861        timeline.push(TimelineEvent::NodeExit {
862            node_id: node_id.to_string(),
863            outcome_type: outcome_type_name(&result),
864            duration_ms,
865            timestamp: exit_ts,
866        });
867    }
868
869    // Automated Compensation Trigger
870    if let Outcome::Fault(ref err) = result {
871        if compensation_auto_trigger(bus) {
872            tracing::info!(
873                ranvier.node = %label,
874                ranvier.compensation.trigger = "saga",
875                error = ?err,
876                "Saga compensation triggered"
877            );
878
879            if let Some(timeline) = bus.read_mut::<Timeline>() {
880                timeline.push(TimelineEvent::NodeEnter {
881                    node_id: comp_node_id.to_string(),
882                    node_label: format!("Compensate: {}", comp.label()),
883                    timestamp: exit_ts,
884                });
885            }
886
887            // Run compensation
888            let _ = comp.run(state, res, bus).await;
889
890            if let Some(timeline) = bus.read_mut::<Timeline>() {
891                timeline.push(TimelineEvent::NodeExit {
892                    node_id: comp_node_id.to_string(),
893                    outcome_type: "Compensated".to_string(),
894                    duration_ms: 0,
895                    timestamp: now_ms(),
896                });
897            }
898
899            if let Some(handle) = bus.read::<PersistenceHandle>() {
900                let trace_id = persistence_trace_id(bus);
901                let circuit = bus
902                    .read::<ranvier_core::schematic::Schematic>()
903                    .map(|s| s.name.clone())
904                    .unwrap_or_default();
905                let version = bus
906                    .read::<ranvier_core::schematic::Schematic>()
907                    .map(|s| s.schema_version.clone())
908                    .unwrap_or_default();
909
910                persist_execution_event(
911                    handle,
912                    &trace_id,
913                    &circuit,
914                    &version,
915                    step_idx + 1, // Compensation node index
916                    Some(comp_node_id.to_string()),
917                    "Compensated",
918                    None,
919                )
920                .await;
921            }
922        }
923    } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
924        // Push to Saga Stack if Next outcome and snapshot taken
925        if let Some(stack) = bus.read_mut::<SagaStack>() {
926            stack.push(node_id.to_string(), label.clone(), snapshot);
927        }
928
929        if let Some(handle) = bus.read::<PersistenceHandle>() {
930            let trace_id = persistence_trace_id(bus);
931            let circuit = bus
932                .read::<ranvier_core::schematic::Schematic>()
933                .map(|s| s.name.clone())
934                .unwrap_or_default();
935            let version = bus
936                .read::<ranvier_core::schematic::Schematic>()
937                .map(|s| s.schema_version.clone())
938                .unwrap_or_default();
939
940            persist_execution_event(
941                handle,
942                &trace_id,
943                &circuit,
944                &version,
945                step_idx,
946                Some(node_id.to_string()),
947                outcome_kind_name(&result),
948                Some(result.to_json_value()),
949            )
950            .await;
951        }
952    }
953
954    // DLQ reporting for compensated steps
955    if let Outcome::Fault(f) = &result
956        && let (Some(sink), Some(policy)) =
957            (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
958    {
959        let should_dlq = !matches!(policy, DlqPolicy::Drop);
960        if should_dlq {
961            let trace_id = persistence_trace_id(bus);
962            let circuit = bus
963                .read::<ranvier_core::schematic::Schematic>()
964                .map(|s| s.name.clone())
965                .unwrap_or_default();
966            let _ = sink
967                .store_dead_letter(
968                    &trace_id,
969                    &circuit,
970                    node_id,
971                    &format!("{:?}", f),
972                    &serde_json::to_vec(&f).unwrap_or_default(),
973                )
974                .await;
975        }
976    }
977
978    result
979}
980
981#[allow(clippy::too_many_arguments)]
982pub async fn persist_execution_event(
983    handle: &PersistenceHandle,
984    trace_id: &str,
985    circuit: &str,
986    schematic_version: &str,
987    step: u64,
988    node_id: Option<String>,
989    outcome_kind: &str,
990    payload: Option<serde_json::Value>,
991) {
992    let store = handle.store();
993    let envelope = PersistenceEnvelope {
994        trace_id: trace_id.to_string(),
995        circuit: circuit.to_string(),
996        schematic_version: schematic_version.to_string(),
997        step,
998        node_id,
999        outcome_kind: outcome_kind.to_string(),
1000        timestamp_ms: now_ms(),
1001        payload_hash: None,
1002        payload,
1003    };
1004
1005    if let Err(err) = store.append(envelope).await {
1006        tracing::warn!(
1007            trace_id = %trace_id,
1008            circuit = %circuit,
1009            step,
1010            outcome_kind = %outcome_kind,
1011            error = %err,
1012            "Failed to append persistence envelope"
1013        );
1014    }
1015}
1016
1017async fn persist_completion(
1018    handle: &PersistenceHandle,
1019    trace_id: &str,
1020    completion: CompletionState,
1021) {
1022    let store = handle.store();
1023    if let Err(err) = store.complete(trace_id, completion).await {
1024        tracing::warn!(
1025            trace_id = %trace_id,
1026            error = %err,
1027            "Failed to complete persistence trace"
1028        );
1029    }
1030}
1031
1032fn compensation_idempotency_key(context: &CompensationContext) -> String {
1033    format!(
1034        "{}:{}:{}",
1035        context.trace_id, context.circuit, context.fault_kind
1036    )
1037}
1038
1039async fn run_compensation(
1040    handle: &CompensationHandle,
1041    context: CompensationContext,
1042    retry_policy: CompensationRetryPolicy,
1043    idempotency: Option<CompensationIdempotencyHandle>,
1044) -> bool {
1045    let hook = handle.hook();
1046    let key = compensation_idempotency_key(&context);
1047
1048    if let Some(store_handle) = idempotency.as_ref() {
1049        let store = store_handle.store();
1050        match store.was_compensated(&key).await {
1051            Ok(true) => {
1052                tracing::info!(
1053                    trace_id = %context.trace_id,
1054                    circuit = %context.circuit,
1055                    key = %key,
1056                    "Compensation already recorded; skipping duplicate hook execution"
1057                );
1058                return true;
1059            }
1060            Ok(false) => {}
1061            Err(err) => {
1062                tracing::warn!(
1063                    trace_id = %context.trace_id,
1064                    key = %key,
1065                    error = %err,
1066                    "Failed to check compensation idempotency state"
1067                );
1068            }
1069        }
1070    }
1071
1072    let max_attempts = retry_policy.max_attempts.max(1);
1073    for attempt in 1..=max_attempts {
1074        match hook.compensate(context.clone()).await {
1075            Ok(()) => {
1076                if let Some(store_handle) = idempotency.as_ref() {
1077                    let store = store_handle.store();
1078                    if let Err(err) = store.mark_compensated(&key).await {
1079                        tracing::warn!(
1080                            trace_id = %context.trace_id,
1081                            key = %key,
1082                            error = %err,
1083                            "Failed to mark compensation idempotency state"
1084                        );
1085                    }
1086                }
1087                return true;
1088            }
1089            Err(err) => {
1090                let is_last = attempt == max_attempts;
1091                tracing::warn!(
1092                    trace_id = %context.trace_id,
1093                    circuit = %context.circuit,
1094                    fault_kind = %context.fault_kind,
1095                    fault_step = context.fault_step,
1096                    attempt,
1097                    max_attempts,
1098                    error = %err,
1099                    "Compensation hook attempt failed"
1100                );
1101                if !is_last && retry_policy.backoff_ms > 0 {
1102                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
1103                        .await;
1104                }
1105            }
1106        }
1107    }
1108    false
1109}
1110
1111fn ensure_timeline(bus: &mut Bus) -> bool {
1112    if bus.has::<Timeline>() {
1113        false
1114    } else {
1115        bus.insert(Timeline::new());
1116        true
1117    }
1118}
1119
1120fn should_attach_timeline(bus: &Bus) -> bool {
1121    // Respect explicitly provided timeline collector from caller.
1122    if bus.has::<Timeline>() {
1123        return true;
1124    }
1125
1126    // Attach timeline when runtime export path exists.
1127    has_timeline_output_path()
1128}
1129
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
1136
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped to `0.0..=1.0`. When the
/// variable is unset, unparsable, or parses to NaN, the default of `1.0` (sample
/// everything) is returned.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and `clamp` passes NaN through unchanged; a NaN
        // rate compares false against everything and would silently disable sampling.
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
1144
1145fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
1146    if rate <= 0.0 {
1147        return false;
1148    }
1149    if rate >= 1.0 {
1150        return true;
1151    }
1152    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
1153    bucket < rate
1154}
1155
/// Returns the adaptive export policy named by `RANVIER_TIMELINE_ADAPTIVE`,
/// lower-cased (ASCII). Falls back to `"fault_branch"` when the variable cannot be read.
fn timeline_adaptive_policy() -> String {
    let configured = match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(value) => value,
        Err(_) => String::from("fault_branch"),
    };
    configured.to_ascii_lowercase()
}
1161
1162fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
1163    match policy {
1164        "off" => false,
1165        "fault_only" => matches!(outcome, Outcome::Fault(_)),
1166        "fault_branch_emit" => {
1167            matches!(
1168                outcome,
1169                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
1170            )
1171        }
1172        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
1173    }
1174}
1175
/// Running counters describing timeline export decisions.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not result in an export.
    skipped: u64,
    /// Exports for which the sampling flag was set.
    sampled_exports: u64,
    /// Exports for which the forced flag was set.
    forced_exports: u64,
    /// Mode string passed with the most recent decision.
    last_mode: String,
    /// Policy string passed with the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent update, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
1187
1188static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
1189
1190fn stats_cell() -> &'static Mutex<SamplingStats> {
1191    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
1192}
1193
1194fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
1195    let snapshot = {
1196        let mut stats = match stats_cell().lock() {
1197            Ok(guard) => guard,
1198            Err(_) => return,
1199        };
1200
1201        stats.total_decisions += 1;
1202        if exported {
1203            stats.exported += 1;
1204        } else {
1205            stats.skipped += 1;
1206        }
1207        if sampled && exported {
1208            stats.sampled_exports += 1;
1209        }
1210        if forced && exported {
1211            stats.forced_exports += 1;
1212        }
1213        stats.last_mode = mode.to_string();
1214        stats.last_policy = policy.to_string();
1215        stats.last_updated_ms = now_ms();
1216        stats.clone()
1217    };
1218
1219    tracing::debug!(
1220        ranvier.timeline.total_decisions = snapshot.total_decisions,
1221        ranvier.timeline.exported = snapshot.exported,
1222        ranvier.timeline.skipped = snapshot.skipped,
1223        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
1224        ranvier.timeline.forced_exports = snapshot.forced_exports,
1225        ranvier.timeline.mode = %snapshot.last_mode,
1226        ranvier.timeline.policy = %snapshot.last_policy,
1227        "Timeline sampling stats updated"
1228    );
1229
1230    if let Some(path) = timeline_stats_output_path() {
1231        let payload = serde_json::json!({
1232            "total_decisions": snapshot.total_decisions,
1233            "exported": snapshot.exported,
1234            "skipped": snapshot.skipped,
1235            "sampled_exports": snapshot.sampled_exports,
1236            "forced_exports": snapshot.forced_exports,
1237            "last_mode": snapshot.last_mode,
1238            "last_policy": snapshot.last_policy,
1239            "last_updated_ms": snapshot.last_updated_ms
1240        });
1241        if let Some(parent) = Path::new(&path).parent() {
1242            let _ = fs::create_dir_all(parent);
1243        }
1244        if let Err(err) = fs::write(&path, payload.to_string()) {
1245            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
1246        }
1247    }
1248}
1249
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set to a
/// non-blank string, and `None` otherwise. The value is returned untrimmed.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    }
}
1255
1256fn write_timeline_with_policy(
1257    path: &str,
1258    mode: &str,
1259    mut timeline: Timeline,
1260) -> Result<(), String> {
1261    match mode {
1262        "append" => {
1263            if Path::new(path).exists() {
1264                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
1265                match serde_json::from_str::<Timeline>(&content) {
1266                    Ok(mut existing) => {
1267                        existing.events.append(&mut timeline.events);
1268                        existing.sort();
1269                        if let Some(max_events) = max_events_limit() {
1270                            truncate_timeline_events(&mut existing, max_events);
1271                        }
1272                        write_timeline_json(path, &existing)
1273                    }
1274                    Err(_) => {
1275                        // Fallback: if existing is invalid, replace with current timeline
1276                        if let Some(max_events) = max_events_limit() {
1277                            truncate_timeline_events(&mut timeline, max_events);
1278                        }
1279                        write_timeline_json(path, &timeline)
1280                    }
1281                }
1282            } else {
1283                if let Some(max_events) = max_events_limit() {
1284                    truncate_timeline_events(&mut timeline, max_events);
1285                }
1286                write_timeline_json(path, &timeline)
1287            }
1288        }
1289        "rotate" => {
1290            let rotated_path = rotated_path(path, now_ms());
1291            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
1292            if let Some(keep) = rotate_keep_limit() {
1293                cleanup_rotated_files(path, keep)?;
1294            }
1295            Ok(())
1296        }
1297        _ => write_timeline_json(path, &timeline),
1298    }
1299}
1300
1301fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
1302    if let Some(parent) = Path::new(path).parent()
1303        && !parent.as_os_str().is_empty()
1304    {
1305        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1306    }
1307    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
1308    fs::write(path, json).map_err(|e| e.to_string())
1309}
1310
/// Derives the rotated file path `<parent>/<stem>_<suffix>.<ext>` from `path`.
///
/// A missing (or non-UTF-8) stem falls back to `"timeline"` and a missing (or
/// non-UTF-8) extension falls back to `"json"`.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);

    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    let file_name = format!("{}_{}.{}", stem, suffix, ext);

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => Path::new("").join(file_name),
    }
}
1318
/// Reads the event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` when the variable is unset, is not a valid `usize`, or is zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
1325
/// Reads the number of rotated files to retain from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` when the variable is unset, is not a valid `usize`, or is zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
1332
1333fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
1334    let len = timeline.events.len();
1335    if len > max_events {
1336        let keep_from = len - max_events;
1337        timeline.events = timeline.events.split_off(keep_from);
1338    }
1339}
1340
/// Deletes all but the `keep` newest rotated siblings of `base_path`.
///
/// A rotated sibling is a file in the same directory named
/// `<stem>_<digits>.<ext>`, which is the shape `rotated_path` produces with a
/// millisecond timestamp. Files are ranked by that embedded number, newest first,
/// so the result does not depend on file-system modification times. Files that
/// merely share the prefix and extension (for example `timeline_notes.json`) are
/// never touched.
///
/// # Errors
///
/// Returns the display string of the I/O error when the directory cannot be listed.
/// Failures to delete an individual file are ignored (best-effort pruning).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and listing "" fails;
    // treat both "no parent" and "empty parent" as the current directory.
    let parent = match p.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    };
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut rotations = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            let stamp = name
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // Only a pure-digit middle part marks a file written by rotation.
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            Some((entry.path(), stamp))
        })
        .collect::<Vec<_>>();

    // Newest (largest timestamp) first; everything past `keep` is pruned.
    rotations.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in rotations.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
1373
1374fn bus_capability_schema_from_policy(
1375    policy: Option<ranvier_core::bus::BusAccessPolicy>,
1376) -> Option<BusCapabilitySchema> {
1377    let policy = policy?;
1378
1379    let mut allow = policy
1380        .allow
1381        .unwrap_or_default()
1382        .into_iter()
1383        .map(|entry| entry.type_name.to_string())
1384        .collect::<Vec<_>>();
1385    let mut deny = policy
1386        .deny
1387        .into_iter()
1388        .map(|entry| entry.type_name.to_string())
1389        .collect::<Vec<_>>();
1390    allow.sort();
1391    deny.sort();
1392
1393    if allow.is_empty() && deny.is_empty() {
1394        return None;
1395    }
1396
1397    Some(BusCapabilitySchema { allow, deny })
1398}
1399
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1406
1407#[cfg(test)]
1408mod tests {
1409    use super::{
1410        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
1411        should_force_export,
1412    };
1413    use crate::persistence::{
1414        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
1415        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
1416        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
1417        PersistenceHandle, PersistenceStore, PersistenceTraceId,
1418    };
1419    use anyhow::Result;
1420    use async_trait::async_trait;
1421    use ranvier_audit::{AuditError, AuditEvent, AuditSink};
1422    use ranvier_core::event::{DlqPolicy, DlqSink};
1423    use ranvier_core::saga::SagaStack;
1424    use ranvier_core::timeline::{Timeline, TimelineEvent};
1425    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
1426    use serde::{Deserialize, Serialize};
1427    use std::sync::Arc;
1428    use tokio::sync::Mutex;
1429    use uuid::Uuid;
1430
1431    struct MockAuditSink {
1432        events: Arc<Mutex<Vec<AuditEvent>>>,
1433    }
1434
1435    #[async_trait]
1436    impl AuditSink for MockAuditSink {
1437        async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
1438            self.events.lock().await.push(event.clone());
1439            Ok(())
1440        }
1441    }
1442
1443    #[tokio::test]
1444    async fn execute_logs_audit_events_for_intervention() {
1445        use ranvier_inspector::StateInspector;
1446
1447        let trace_id = "test-audit-trace";
1448        let store_impl = InMemoryPersistenceStore::new();
1449        let events = Arc::new(Mutex::new(Vec::new()));
1450        let sink = MockAuditSink {
1451            events: events.clone(),
1452        };
1453
1454        let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
1455            .then(AddOne)
1456            .with_persistence_store(store_impl.clone())
1457            .with_audit_sink(sink);
1458
1459        let mut bus = Bus::new();
1460        bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
1461        bus.insert(PersistenceTraceId::new(trace_id));
1462        let target_node_id = axon.schematic.nodes[0].id.clone();
1463
1464        // 0. Pre-requisite: Save an initial trace state so intervention has a target to attach to
1465        store_impl
1466            .append(crate::persistence::PersistenceEnvelope {
1467                trace_id: trace_id.to_string(),
1468                circuit: "AuditTest".to_string(),
1469                schematic_version: "v1.0".to_string(),
1470                step: 0,
1471                node_id: None,
1472                outcome_kind: "Next".to_string(),
1473                timestamp_ms: 0,
1474                payload_hash: None,
1475                payload: None,
1476            })
1477            .await
1478            .unwrap();
1479
1480        // 1. Trigger force_resume (should log ForceResume)
1481        axon.force_resume(trace_id, &target_node_id, None)
1482            .await
1483            .unwrap();
1484
1485        // 2. Execute (should log ApplyIntervention)
1486        axon.execute(10, &(), &mut bus).await;
1487
1488        let recorded = events.lock().await;
1489        assert_eq!(
1490            recorded.len(),
1491            2,
1492            "Should have 2 audit events: ForceResume and ApplyIntervention"
1493        );
1494        assert_eq!(recorded[0].action, "ForceResume");
1495        assert_eq!(recorded[0].target, trace_id);
1496        assert_eq!(recorded[1].action, "ApplyIntervention");
1497        assert_eq!(recorded[1].target, trace_id);
1498    }
1499
1500    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1501    pub enum TestInfallible {}
1502
1503    #[test]
1504    fn inspector_enabled_flag_matrix() {
1505        assert!(inspector_enabled_from_value(None));
1506        assert!(inspector_enabled_from_value(Some("1")));
1507        assert!(inspector_enabled_from_value(Some("true")));
1508        assert!(inspector_enabled_from_value(Some("on")));
1509        assert!(!inspector_enabled_from_value(Some("0")));
1510        assert!(!inspector_enabled_from_value(Some("false")));
1511    }
1512
1513    #[test]
1514    fn inspector_dev_mode_matrix() {
1515        assert!(inspector_dev_mode_from_value(None));
1516        assert!(inspector_dev_mode_from_value(Some("dev")));
1517        assert!(inspector_dev_mode_from_value(Some("staging")));
1518        assert!(!inspector_dev_mode_from_value(Some("prod")));
1519        assert!(!inspector_dev_mode_from_value(Some("production")));
1520    }
1521
1522    #[test]
1523    fn adaptive_policy_force_export_matrix() {
1524        let next = Outcome::<(), &'static str>::Next(());
1525        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
1526        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
1527        let fault = Outcome::<(), &'static str>::Fault("boom");
1528
1529        assert!(!should_force_export(&next, "off"));
1530        assert!(!should_force_export(&fault, "off"));
1531
1532        assert!(!should_force_export(&branch, "fault_only"));
1533        assert!(should_force_export(&fault, "fault_only"));
1534
1535        assert!(should_force_export(&branch, "fault_branch"));
1536        assert!(!should_force_export(&emit, "fault_branch"));
1537        assert!(should_force_export(&fault, "fault_branch"));
1538
1539        assert!(should_force_export(&branch, "fault_branch_emit"));
1540        assert!(should_force_export(&emit, "fault_branch_emit"));
1541        assert!(should_force_export(&fault, "fault_branch_emit"));
1542    }
1543
1544    #[test]
1545    fn sampling_and_adaptive_combination_decisions() {
1546        let bus_id = Uuid::nil();
1547        let next = Outcome::<(), &'static str>::Next(());
1548        let fault = Outcome::<(), &'static str>::Fault("boom");
1549
1550        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
1551        assert!(!sampled_never);
1552        assert!(!(sampled_never || should_force_export(&next, "off")));
1553        assert!(sampled_never || should_force_export(&fault, "fault_only"));
1554
1555        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
1556        assert!(sampled_always);
1557        assert!(sampled_always || should_force_export(&next, "off"));
1558        assert!(sampled_always || should_force_export(&fault, "off"));
1559    }
1560
1561    #[derive(Clone)]
1562    struct AddOne;
1563
1564    #[async_trait]
1565    impl Transition<i32, i32> for AddOne {
1566        type Error = TestInfallible;
1567        type Resources = ();
1568
1569        async fn run(
1570            &self,
1571            state: i32,
1572            _resources: &Self::Resources,
1573            _bus: &mut Bus,
1574        ) -> Outcome<i32, Self::Error> {
1575            Outcome::Next(state + 1)
1576        }
1577    }
1578
1579    #[derive(Clone)]
1580    struct AlwaysFault;
1581
1582    #[async_trait]
1583    impl Transition<i32, i32> for AlwaysFault {
1584        type Error = String;
1585        type Resources = ();
1586
1587        async fn run(
1588            &self,
1589            _state: i32,
1590            _resources: &Self::Resources,
1591            _bus: &mut Bus,
1592        ) -> Outcome<i32, Self::Error> {
1593            Outcome::Fault("boom".to_string())
1594        }
1595    }
1596
1597    #[derive(Clone)]
1598    struct CapabilityGuarded;
1599
1600    #[async_trait]
1601    impl Transition<(), ()> for CapabilityGuarded {
1602        type Error = String;
1603        type Resources = ();
1604
1605        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
1606            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
1607        }
1608
1609        async fn run(
1610            &self,
1611            _state: (),
1612            _resources: &Self::Resources,
1613            bus: &mut Bus,
1614        ) -> Outcome<(), Self::Error> {
1615            match bus.get::<String>() {
1616                Ok(_) => Outcome::Next(()),
1617                Err(err) => Outcome::Fault(err.to_string()),
1618            }
1619        }
1620    }
1621
1622    #[derive(Clone)]
1623    struct RecordingCompensationHook {
1624        calls: Arc<Mutex<Vec<CompensationContext>>>,
1625        should_fail: bool,
1626    }
1627
1628    #[async_trait]
1629    impl CompensationHook for RecordingCompensationHook {
1630        async fn compensate(&self, context: CompensationContext) -> Result<()> {
1631            self.calls.lock().await.push(context);
1632            if self.should_fail {
1633                return Err(anyhow::anyhow!("compensation failed"));
1634            }
1635            Ok(())
1636        }
1637    }
1638
1639    #[derive(Clone)]
1640    struct FlakyCompensationHook {
1641        calls: Arc<Mutex<u32>>,
1642        failures_remaining: Arc<Mutex<u32>>,
1643    }
1644
1645    #[async_trait]
1646    impl CompensationHook for FlakyCompensationHook {
1647        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
1648            {
1649                let mut calls = self.calls.lock().await;
1650                *calls += 1;
1651            }
1652            let mut failures_remaining = self.failures_remaining.lock().await;
1653            if *failures_remaining > 0 {
1654                *failures_remaining -= 1;
1655                return Err(anyhow::anyhow!("transient compensation failure"));
1656            }
1657            Ok(())
1658        }
1659    }
1660
1661    #[derive(Clone)]
1662    struct FailingCompensationIdempotencyStore {
1663        read_calls: Arc<Mutex<u32>>,
1664        write_calls: Arc<Mutex<u32>>,
1665    }
1666
1667    #[async_trait]
1668    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
1669        async fn was_compensated(&self, _key: &str) -> Result<bool> {
1670            let mut read_calls = self.read_calls.lock().await;
1671            *read_calls += 1;
1672            Err(anyhow::anyhow!("forced idempotency read failure"))
1673        }
1674
1675        async fn mark_compensated(&self, _key: &str) -> Result<()> {
1676            let mut write_calls = self.write_calls.lock().await;
1677            *write_calls += 1;
1678            Err(anyhow::anyhow!("forced idempotency write failure"))
1679        }
1680    }
1681
1682    #[tokio::test]
1683    async fn execute_persists_success_trace_when_handle_exists() {
1684        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1685        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1686
1687        let mut bus = Bus::new();
1688        bus.insert(PersistenceHandle::from_arc(store_dyn));
1689        bus.insert(PersistenceTraceId::new("trace-success"));
1690
1691        let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
1692        let outcome = axon.execute(41, &(), &mut bus).await;
1693        assert!(matches!(outcome, Outcome::Next(42)));
1694
1695        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
1696        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
1697        assert_eq!(persisted.events[0].outcome_kind, "Enter");
1698        assert_eq!(persisted.events[1].outcome_kind, "Next"); // step-level
1699        assert_eq!(persisted.events[2].outcome_kind, "Next"); // final
1700        assert_eq!(persisted.completion, Some(CompletionState::Success));
1701    }
1702
1703    #[tokio::test]
1704    async fn execute_persists_fault_completion_state() {
1705        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1706        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1707
1708        let mut bus = Bus::new();
1709        bus.insert(PersistenceHandle::from_arc(store_dyn));
1710        bus.insert(PersistenceTraceId::new("trace-fault"));
1711
1712        let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
1713        let outcome = axon.execute(41, &(), &mut bus).await;
1714        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1715
1716        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
1717        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
1718        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
1719        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
1720        assert_eq!(persisted.completion, Some(CompletionState::Fault));
1721    }
1722
1723    #[tokio::test]
1724    async fn execute_respects_persistence_auto_complete_off() {
1725        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1726        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1727
1728        let mut bus = Bus::new();
1729        bus.insert(PersistenceHandle::from_arc(store_dyn));
1730        bus.insert(PersistenceTraceId::new("trace-no-complete"));
1731        bus.insert(PersistenceAutoComplete(false));
1732
1733        let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
1734        let outcome = axon.execute(1, &(), &mut bus).await;
1735        assert!(matches!(outcome, Outcome::Next(2)));
1736
1737        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
1738        assert_eq!(persisted.events.len(), 3); // Enter + step-level Next + final Next
1739        assert_eq!(persisted.completion, None);
1740    }
1741
1742    #[tokio::test]
1743    async fn fault_triggers_compensation_and_marks_compensated() {
1744        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1745        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1746        let calls = Arc::new(Mutex::new(Vec::new()));
1747        let compensation = RecordingCompensationHook {
1748            calls: calls.clone(),
1749            should_fail: false,
1750        };
1751
1752        let mut bus = Bus::new();
1753        bus.insert(PersistenceHandle::from_arc(store_dyn));
1754        bus.insert(PersistenceTraceId::new("trace-compensated"));
1755        bus.insert(CompensationHandle::from_hook(compensation));
1756
1757        let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
1758        let outcome = axon.execute(7, &(), &mut bus).await;
1759        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1760
1761        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
1762        assert_eq!(persisted.events.len(), 4); // Enter + step-level Fault + final Fault + Compensated
1763        assert_eq!(persisted.events[0].outcome_kind, "Enter");
1764        assert_eq!(persisted.events[1].outcome_kind, "Fault"); // step-level
1765        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
1766        assert_eq!(persisted.events[3].outcome_kind, "Compensated");
1767        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1768
1769        let recorded = calls.lock().await;
1770        assert_eq!(recorded.len(), 1);
1771        assert_eq!(recorded[0].trace_id, "trace-compensated");
1772        assert_eq!(recorded[0].fault_kind, "Fault");
1773    }
1774
1775    #[tokio::test]
1776    async fn failed_compensation_keeps_fault_completion() {
1777        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1778        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1779        let calls = Arc::new(Mutex::new(Vec::new()));
1780        let compensation = RecordingCompensationHook {
1781            calls: calls.clone(),
1782            should_fail: true,
1783        };
1784
1785        let mut bus = Bus::new();
1786        bus.insert(PersistenceHandle::from_arc(store_dyn));
1787        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
1788        bus.insert(CompensationHandle::from_hook(compensation));
1789
1790        let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
1791        let outcome = axon.execute(7, &(), &mut bus).await;
1792        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1793
1794        let persisted = store_impl
1795            .load("trace-compensation-failed")
1796            .await
1797            .unwrap()
1798            .unwrap();
1799        assert_eq!(persisted.events.len(), 3); // Enter + step-level Fault + final Fault
1800        assert_eq!(persisted.events[2].outcome_kind, "Fault"); // final
1801        assert_eq!(persisted.completion, Some(CompletionState::Fault));
1802
1803        let recorded = calls.lock().await;
1804        assert_eq!(recorded.len(), 1);
1805    }
1806
1807    #[tokio::test]
1808    async fn compensation_retry_policy_succeeds_after_retries() {
1809        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1810        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1811        let calls = Arc::new(Mutex::new(0u32));
1812        let failures_remaining = Arc::new(Mutex::new(2u32));
1813        let compensation = FlakyCompensationHook {
1814            calls: calls.clone(),
1815            failures_remaining,
1816        };
1817
1818        let mut bus = Bus::new();
1819        bus.insert(PersistenceHandle::from_arc(store_dyn));
1820        bus.insert(PersistenceTraceId::new("trace-retry-success"));
1821        bus.insert(CompensationHandle::from_hook(compensation));
1822        bus.insert(CompensationRetryPolicy {
1823            max_attempts: 3,
1824            backoff_ms: 0,
1825        });
1826
1827        let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
1828        let outcome = axon.execute(7, &(), &mut bus).await;
1829        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1830
1831        let persisted = store_impl
1832            .load("trace-retry-success")
1833            .await
1834            .unwrap()
1835            .unwrap();
1836        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1837        assert_eq!(
1838            persisted.events.last().map(|e| e.outcome_kind.as_str()),
1839            Some("Compensated")
1840        );
1841
1842        let attempt_count = calls.lock().await;
1843        assert_eq!(*attempt_count, 3);
1844    }
1845
1846    #[tokio::test]
1847    async fn compensation_idempotency_skips_duplicate_hook_execution() {
1848        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1849        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1850        let calls = Arc::new(Mutex::new(Vec::new()));
1851        let compensation = RecordingCompensationHook {
1852            calls: calls.clone(),
1853            should_fail: false,
1854        };
1855        let idempotency = InMemoryCompensationIdempotencyStore::new();
1856
1857        let mut bus = Bus::new();
1858        bus.insert(PersistenceHandle::from_arc(store_dyn));
1859        bus.insert(PersistenceTraceId::new("trace-idempotent"));
1860        bus.insert(PersistenceAutoComplete(false));
1861        bus.insert(CompensationHandle::from_hook(compensation));
1862        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
1863
1864        let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
1865
1866        let outcome1 = axon.execute(7, &(), &mut bus).await;
1867        let outcome2 = axon.execute(8, &(), &mut bus).await;
1868        assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
1869        assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
1870
1871        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
1872        assert_eq!(persisted.completion, None);
1873        // Verify that "Compensated" events are present for both executions
1874        let compensated_count = persisted
1875            .events
1876            .iter()
1877            .filter(|e| e.outcome_kind == "Compensated")
1878            .count();
1879        assert_eq!(
1880            compensated_count, 2,
1881            "Should have 2 Compensated events (one per execution)"
1882        );
1883
1884        let recorded = calls.lock().await;
1885        assert_eq!(recorded.len(), 1);
1886    }
1887
1888    #[tokio::test]
1889    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
1890        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1891        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1892        let calls = Arc::new(Mutex::new(Vec::new()));
1893        let read_calls = Arc::new(Mutex::new(0u32));
1894        let write_calls = Arc::new(Mutex::new(0u32));
1895        let compensation = RecordingCompensationHook {
1896            calls: calls.clone(),
1897            should_fail: false,
1898        };
1899        let idempotency = FailingCompensationIdempotencyStore {
1900            read_calls: read_calls.clone(),
1901            write_calls: write_calls.clone(),
1902        };
1903
1904        let mut bus = Bus::new();
1905        bus.insert(PersistenceHandle::from_arc(store_dyn));
1906        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
1907        bus.insert(CompensationHandle::from_hook(compensation));
1908        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
1909
1910        let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
1911        let outcome = axon.execute(9, &(), &mut bus).await;
1912        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1913
1914        let persisted = store_impl
1915            .load("trace-idempotency-store-failure")
1916            .await
1917            .unwrap()
1918            .unwrap();
1919        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1920        assert_eq!(
1921            persisted.events.last().map(|e| e.outcome_kind.as_str()),
1922            Some("Compensated")
1923        );
1924
1925        let recorded = calls.lock().await;
1926        assert_eq!(recorded.len(), 1);
1927        assert_eq!(*read_calls.lock().await, 1);
1928        assert_eq!(*write_calls.lock().await, 1);
1929    }
1930
1931    #[tokio::test]
1932    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
1933        let mut bus = Bus::new();
1934        bus.insert(1_i32);
1935        bus.insert("secret".to_string());
1936
1937        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
1938        let outcome = axon.execute((), &(), &mut bus).await;
1939
1940        match outcome {
1941            Outcome::Fault(msg) => {
1942                assert!(msg.contains("Bus access denied"), "{msg}");
1943                assert!(msg.contains("CapabilityGuarded"), "{msg}");
1944                assert!(msg.contains("alloc::string::String"), "{msg}");
1945            }
1946            other => panic!("expected fault, got {other:?}"),
1947        }
1948    }
1949
1950    #[tokio::test]
1951    async fn execute_fails_on_version_mismatch_without_migration() {
1952        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1953        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1954
1955        let trace_id = "v-mismatch";
1956        // Create an existing trace with an older version
1957        let old_envelope = crate::persistence::PersistenceEnvelope {
1958            trace_id: trace_id.to_string(),
1959            circuit: "TestCircuit".to_string(),
1960            schematic_version: "0.9".to_string(),
1961            step: 0,
1962            node_id: None,
1963            outcome_kind: "Enter".to_string(),
1964            timestamp_ms: 0,
1965            payload_hash: None,
1966            payload: None,
1967        };
1968        store_impl.append(old_envelope).await.unwrap();
1969
1970        let mut bus = Bus::new();
1971        bus.insert(PersistenceHandle::from_arc(store_dyn));
1972        bus.insert(PersistenceTraceId::new(trace_id));
1973
1974        // Current axon is version 1.0
1975        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
1976        let outcome = axon.execute(10, &(), &mut bus).await;
1977
1978        if let Outcome::Emit(kind, _) = outcome {
1979            assert_eq!(kind, "execution.resumption.version_mismatch_failed");
1980        } else {
1981            panic!("Expected version mismatch emission, got {:?}", outcome);
1982        }
1983    }
1984
1985    #[tokio::test]
1986    async fn execute_resumes_from_start_on_migration_strategy() {
1987        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1988        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1989
1990        let trace_id = "v-migration";
1991        // Create an existing trace with an older version at step 5
1992        let old_envelope = crate::persistence::PersistenceEnvelope {
1993            trace_id: trace_id.to_string(),
1994            circuit: "TestCircuit".to_string(),
1995            schematic_version: "0.9".to_string(),
1996            step: 5,
1997            node_id: None,
1998            outcome_kind: "Next".to_string(),
1999            timestamp_ms: 0,
2000            payload_hash: None,
2001            payload: None,
2002        };
2003        store_impl.append(old_envelope).await.unwrap();
2004
2005        let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
2006        registry.register(ranvier_core::schematic::SnapshotMigration {
2007            name: Some("v0.9 to v1.0".to_string()),
2008            from_version: "0.9".to_string(),
2009            to_version: "1.0".to_string(),
2010            default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
2011            node_mapping: std::collections::HashMap::new(),
2012            payload_mapper: None,
2013        });
2014
2015        let mut bus = Bus::new();
2016        bus.insert(PersistenceHandle::from_arc(store_dyn));
2017        bus.insert(PersistenceTraceId::new(trace_id));
2018        bus.insert(registry);
2019
2020        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
2021        let outcome = axon.execute(10, &(), &mut bus).await;
2022
2023        // Should have resumed from start (step 0), resulting in 11
2024        assert!(matches!(outcome, Outcome::Next(11)));
2025
2026        // Verify new event has current version
2027        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
2028        assert_eq!(persisted.schematic_version, "1.0");
2029    }
2030
2031    #[tokio::test]
2032    async fn execute_applies_manual_intervention_jump_and_payload() {
2033        let store_impl = Arc::new(InMemoryPersistenceStore::new());
2034        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2035
2036        let trace_id = "intervention-test";
2037        // 1. Run a normal trace part-way
2038        let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
2039            .then(AddOne)
2040            .then(AddOne);
2041
2042        let mut bus = Bus::new();
2043        bus.insert(PersistenceHandle::from_arc(store_dyn));
2044        bus.insert(PersistenceTraceId::new(trace_id));
2045
2046        // Save an intervention: Jump to the second 'AddOne' node (which has the label 'AddOne')
2047        // with a payload override of 100.
2048        // The first node is 'AddOne', the second is ALSO 'AddOne'.
2049        // Schematic position: 0=Ingress, 1=AddOne, 2=AddOne
2050        let _target_node_label = "AddOne";
2051        // To be precise, let's find the ID of the second node.
2052        let target_node_id = axon.schematic.nodes[2].id.clone();
2053
2054        // Pre-seed an initial trace entry so save_intervention doesn't fail
2055        store_impl
2056            .append(crate::persistence::PersistenceEnvelope {
2057                trace_id: trace_id.to_string(),
2058                circuit: "TestCircuit".to_string(),
2059                schematic_version: "1.0".to_string(),
2060                step: 0,
2061                node_id: None,
2062                outcome_kind: "Enter".to_string(),
2063                timestamp_ms: 0,
2064                payload_hash: None,
2065                payload: None,
2066            })
2067            .await
2068            .unwrap();
2069
2070        store_impl
2071            .save_intervention(
2072                trace_id,
2073                crate::persistence::Intervention {
2074                    target_node: target_node_id.clone(),
2075                    payload_override: Some(serde_json::json!(100)),
2076                    timestamp_ms: 0,
2077                },
2078            )
2079            .await
2080            .unwrap();
2081
2082        // 2. Execute. It should skip the first AddOne and use 100 for the second AddOne.
2083        // Result should be 100 + 1 = 101.
2084        let outcome = axon.execute(10, &(), &mut bus).await;
2085
2086        match outcome {
2087            Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
2088            other => panic!("Expected Outcome::Next(101), got {:?}", other),
2089        }
2090
2091        // Verify the jump was logged in trace
2092        let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
2093        // The last event should be from the jump target's execution.
2094        assert_eq!(persisted.interventions.len(), 1);
2095        assert_eq!(persisted.interventions[0].target_node, target_node_id);
2096    }
2097
2098    // ── DLQ Retry Tests ──────────────────────────────────────────────
2099
2100    /// A transition that fails a configurable number of times before succeeding.
2101    #[derive(Clone)]
2102    struct FailNThenSucceed {
2103        remaining: Arc<tokio::sync::Mutex<u32>>,
2104    }
2105
2106    #[async_trait]
2107    impl Transition<i32, i32> for FailNThenSucceed {
2108        type Error = String;
2109        type Resources = ();
2110
2111        async fn run(
2112            &self,
2113            state: i32,
2114            _resources: &Self::Resources,
2115            _bus: &mut Bus,
2116        ) -> Outcome<i32, Self::Error> {
2117            let mut rem = self.remaining.lock().await;
2118            if *rem > 0 {
2119                *rem -= 1;
2120                Outcome::Fault("transient failure".to_string())
2121            } else {
2122                Outcome::Next(state + 1)
2123            }
2124        }
2125    }
2126
2127    /// A mock DLQ sink that records all dead letters.
2128    #[derive(Clone)]
2129    struct MockDlqSink {
2130        letters: Arc<tokio::sync::Mutex<Vec<String>>>,
2131    }
2132
2133    #[async_trait]
2134    impl DlqSink for MockDlqSink {
2135        async fn store_dead_letter(
2136            &self,
2137            workflow_id: &str,
2138            _circuit_label: &str,
2139            node_id: &str,
2140            error_msg: &str,
2141            _payload: &[u8],
2142        ) -> Result<(), String> {
2143            let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
2144            self.letters.lock().await.push(entry);
2145            Ok(())
2146        }
2147    }
2148
2149    #[tokio::test]
2150    async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
2151        // Fail 2 times, succeed on 3rd attempt. Policy allows 5 attempts.
2152        let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
2153        let trans = FailNThenSucceed { remaining };
2154
2155        let dlq_sink = MockDlqSink {
2156            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2157        };
2158
2159        let mut bus = Bus::new();
2160        bus.insert(Timeline::new());
2161
2162        let axon = Axon::<i32, i32, String>::start("RetrySucceed")
2163            .then(trans)
2164            .with_dlq_policy(DlqPolicy::RetryThenDlq {
2165                max_attempts: 5,
2166                backoff_ms: 1,
2167            })
2168            .with_dlq_sink(dlq_sink.clone());
2169        let outcome = axon.execute(10, &(), &mut bus).await;
2170
2171        // Should succeed (10 + 1 = 11)
2172        assert!(
2173            matches!(outcome, Outcome::Next(11)),
2174            "Expected Next(11), got {:?}",
2175            outcome
2176        );
2177
2178        // No dead letters since it recovered
2179        let letters = dlq_sink.letters.lock().await;
2180        assert!(
2181            letters.is_empty(),
2182            "Should have 0 dead letters, got {}",
2183            letters.len()
2184        );
2185
2186        // Timeline should contain NodeRetry events
2187        let timeline = bus.read::<Timeline>().unwrap();
2188        let retry_count = timeline
2189            .events
2190            .iter()
2191            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
2192            .count();
2193        assert_eq!(retry_count, 2, "Should have 2 retry events");
2194    }
2195
2196    #[tokio::test]
2197    async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
2198        // Always fails. Policy allows 3 attempts (1 initial + 2 retries).
2199        let mut bus = Bus::new();
2200        bus.insert(Timeline::new());
2201
2202        let dlq_sink = MockDlqSink {
2203            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2204        };
2205
2206        let axon = Axon::<i32, i32, String>::start("RetryExhaust")
2207            .then(AlwaysFault)
2208            .with_dlq_policy(DlqPolicy::RetryThenDlq {
2209                max_attempts: 3,
2210                backoff_ms: 1,
2211            })
2212            .with_dlq_sink(dlq_sink.clone());
2213        let outcome = axon.execute(42, &(), &mut bus).await;
2214
2215        assert!(
2216            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
2217            "Expected Fault(boom), got {:?}",
2218            outcome
2219        );
2220
2221        // Should have exactly 1 dead letter
2222        let letters = dlq_sink.letters.lock().await;
2223        assert_eq!(letters.len(), 1, "Should have 1 dead letter");
2224
2225        // Timeline should have 2 retry events and 1 DlqExhausted event
2226        let timeline = bus.read::<Timeline>().unwrap();
2227        let retry_count = timeline
2228            .events
2229            .iter()
2230            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
2231            .count();
2232        let dlq_count = timeline
2233            .events
2234            .iter()
2235            .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
2236            .count();
2237        assert_eq!(
2238            retry_count, 2,
2239            "Should have 2 retry events (attempts 2 and 3)"
2240        );
2241        assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
2242    }
2243
2244    #[tokio::test]
2245    async fn send_to_dlq_policy_sends_immediately_without_retry() {
2246        let mut bus = Bus::new();
2247        bus.insert(Timeline::new());
2248
2249        let dlq_sink = MockDlqSink {
2250            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2251        };
2252
2253        let axon = Axon::<i32, i32, String>::start("SendDlq")
2254            .then(AlwaysFault)
2255            .with_dlq_policy(DlqPolicy::SendToDlq)
2256            .with_dlq_sink(dlq_sink.clone());
2257        let outcome = axon.execute(1, &(), &mut bus).await;
2258
2259        assert!(matches!(outcome, Outcome::Fault(_)));
2260
2261        // Should have exactly 1 dead letter (immediate, no retries)
2262        let letters = dlq_sink.letters.lock().await;
2263        assert_eq!(letters.len(), 1);
2264
2265        // No retry or DlqExhausted events
2266        let timeline = bus.read::<Timeline>().unwrap();
2267        let retry_count = timeline
2268            .events
2269            .iter()
2270            .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
2271            .count();
2272        assert_eq!(retry_count, 0);
2273    }
2274
2275    #[tokio::test]
2276    async fn drop_policy_does_not_send_to_dlq() {
2277        let mut bus = Bus::new();
2278
2279        let dlq_sink = MockDlqSink {
2280            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2281        };
2282
2283        let axon = Axon::<i32, i32, String>::start("DropDlq")
2284            .then(AlwaysFault)
2285            .with_dlq_policy(DlqPolicy::Drop)
2286            .with_dlq_sink(dlq_sink.clone());
2287        let outcome = axon.execute(1, &(), &mut bus).await;
2288
2289        assert!(matches!(outcome, Outcome::Fault(_)));
2290
2291        // No dead letters
2292        let letters = dlq_sink.letters.lock().await;
2293        assert!(letters.is_empty());
2294    }
2295
2296    #[tokio::test]
2297    async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
2298        use ranvier_core::policy::DynamicPolicy;
2299
2300        // Start with Drop policy (no DLQ)
2301        let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
2302        let dlq_sink = MockDlqSink {
2303            letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2304        };
2305
2306        let axon = Axon::<i32, i32, String>::start("DynamicDlq")
2307            .then(AlwaysFault)
2308            .with_dynamic_dlq_policy(dynamic)
2309            .with_dlq_sink(dlq_sink.clone());
2310
2311        // First execution: Drop policy → no dead letters
2312        let mut bus = Bus::new();
2313        let outcome = axon.execute(1, &(), &mut bus).await;
2314        assert!(matches!(outcome, Outcome::Fault(_)));
2315        assert!(
2316            dlq_sink.letters.lock().await.is_empty(),
2317            "Drop policy should produce no DLQ entries"
2318        );
2319
2320        // Hot-reload: switch to SendToDlq
2321        tx.send(DlqPolicy::SendToDlq).unwrap();
2322
2323        // Second execution: SendToDlq policy → dead letter captured
2324        let mut bus2 = Bus::new();
2325        let outcome2 = axon.execute(2, &(), &mut bus2).await;
2326        assert!(matches!(outcome2, Outcome::Fault(_)));
2327        assert_eq!(
2328            dlq_sink.letters.lock().await.len(),
2329            1,
2330            "SendToDlq policy should produce 1 DLQ entry"
2331        );
2332    }
2333
2334    #[tokio::test]
2335    async fn dynamic_saga_policy_hot_reload() {
2336        use ranvier_core::policy::DynamicPolicy;
2337        use ranvier_core::saga::SagaPolicy;
2338
2339        // Start with Disabled saga
2340        let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
2341
2342        let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
2343            .then(AddOne)
2344            .with_dynamic_saga_policy(dynamic);
2345
2346        // First execution: Disabled → no SagaStack in bus
2347        let mut bus = Bus::new();
2348        let _outcome = axon.execute(1, &(), &mut bus).await;
2349        assert!(
2350            bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
2351            "SagaStack should be absent or empty when disabled"
2352        );
2353
2354        // Hot-reload: enable saga
2355        tx.send(SagaPolicy::Enabled).unwrap();
2356
2357        // Second execution: Enabled → SagaStack populated
2358        let mut bus2 = Bus::new();
2359        let _outcome2 = axon.execute(10, &(), &mut bus2).await;
2360        assert!(
2361            bus2.read::<SagaStack>().is_some(),
2362            "SagaStack should exist when saga is enabled"
2363        );
2364    }
2365
2366    // ── IAM Boundary Tests ──────────────────────────────────────
2367
2368    mod iam_tests {
2369        use super::*;
2370        use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
2371
2372        /// Mock IamVerifier that returns a fixed identity.
2373        #[derive(Clone)]
2374        struct MockVerifier {
2375            identity: IamIdentity,
2376            should_fail: bool,
2377        }
2378
2379        #[async_trait]
2380        impl IamVerifier for MockVerifier {
2381            async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
2382                if self.should_fail {
2383                    Err(IamError::InvalidToken("mock verification failure".into()))
2384                } else {
2385                    Ok(self.identity.clone())
2386                }
2387            }
2388        }
2389
2390        #[tokio::test]
2391        async fn iam_require_identity_passes_with_valid_token() {
2392            let verifier = MockVerifier {
2393                identity: IamIdentity::new("alice").with_role("user"),
2394                should_fail: false,
2395            };
2396
2397            let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
2398                .with_iam(IamPolicy::RequireIdentity, verifier)
2399                .then(AddOne);
2400
2401            let mut bus = Bus::new();
2402            bus.insert(IamToken("valid-token".to_string()));
2403            let outcome = axon.execute(10, &(), &mut bus).await;
2404
2405            assert!(matches!(outcome, Outcome::Next(11)));
2406            // Verify IamIdentity was injected into Bus
2407            let identity = bus
2408                .read::<IamIdentity>()
2409                .expect("IamIdentity should be in Bus");
2410            assert_eq!(identity.subject, "alice");
2411        }
2412
2413        #[tokio::test]
2414        async fn iam_require_identity_rejects_missing_token() {
2415            let verifier = MockVerifier {
2416                identity: IamIdentity::new("ignored"),
2417                should_fail: false,
2418            };
2419
2420            let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
2421                .with_iam(IamPolicy::RequireIdentity, verifier)
2422                .then(AddOne);
2423
2424            let mut bus = Bus::new();
2425            // No IamToken inserted
2426            let outcome = axon.execute(10, &(), &mut bus).await;
2427
2428            // Should emit missing_token event
2429            match &outcome {
2430                Outcome::Emit(label, _) => {
2431                    assert_eq!(label, "iam.missing_token");
2432                }
2433                other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
2434            }
2435        }
2436
2437        #[tokio::test]
2438        async fn iam_rejects_failed_verification() {
2439            let verifier = MockVerifier {
2440                identity: IamIdentity::new("ignored"),
2441                should_fail: true,
2442            };
2443
2444            let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
2445                .with_iam(IamPolicy::RequireIdentity, verifier)
2446                .then(AddOne);
2447
2448            let mut bus = Bus::new();
2449            bus.insert(IamToken("bad-token".to_string()));
2450            let outcome = axon.execute(10, &(), &mut bus).await;
2451
2452            match &outcome {
2453                Outcome::Emit(label, _) => {
2454                    assert_eq!(label, "iam.verification_failed");
2455                }
2456                other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
2457            }
2458        }
2459
2460        #[tokio::test]
2461        async fn iam_require_role_passes_with_matching_role() {
2462            let verifier = MockVerifier {
2463                identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
2464                should_fail: false,
2465            };
2466
2467            let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
2468                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
2469                .then(AddOne);
2470
2471            let mut bus = Bus::new();
2472            bus.insert(IamToken("token".to_string()));
2473            let outcome = axon.execute(5, &(), &mut bus).await;
2474
2475            assert!(matches!(outcome, Outcome::Next(6)));
2476        }
2477
2478        #[tokio::test]
2479        async fn iam_require_role_denies_without_role() {
2480            let verifier = MockVerifier {
2481                identity: IamIdentity::new("carol").with_role("user"),
2482                should_fail: false,
2483            };
2484
2485            let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
2486                .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
2487                .then(AddOne);
2488
2489            let mut bus = Bus::new();
2490            bus.insert(IamToken("token".to_string()));
2491            let outcome = axon.execute(5, &(), &mut bus).await;
2492
2493            match &outcome {
2494                Outcome::Emit(label, _) => {
2495                    assert_eq!(label, "iam.policy_denied");
2496                }
2497                other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
2498            }
2499        }
2500
2501        #[tokio::test]
2502        async fn iam_policy_none_skips_verification() {
2503            let verifier = MockVerifier {
2504                identity: IamIdentity::new("ignored"),
2505                should_fail: true, // would fail if actually called
2506            };
2507
2508            let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
2509                .with_iam(IamPolicy::None, verifier)
2510                .then(AddOne);
2511
2512            let mut bus = Bus::new();
2513            // No token needed when policy is None
2514            let outcome = axon.execute(10, &(), &mut bus).await;
2515
2516            assert!(matches!(outcome, Outcome::Next(11)));
2517        }
2518    }
2519
2520    // ── Schema Propagation Tests (M201-RQ9, RQ12) ──────────────────
2521
2522    #[derive(Clone)]
2523    struct SchemaTransition;
2524
2525    #[async_trait]
2526    impl Transition<String, String> for SchemaTransition {
2527        type Error = String;
2528        type Resources = ();
2529
2530        fn input_schema(&self) -> Option<serde_json::Value> {
2531            Some(serde_json::json!({
2532                "type": "object",
2533                "required": ["name"],
2534                "properties": {
2535                    "name": { "type": "string" }
2536                }
2537            }))
2538        }
2539
2540        async fn run(
2541            &self,
2542            state: String,
2543            _resources: &Self::Resources,
2544            _bus: &mut Bus,
2545        ) -> Outcome<String, Self::Error> {
2546            Outcome::Next(state)
2547        }
2548    }
2549
2550    #[test]
2551    fn then_auto_populates_input_schema_from_transition() {
2552        let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
2553
2554        // The last node (added by .then()) should have input_schema
2555        let last_node = axon.schematic.nodes.last().unwrap();
2556        assert!(last_node.input_schema.is_some());
2557        let schema = last_node.input_schema.as_ref().unwrap();
2558        assert_eq!(schema["type"], "object");
2559        assert_eq!(schema["required"][0], "name");
2560    }
2561
2562    #[test]
2563    fn then_leaves_input_schema_none_when_not_provided() {
2564        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
2565
2566        let last_node = axon.schematic.nodes.last().unwrap();
2567        assert!(last_node.input_schema.is_none());
2568    }
2569
2570    #[test]
2571    fn with_input_schema_value_sets_on_last_node() {
2572        let schema = serde_json::json!({"type": "integer"});
2573        let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
2574            .then(AddOne)
2575            .with_input_schema_value(schema.clone());
2576
2577        let last_node = axon.schematic.nodes.last().unwrap();
2578        assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
2579    }
2580
2581    #[test]
2582    fn with_output_schema_value_sets_on_last_node() {
2583        let schema = serde_json::json!({"type": "integer"});
2584        let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
2585            .then(AddOne)
2586            .with_output_schema_value(schema.clone());
2587
2588        let last_node = axon.schematic.nodes.last().unwrap();
2589        assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
2590    }
2591
2592    #[test]
2593    fn schematic_export_includes_schema_fields() {
2594        let axon = Axon::<String, String, String>::new("ExportTest")
2595            .then(SchemaTransition)
2596            .with_output_schema_value(serde_json::json!({"type": "string"}));
2597
2598        let json = serde_json::to_value(&axon.schematic).unwrap();
2599        let nodes = json["nodes"].as_array().unwrap();
2600        // Find the SchemaTransition node (last one)
2601        let last = nodes.last().unwrap();
2602        assert!(last.get("input_schema").is_some());
2603        assert_eq!(last["input_schema"]["type"], "object");
2604        assert_eq!(last["output_schema"]["type"], "string");
2605    }
2606
2607    #[test]
2608    fn schematic_export_omits_schema_fields_when_none() {
2609        let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
2610
2611        let json = serde_json::to_value(&axon.schematic).unwrap();
2612        let nodes = json["nodes"].as_array().unwrap();
2613        let last = nodes.last().unwrap();
2614        let obj = last.as_object().unwrap();
2615        assert!(!obj.contains_key("input_schema"));
2616        assert!(!obj.contains_key("output_schema"));
2617    }
2618
2619    #[test]
2620    fn schematic_json_roundtrip_preserves_schemas() {
2621        let axon = Axon::<String, String, String>::new("Roundtrip")
2622            .then(SchemaTransition)
2623            .with_output_schema_value(serde_json::json!({"type": "string"}));
2624
2625        let json_str = serde_json::to_string(&axon.schematic).unwrap();
2626        let deserialized: ranvier_core::schematic::Schematic =
2627            serde_json::from_str(&json_str).unwrap();
2628
2629        let last = deserialized.nodes.last().unwrap();
2630        assert!(last.input_schema.is_some());
2631        assert!(last.output_schema.is_some());
2632        assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
2633        assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
2634    }
2635
2636    // Test transitions for new unit tests
2637    #[derive(Clone)]
2638    struct MultiplyByTwo;
2639
2640    #[async_trait]
2641    impl Transition<i32, i32> for MultiplyByTwo {
2642        type Error = TestInfallible;
2643        type Resources = ();
2644
2645        async fn run(
2646            &self,
2647            state: i32,
2648            _resources: &Self::Resources,
2649            _bus: &mut Bus,
2650        ) -> Outcome<i32, Self::Error> {
2651            Outcome::Next(state * 2)
2652        }
2653    }
2654
2655    #[derive(Clone)]
2656    struct AddTen;
2657
2658    #[async_trait]
2659    impl Transition<i32, i32> for AddTen {
2660        type Error = TestInfallible;
2661        type Resources = ();
2662
2663        async fn run(
2664            &self,
2665            state: i32,
2666            _resources: &Self::Resources,
2667            _bus: &mut Bus,
2668        ) -> Outcome<i32, Self::Error> {
2669            Outcome::Next(state + 10)
2670        }
2671    }
2672
2673    #[derive(Clone)]
2674    struct AddOneString;
2675
2676    #[async_trait]
2677    impl Transition<i32, i32> for AddOneString {
2678        type Error = String;
2679        type Resources = ();
2680
2681        async fn run(
2682            &self,
2683            state: i32,
2684            _resources: &Self::Resources,
2685            _bus: &mut Bus,
2686        ) -> Outcome<i32, Self::Error> {
2687            Outcome::Next(state + 1)
2688        }
2689    }
2690
2691    #[derive(Clone)]
2692    struct AddTenString;
2693
2694    #[async_trait]
2695    impl Transition<i32, i32> for AddTenString {
2696        type Error = String;
2697        type Resources = ();
2698
2699        async fn run(
2700            &self,
2701            state: i32,
2702            _resources: &Self::Resources,
2703            _bus: &mut Bus,
2704        ) -> Outcome<i32, Self::Error> {
2705            Outcome::Next(state + 10)
2706        }
2707    }
2708
2709    #[tokio::test]
2710    async fn axon_single_step_chain_executes_and_returns_next() {
2711        let mut bus = Bus::new();
2712        let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
2713
2714        let outcome = axon.execute(5, &(), &mut bus).await;
2715        assert!(matches!(outcome, Outcome::Next(6)));
2716    }
2717
2718    #[tokio::test]
2719    async fn axon_three_step_chain_executes_in_order() {
2720        let mut bus = Bus::new();
2721        let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
2722            .then(AddOne)
2723            .then(MultiplyByTwo)
2724            .then(AddTen);
2725
2726        // Starting with 5: AddOne -> 6, MultiplyByTwo -> 12, AddTen -> 22
2727        let outcome = axon.execute(5, &(), &mut bus).await;
2728        assert!(matches!(outcome, Outcome::Next(22)));
2729    }
2730
2731    #[tokio::test]
2732    async fn axon_with_fault_in_middle_step_propagates_error() {
2733        let mut bus = Bus::new();
2734
2735        // Create a 3-step chain where the middle step faults
2736        // Step 1: AddOneString (5 -> 6)
2737        // Step 2: AlwaysFault (should fault here)
2738        // Step 3: AddTenString (never reached)
2739        let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
2740            .then(AddOneString)
2741            .then(AlwaysFault)
2742            .then(AddTenString);
2743
2744        let outcome = axon.execute(5, &(), &mut bus).await;
2745        assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
2746    }
2747
2748    #[tokio::test]
2749    async fn fault_injects_transition_error_context_into_bus() {
2750        let mut bus = Bus::new();
2751
2752        // 3-step chain: AddOneString → AlwaysFault → AddTenString
2753        let axon = Axon::<i32, i32, String>::start("my-pipeline")
2754            .then(AddOneString)
2755            .then(AlwaysFault)
2756            .then(AddTenString);
2757
2758        let outcome = axon.execute(5, &(), &mut bus).await;
2759        assert!(matches!(outcome, Outcome::Fault(_)));
2760
2761        let ctx = bus
2762            .read::<ranvier_core::error::TransitionErrorContext>()
2763            .expect("TransitionErrorContext should be in Bus after fault");
2764        assert_eq!(ctx.pipeline_name, "my-pipeline");
2765        assert_eq!(ctx.transition_name, "AlwaysFault");
2766        assert_eq!(ctx.step_index, 2); // 0=ingress, 1=AddOneString, 2=AlwaysFault
2767    }
2768
2769    #[test]
2770    fn axon_schematic_has_correct_node_count_after_chaining() {
2771        let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
2772            .then(AddOne)
2773            .then(MultiplyByTwo)
2774            .then(AddTen);
2775
2776        // Should have 4 nodes: ingress + 3 transitions
2777        assert_eq!(axon.schematic.nodes.len(), 4);
2778        assert_eq!(axon.schematic.name, "NodeCount");
2779    }
2780
2781    #[tokio::test]
2782    async fn axon_execution_records_timeline_events() {
2783        let mut bus = Bus::new();
2784        bus.insert(Timeline::new());
2785
2786        let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
2787            .then(AddOne)
2788            .then(MultiplyByTwo);
2789
2790        let outcome = axon.execute(3, &(), &mut bus).await;
2791        assert!(matches!(outcome, Outcome::Next(8))); // (3 + 1) * 2 = 8
2792
2793        let timeline = bus.read::<Timeline>().unwrap();
2794
2795        // Should have NodeEnter and NodeExit events
2796        let enter_count = timeline
2797            .events
2798            .iter()
2799            .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
2800            .count();
2801        let exit_count = timeline
2802            .events
2803            .iter()
2804            .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
2805            .count();
2806
2807        // Expect at least 1 enter and 1 exit (ingress node)
2808        assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
2809        assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
2810    }
2811
2812    // ── Parallel Step Tests (M231) ───────────────────────────────
2813
2814    #[tokio::test]
2815    async fn parallel_all_succeed_returns_first_next() {
2816        use super::ParallelStrategy;
2817
2818        let mut bus = Bus::new();
2819        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
2820            .parallel(
2821                vec![
2822                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2823                    Arc::new(MultiplyByTwo),
2824                ],
2825                ParallelStrategy::AllMustSucceed,
2826            );
2827
2828        // Input 5: AddOne -> 6, MultiplyByTwo -> 10.
2829        // AllMustSucceed returns the first Next (AddOne = 6).
2830        let outcome = axon.execute(5, &(), &mut bus).await;
2831        assert!(matches!(outcome, Outcome::Next(6)));
2832    }
2833
2834    #[tokio::test]
2835    async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
2836        use super::ParallelStrategy;
2837
2838        let mut bus = Bus::new();
2839        let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
2840            .parallel(
2841                vec![
2842                    Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
2843                    Arc::new(AlwaysFault),
2844                ],
2845                ParallelStrategy::AllMustSucceed,
2846            );
2847
2848        let outcome = axon.execute(5, &(), &mut bus).await;
2849        assert!(
2850            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
2851            "Expected Fault(boom), got {:?}",
2852            outcome
2853        );
2854    }
2855
2856    #[tokio::test]
2857    async fn parallel_any_can_fail_returns_success_despite_fault() {
2858        use super::ParallelStrategy;
2859
2860        let mut bus = Bus::new();
2861        let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
2862            .parallel(
2863                vec![
2864                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
2865                    Arc::new(AddOneString),
2866                ],
2867                ParallelStrategy::AnyCanFail,
2868            );
2869
2870        // AlwaysFault faults, but AddOneString succeeds (5 + 1 = 6).
2871        let outcome = axon.execute(5, &(), &mut bus).await;
2872        assert!(
2873            matches!(outcome, Outcome::Next(6)),
2874            "Expected Next(6), got {:?}",
2875            outcome
2876        );
2877    }
2878
2879    #[tokio::test]
2880    async fn parallel_any_can_fail_all_fault_returns_first_fault() {
2881        use super::ParallelStrategy;
2882
2883        #[derive(Clone)]
2884        struct AlwaysFault2;
2885        #[async_trait]
2886        impl Transition<i32, i32> for AlwaysFault2 {
2887            type Error = String;
2888            type Resources = ();
2889            async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
2890                Outcome::Fault("boom2".to_string())
2891            }
2892        }
2893
2894        let mut bus = Bus::new();
2895        let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
2896            .parallel(
2897                vec![
2898                    Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
2899                    Arc::new(AlwaysFault2),
2900                ],
2901                ParallelStrategy::AnyCanFail,
2902            );
2903
2904        let outcome = axon.execute(5, &(), &mut bus).await;
2905        // Should return the first fault
2906        assert!(
2907            matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
2908            "Expected Fault(boom), got {:?}",
2909            outcome
2910        );
2911    }
2912
2913    #[test]
2914    fn parallel_schematic_has_fanout_fanin_nodes() {
2915        use super::ParallelStrategy;
2916        use ranvier_core::schematic::{EdgeType, NodeKind};
2917
2918        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
2919            .parallel(
2920                vec![
2921                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2922                    Arc::new(MultiplyByTwo),
2923                    Arc::new(AddTen),
2924                ],
2925                ParallelStrategy::AllMustSucceed,
2926            );
2927
2928        // Should have: Ingress + FanOut + 3 branch Atoms + FanIn = 6 nodes
2929        assert_eq!(axon.schematic.nodes.len(), 6);
2930        assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
2931        assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
2932        assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
2933        assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
2934        assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
2935
2936        // Check FanOut description
2937        assert!(axon.schematic.nodes[1]
2938            .description
2939            .as_ref()
2940            .unwrap()
2941            .contains("3 branches"));
2942
2943        // Check parallel edges from FanOut to branches
2944        let parallel_edges: Vec<_> = axon
2945            .schematic
2946            .edges
2947            .iter()
2948            .filter(|e| matches!(e.kind, EdgeType::Parallel))
2949            .collect();
2950        // 3 from FanOut->branches + 3 from branches->FanIn = 6
2951        assert_eq!(parallel_edges.len(), 6);
2952    }
2953
2954    #[tokio::test]
2955    async fn parallel_then_chain_composes_correctly() {
2956        use super::ParallelStrategy;
2957
2958        let mut bus = Bus::new();
2959        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
2960            .then(AddOne)
2961            .parallel(
2962                vec![
2963                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2964                    Arc::new(MultiplyByTwo),
2965                ],
2966                ParallelStrategy::AllMustSucceed,
2967            )
2968            .then(AddTen);
2969
2970        // 5 -> AddOne -> 6 -> Parallel(AddOne=7, MultiplyByTwo=12) -> first=7 -> AddTen -> 17
2971        let outcome = axon.execute(5, &(), &mut bus).await;
2972        assert!(
2973            matches!(outcome, Outcome::Next(17)),
2974            "Expected Next(17), got {:?}",
2975            outcome
2976        );
2977    }
2978
2979    #[tokio::test]
2980    async fn parallel_records_timeline_events() {
2981        use super::ParallelStrategy;
2982        use ranvier_core::timeline::TimelineEvent;
2983
2984        let mut bus = Bus::new();
2985        bus.insert(Timeline::new());
2986
2987        let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
2988            .parallel(
2989                vec![
2990                    Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2991                    Arc::new(MultiplyByTwo),
2992                ],
2993                ParallelStrategy::AllMustSucceed,
2994            );
2995
2996        let outcome = axon.execute(3, &(), &mut bus).await;
2997        assert!(matches!(outcome, Outcome::Next(4)));
2998
2999        let timeline = bus.read::<Timeline>().unwrap();
3000
3001        // Check for FanOut enter/exit and FanIn enter/exit
3002        let fanout_enters = timeline
3003            .events
3004            .iter()
3005            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
3006            .count();
3007        let fanin_enters = timeline
3008            .events
3009            .iter()
3010            .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
3011            .count();
3012
3013        assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
3014        assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
3015    }
3016
3017    // ── Axon::simple() convenience constructor ───────────────────────────────
3018
3019    #[derive(Clone)]
3020    struct Greet;
3021
3022    #[async_trait]
3023    impl Transition<(), String> for Greet {
3024        type Error = String;
3025        type Resources = ();
3026
3027        async fn run(
3028            &self,
3029            _state: (),
3030            _resources: &Self::Resources,
3031            _bus: &mut Bus,
3032        ) -> Outcome<String, Self::Error> {
3033            Outcome::Next("Hello from simple!".to_string())
3034        }
3035    }
3036
3037    #[tokio::test]
3038    async fn axon_simple_creates_pipeline() {
3039        let axon = Axon::simple::<String>("SimpleTest").then(Greet);
3040
3041        let mut bus = Bus::new();
3042        let result = axon.execute((), &(), &mut bus).await;
3043
3044        match result {
3045            Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
3046            other => panic!("Expected Outcome::Next, got {:?}", other),
3047        }
3048    }
3049
3050    #[tokio::test]
3051    async fn axon_simple_equivalent_to_explicit() {
3052        // Axon::simple::<E>("label") should behave identically to Axon::<(), (), E>::new("label")
3053        let simple = Axon::simple::<String>("Equiv").then(Greet);
3054        let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
3055
3056        let mut bus1 = Bus::new();
3057        let mut bus2 = Bus::new();
3058
3059        let r1 = simple.execute((), &(), &mut bus1).await;
3060        let r2 = explicit.execute((), &(), &mut bus2).await;
3061
3062        match (r1, r2) {
3063            (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
3064            _ => panic!("Both should produce Outcome::Next"),
3065        }
3066    }
3067
3068    #[tokio::test]
3069    async fn then_fn_closure_transition() {
3070        let axon = Axon::simple::<String>("ClosureTest")
3071            .then_fn("to_greeting", |_input: (), _bus: &mut Bus| {
3072                Outcome::next("hello from closure".to_string())
3073            });
3074
3075        let mut bus = Bus::new();
3076        let result = axon.execute((), &(), &mut bus).await;
3077
3078        match result {
3079            Outcome::Next(msg) => assert_eq!(msg, "hello from closure"),
3080            other => panic!("Expected Outcome::Next, got {:?}", other),
3081        }
3082    }
3083
3084    #[tokio::test]
3085    async fn then_fn_reads_bus() {
3086        let axon = Axon::simple::<String>("BusReadClosure")
3087            .then_fn("check_score", |_input: (), bus: &mut Bus| {
3088                let score = bus.read::<u32>().copied().unwrap_or(0);
3089                if score > 75 {
3090                    Outcome::next("REJECTED".to_string())
3091                } else {
3092                    Outcome::next("APPROVED".to_string())
3093                }
3094            });
3095
3096        let mut bus = Bus::new();
3097        bus.insert(80u32);
3098        let result = axon.execute((), &(), &mut bus).await;
3099        match result {
3100            Outcome::Next(msg) => assert_eq!(msg, "REJECTED"),
3101            other => panic!("Expected REJECTED, got {:?}", other),
3102        }
3103    }
3104
3105    #[tokio::test]
3106    async fn then_fn_mixed_with_transition() {
3107        // Closure and macro Transition in the same chain
3108        let axon = Axon::simple::<String>("MixedPipeline")
3109            .then(Greet)
3110            .then_fn("uppercase", |input: String, _bus: &mut Bus| {
3111                Outcome::next(input.to_uppercase())
3112            });
3113
3114        let mut bus = Bus::new();
3115        let result = axon.execute((), &(), &mut bus).await;
3116        match result {
3117            Outcome::Next(msg) => assert_eq!(msg, "HELLO FROM SIMPLE!"),
3118            other => panic!("Expected uppercase greeting, got {:?}", other),
3119        }
3120    }
3121
3122    #[tokio::test]
3123    async fn then_fn_schematic_label() {
3124        let axon = Axon::simple::<String>("SchematicTest")
3125            .then_fn("my_custom_label", |_: (), _: &mut Bus| {
3126                Outcome::next("ok".to_string())
3127            });
3128
3129        // Node 0 is the identity start node, node 1 is our closure
3130        assert_eq!(axon.schematic.nodes.len(), 2);
3131        assert_eq!(axon.schematic.nodes[1].label, "my_custom_label");
3132    }
3133}