1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Schematic,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32
33use serde::{Serialize, de::DeserializeOwned};
34use serde_json::Value;
35use std::any::type_name;
36use std::ffi::OsString;
37use std::fs;
38use std::future::Future;
39
40use std::path::{Path, PathBuf};
41use std::pin::Pin;
42use std::sync::{Arc, Mutex, OnceLock};
43use std::time::{SystemTime, UNIX_EPOCH};
44use tracing::Instrument;
45
46#[derive(Clone)]
48pub enum ExecutionMode {
49 Local,
51 Singleton {
53 lock_key: String,
54 ttl_ms: u64,
55 lock_provider: Arc<dyn DistributedLock>,
56 },
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum ParallelStrategy {
64 AllMustSucceed,
66 AnyCanFail,
69}
70
71pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
73
74pub type Executor<In, Out, E, Res> =
78 Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
79
80#[derive(Debug, Clone)]
82pub struct ManualJump {
83 pub target_node: String,
84 pub payload_override: Option<serde_json::Value>,
85}
86
87#[derive(Debug, Clone, Copy)]
89struct StartStep(u64);
90
91#[derive(Debug, Clone)]
93struct ResumptionState {
94 payload: Option<serde_json::Value>,
95}
96
97fn type_name_of<T: ?Sized>() -> String {
99 let full = type_name::<T>();
100 full.split("::").last().unwrap_or(full).to_string()
101}
102
103pub struct Axon<In, Out, E, Res = ()> {
123 pub schematic: Schematic,
125 pub(crate) executor: Executor<In, Out, E, Res>,
127 pub execution_mode: ExecutionMode,
129 pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
131 pub audit_sink: Option<Arc<dyn AuditSink>>,
133 pub dlq_sink: Option<Arc<dyn DlqSink>>,
135 pub dlq_policy: DlqPolicy,
137 pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
139 pub saga_policy: SagaPolicy,
141 pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
143 pub saga_compensation_registry:
145 Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
146 pub iam_handle: Option<ranvier_core::iam::IamHandle>,
148}
149
150#[derive(Debug, Clone)]
152pub struct SchematicExportRequest {
153 pub output: Option<PathBuf>,
155}
156
157impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
158 fn clone(&self) -> Self {
159 Self {
160 schematic: self.schematic.clone(),
161 executor: self.executor.clone(),
162 execution_mode: self.execution_mode.clone(),
163 persistence_store: self.persistence_store.clone(),
164 audit_sink: self.audit_sink.clone(),
165 dlq_sink: self.dlq_sink.clone(),
166 dlq_policy: self.dlq_policy.clone(),
167 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
168 saga_policy: self.saga_policy.clone(),
169 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
170 saga_compensation_registry: self.saga_compensation_registry.clone(),
171 iam_handle: self.iam_handle.clone(),
172 }
173 }
174}
175
176mod builder;
177mod executor;
178mod parallel;
179
180#[async_trait]
181impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
182where
183 In: Send + Sync + Serialize + DeserializeOwned + 'static,
184 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
185 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
186 Res: ranvier_core::transition::ResourceRequirement,
187{
188 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
189 let store = self.persistence_store.as_ref()?;
190 let trace = store.load(trace_id).await.ok().flatten()?;
191 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
192 }
193
194 async fn force_resume(
195 &self,
196 trace_id: &str,
197 target_node: &str,
198 payload_override: Option<Value>,
199 ) -> Result<(), String> {
200 let store = self
201 .persistence_store
202 .as_ref()
203 .ok_or("No persistence store attached to Axon")?;
204
205 let intervention = crate::persistence::Intervention {
206 target_node: target_node.to_string(),
207 payload_override,
208 timestamp_ms: now_ms(),
209 };
210
211 store
212 .save_intervention(trace_id, intervention)
213 .await
214 .map_err(|e| format!("Failed to save intervention: {}", e))?;
215
216 if let Some(sink) = self.audit_sink.as_ref() {
217 let event = AuditEvent::new(
218 uuid::Uuid::new_v4().to_string(),
219 "Inspector".to_string(),
220 "ForceResume".to_string(),
221 trace_id.to_string(),
222 )
223 .with_metadata("target_node", target_node);
224
225 let _ = sink.append(&event).await;
226 }
227
228 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
229 Ok(())
230 }
231}
232
233fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
234 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
235 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
236 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
237
238 let mut i = 0;
239 while i < args.len() {
240 let arg = args[i].to_string_lossy();
241
242 if arg == "--schematic" {
243 enabled = true;
244 i += 1;
245 continue;
246 }
247
248 if arg == "--schematic-output" || arg == "--output" {
249 if let Some(next) = args.get(i + 1) {
250 output = Some(PathBuf::from(next));
251 i += 2;
252 continue;
253 }
254 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
255 output = Some(PathBuf::from(value));
256 } else if let Some(value) = arg.strip_prefix("--output=") {
257 output = Some(PathBuf::from(value));
258 }
259
260 i += 1;
261 }
262
263 if enabled {
264 Some(SchematicExportRequest { output })
265 } else {
266 None
267 }
268}
269
270fn env_flag_is_true(key: &str) -> bool {
271 match std::env::var(key) {
272 Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
273 Err(_) => false,
274 }
275}
276
277fn inspector_enabled_from_env() -> bool {
278 let raw = std::env::var("RANVIER_INSPECTOR").ok();
279 inspector_enabled_from_value(raw.as_deref())
280}
281
282fn inspector_enabled_from_value(value: Option<&str>) -> bool {
283 match value {
284 Some(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
285 None => true,
286 }
287}
288
289fn inspector_dev_mode_from_env() -> bool {
290 let raw = std::env::var("RANVIER_MODE").ok();
291 inspector_dev_mode_from_value(raw.as_deref())
292}
293
294fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
295 !matches!(
296 value.map(|v| v.to_ascii_lowercase()),
297 Some(mode) if mode == "prod" || mode == "production"
298 )
299}
300
301fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
302 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
303 Ok(v) if !v.trim().is_empty() => v,
304 _ => return,
305 };
306
307 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
308 let policy = timeline_adaptive_policy();
309 let forced = should_force_export(outcome, &policy);
310 let should_export = sampled || forced;
311 if !should_export {
312 record_sampling_stats(false, sampled, forced, "none", &policy);
313 return;
314 }
315
316 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
317 timeline.sort();
318
319 let mode = std::env::var("RANVIER_TIMELINE_MODE")
320 .unwrap_or_else(|_| "overwrite".to_string())
321 .to_ascii_lowercase();
322
323 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
324 tracing::warn!(
325 "Failed to persist timeline file {} (mode={}): {}",
326 path,
327 mode,
328 err
329 );
330 record_sampling_stats(false, sampled, forced, &mode, &policy);
331 } else {
332 record_sampling_stats(true, sampled, forced, &mode, &policy);
333 }
334}
335
336fn extract_panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
337 if let Some(s) = payload.downcast_ref::<&str>() {
338 (*s).to_string()
339 } else if let Some(s) = payload.downcast_ref::<String>() {
340 s.clone()
341 } else {
342 "unknown panic".to_string()
343 }
344}
345
346fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
347 match outcome {
348 Outcome::Next(_) => "Next".to_string(),
349 Outcome::Branch(id, _) => format!("Branch:{}", id),
350 Outcome::Jump(id, _) => format!("Jump:{}", id),
351 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
352 Outcome::Fault(_) => "Fault".to_string(),
353 }
354}
355
356fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
357 match outcome {
358 Outcome::Next(_) => "Next",
359 Outcome::Branch(_, _) => "Branch",
360 Outcome::Jump(_, _) => "Jump",
361 Outcome::Emit(_, _) => "Emit",
362 Outcome::Fault(_) => "Fault",
363 }
364}
365
366fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
367 match outcome {
368 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
369 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
370 Outcome::Emit(event_type, _) => Some(event_type.clone()),
371 Outcome::Next(_) | Outcome::Fault(_) => None,
372 }
373}
374
375fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
376 match outcome {
377 Outcome::Fault(_) => CompletionState::Fault,
378 _ => CompletionState::Success,
379 }
380}
381
382fn persistence_trace_id(bus: &Bus) -> String {
383 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
384 explicit.0.clone()
385 } else {
386 format!("{}:{}", bus.id, now_ms())
387 }
388}
389
390fn persistence_auto_complete(bus: &Bus) -> bool {
391 bus.read::<PersistenceAutoComplete>()
392 .map(|v| v.0)
393 .unwrap_or(true)
394}
395
396fn compensation_auto_trigger(bus: &Bus) -> bool {
397 bus.read::<CompensationAutoTrigger>()
398 .map(|v| v.0)
399 .unwrap_or(true)
400}
401
402fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
403 bus.read::<CompensationRetryPolicy>()
404 .copied()
405 .unwrap_or_default()
406}
407
408fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
414 payload.map(|p| {
415 p.get("Next")
416 .or_else(|| p.get("Branch"))
417 .or_else(|| p.get("Jump"))
418 .cloned()
419 .unwrap_or_else(|| p.clone())
420 })
421}
422
423async fn load_persistence_version(
424 handle: &PersistenceHandle,
425 trace_id: &str,
426) -> (
427 u64,
428 Option<String>,
429 Option<crate::persistence::Intervention>,
430 Option<String>,
431 Option<serde_json::Value>,
432) {
433 let store = handle.store();
434 match store.load(trace_id).await {
435 Ok(Some(trace)) => {
436 let (next_step, last_node_id, last_payload) =
437 if let Some(resume_from_step) = trace.resumed_from_step {
438 let anchor_event = trace
439 .events
440 .iter()
441 .rev()
442 .find(|event| {
443 event.step <= resume_from_step
444 && event.outcome_kind == "Next"
445 && event.payload.is_some()
446 })
447 .or_else(|| {
448 trace.events.iter().rev().find(|event| {
449 event.step <= resume_from_step
450 && event.outcome_kind != "Fault"
451 && event.payload.is_some()
452 })
453 })
454 .or_else(|| {
455 trace.events.iter().rev().find(|event| {
456 event.step <= resume_from_step && event.payload.is_some()
457 })
458 })
459 .or_else(|| trace.events.last());
460
461 (
462 resume_from_step.saturating_add(1),
463 anchor_event.and_then(|event| event.node_id.clone()),
464 anchor_event.and_then(|event| {
465 unwrap_outcome_payload(event.payload.as_ref())
466 }),
467 )
468 } else {
469 let last_event = trace.events.last();
470 (
471 last_event
472 .map(|event| event.step.saturating_add(1))
473 .unwrap_or(0),
474 last_event.and_then(|event| event.node_id.clone()),
475 last_event.and_then(|event| {
476 unwrap_outcome_payload(event.payload.as_ref())
477 }),
478 )
479 };
480
481 (
482 next_step,
483 Some(trace.schematic_version),
484 trace.interventions.last().cloned(),
485 last_node_id,
486 last_payload,
487 )
488 }
489 Ok(None) => (0, None, None, None, None),
490 Err(err) => {
491 tracing::warn!(
492 trace_id = %trace_id,
493 error = %err,
494 "Failed to load persistence trace; falling back to step=0"
495 );
496 (0, None, None, None, None)
497 }
498 }
499}
500
501#[allow(clippy::too_many_arguments)]
502async fn run_this_step<In, Out, E, Res>(
503 trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
504 state: In,
505 res: &Res,
506 bus: &mut Bus,
507 node_id: &str,
508 node_label: &str,
509 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
510 step_idx: u64,
511) -> Outcome<Out, E>
512where
513 In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
514 Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
515 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
516 Res: ranvier_core::transition::ResourceRequirement,
517{
518 let label = trans.label();
519 let res_type = std::any::type_name::<Res>()
520 .split("::")
521 .last()
522 .unwrap_or("unknown");
523
524 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
526 debug.should_pause(node_id)
527 } else {
528 false
529 };
530
531 if should_pause {
532 let trace_id = persistence_trace_id(bus);
533 tracing::event!(
534 target: "ranvier.debugger",
535 tracing::Level::INFO,
536 trace_id = %trace_id,
537 node_id = %node_id,
538 "Node paused"
539 );
540
541 if let Some(timeline) = bus.read_mut::<Timeline>() {
542 timeline.push(TimelineEvent::NodePaused {
543 node_id: node_id.to_string(),
544 timestamp: now_ms(),
545 });
546 }
547 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
548 debug.wait().await;
549 }
550 }
551
552 let enter_ts = now_ms();
553 if let Some(timeline) = bus.read_mut::<Timeline>() {
554 timeline.push(TimelineEvent::NodeEnter {
555 node_id: node_id.to_string(),
556 node_label: node_label.to_string(),
557 timestamp: enter_ts,
558 });
559 }
560
561 let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
563 if let DlqPolicy::RetryThenDlq {
564 max_attempts,
565 backoff_ms,
566 } = p
567 {
568 Some((*max_attempts, *backoff_ms))
569 } else {
570 None
571 }
572 });
573 let retry_state_snapshot = if dlq_retry_config.is_some() {
574 serde_json::to_value(&state).ok()
575 } else {
576 None
577 };
578
579 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
581 Some(serde_json::to_vec(&state).unwrap_or_default())
582 } else {
583 None
584 };
585
586 let node_span = tracing::info_span!(
587 "Node",
588 ranvier.node = %label,
589 ranvier.resource_type = %res_type,
590 ranvier.outcome_kind = tracing::field::Empty,
591 ranvier.outcome_target = tracing::field::Empty
592 );
593 let started = std::time::Instant::now();
594 bus.set_access_policy(label.clone(), bus_policy.clone());
595 let result = trans
596 .run(state, res, bus)
597 .instrument(node_span.clone())
598 .await;
599 bus.clear_access_policy();
600
601 let result = if let Outcome::Fault(_) = &result {
604 if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
605 (dlq_retry_config, &retry_state_snapshot)
606 {
607 let mut final_result = result;
608 for attempt in 2..=max_attempts {
610 let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
611
612 tracing::info!(
613 ranvier.node = %label,
614 attempt = attempt,
615 max_attempts = max_attempts,
616 backoff_ms = delay,
617 "Retrying faulted node"
618 );
619
620 if let Some(timeline) = bus.read_mut::<Timeline>() {
621 timeline.push(TimelineEvent::NodeRetry {
622 node_id: node_id.to_string(),
623 attempt,
624 max_attempts,
625 backoff_ms: delay,
626 timestamp: now_ms(),
627 });
628 }
629
630 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
631
632 if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
633 bus.set_access_policy(label.clone(), bus_policy.clone());
634 let retry_result = trans
635 .run(retry_state, res, bus)
636 .instrument(tracing::info_span!(
637 "NodeRetry",
638 ranvier.node = %label,
639 attempt = attempt
640 ))
641 .await;
642 bus.clear_access_policy();
643
644 match &retry_result {
645 Outcome::Fault(_) => {
646 final_result = retry_result;
647 }
648 _ => {
649 final_result = retry_result;
650 break;
651 }
652 }
653 }
654 }
655 final_result
656 } else {
657 result
658 }
659 } else {
660 result
661 };
662
663 node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
664 if let Some(target) = outcome_target(&result) {
665 node_span.record("ranvier.outcome_target", tracing::field::display(&target));
666 }
667
668 if let Outcome::Fault(ref err) = result {
670 let pipeline_name = bus
671 .read::<ranvier_core::schematic::Schematic>()
672 .map(|s| s.name.clone())
673 .unwrap_or_default();
674 let ctx = ranvier_core::error::TransitionErrorContext {
675 pipeline_name: pipeline_name.clone(),
676 transition_name: label.clone(),
677 step_index: step_idx as usize,
678 };
679 tracing::error!(
680 pipeline = %pipeline_name,
681 transition = %label,
682 step = step_idx,
683 error = ?err,
684 "Transition fault"
685 );
686 bus.insert(ctx);
687 }
688
689 let duration_ms = started.elapsed().as_millis() as u64;
690 let exit_ts = now_ms();
691
692 if let Some(timeline) = bus.read_mut::<Timeline>() {
693 timeline.push(TimelineEvent::NodeExit {
694 node_id: node_id.to_string(),
695 outcome_type: outcome_type_name(&result),
696 duration_ms,
697 timestamp: exit_ts,
698 });
699
700 if let Outcome::Branch(branch_id, _) = &result {
701 timeline.push(TimelineEvent::Branchtaken {
702 branch_id: branch_id.clone(),
703 timestamp: exit_ts,
704 });
705 }
706 }
707
708 if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
710 && let Some(stack) = bus.read_mut::<SagaStack>()
711 {
712 stack.push(node_id.to_string(), label.clone(), snapshot);
713 }
714
715 if let Some(handle) = bus.read::<PersistenceHandle>() {
716 let trace_id = persistence_trace_id(bus);
717 let circuit = bus
718 .read::<ranvier_core::schematic::Schematic>()
719 .map(|s| s.name.clone())
720 .unwrap_or_default();
721 let version = bus
722 .read::<ranvier_core::schematic::Schematic>()
723 .map(|s| s.schema_version.clone())
724 .unwrap_or_default();
725
726 persist_execution_event(
727 handle,
728 &trace_id,
729 &circuit,
730 &version,
731 step_idx,
732 Some(node_id.to_string()),
733 outcome_kind_name(&result),
734 Some(result.to_json_value()),
735 )
736 .await;
737 }
738
739 if let Outcome::Fault(f) = &result {
742 let dlq_action = {
744 let policy = bus.read::<DlqPolicy>();
745 let sink = bus.read::<Arc<dyn DlqSink>>();
746 match (sink, policy) {
747 (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
748 _ => None,
749 }
750 };
751
752 if let Some(sink) = dlq_action {
753 if let Some((max_attempts, _)) = dlq_retry_config
754 && let Some(timeline) = bus.read_mut::<Timeline>()
755 {
756 timeline.push(TimelineEvent::DlqExhausted {
757 node_id: node_id.to_string(),
758 total_attempts: max_attempts,
759 timestamp: now_ms(),
760 });
761 }
762
763 let trace_id = persistence_trace_id(bus);
764 let circuit = bus
765 .read::<ranvier_core::schematic::Schematic>()
766 .map(|s| s.name.clone())
767 .unwrap_or_default();
768 let _ = sink
769 .store_dead_letter(
770 &trace_id,
771 &circuit,
772 node_id,
773 &format!("{:?}", f),
774 &serde_json::to_vec(&f).unwrap_or_default(),
775 )
776 .await;
777 }
778 }
779
780 result
781}
782
783#[allow(clippy::too_many_arguments)]
784async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
785 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
786 comp: &Comp,
787 state: Out,
788 res: &Res,
789 bus: &mut Bus,
790 node_id: &str,
791 comp_node_id: &str,
792 node_label: &str,
793 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
794 step_idx: u64,
795) -> Outcome<Next, E>
796where
797 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
798 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
799 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
800 Res: ranvier_core::transition::ResourceRequirement,
801 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
802{
803 let label = trans.label();
804
805 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
807 debug.should_pause(node_id)
808 } else {
809 false
810 };
811
812 if should_pause {
813 let trace_id = persistence_trace_id(bus);
814 tracing::event!(
815 target: "ranvier.debugger",
816 tracing::Level::INFO,
817 trace_id = %trace_id,
818 node_id = %node_id,
819 "Node paused (compensated)"
820 );
821
822 if let Some(timeline) = bus.read_mut::<Timeline>() {
823 timeline.push(TimelineEvent::NodePaused {
824 node_id: node_id.to_string(),
825 timestamp: now_ms(),
826 });
827 }
828 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
829 debug.wait().await;
830 }
831 }
832
833 let enter_ts = now_ms();
834 if let Some(timeline) = bus.read_mut::<Timeline>() {
835 timeline.push(TimelineEvent::NodeEnter {
836 node_id: node_id.to_string(),
837 node_label: node_label.to_string(),
838 timestamp: enter_ts,
839 });
840 }
841
842 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
844 Some(serde_json::to_vec(&state).unwrap_or_default())
845 } else {
846 None
847 };
848
849 let node_span = tracing::info_span!("Node", ranvier.node = %label);
850 bus.set_access_policy(label.clone(), bus_policy.clone());
851 let result = trans
852 .run(state.clone(), res, bus)
853 .instrument(node_span)
854 .await;
855 bus.clear_access_policy();
856
857 let duration_ms = 0; let exit_ts = now_ms();
859
860 if let Some(timeline) = bus.read_mut::<Timeline>() {
861 timeline.push(TimelineEvent::NodeExit {
862 node_id: node_id.to_string(),
863 outcome_type: outcome_type_name(&result),
864 duration_ms,
865 timestamp: exit_ts,
866 });
867 }
868
869 if let Outcome::Fault(ref err) = result {
871 if compensation_auto_trigger(bus) {
872 tracing::info!(
873 ranvier.node = %label,
874 ranvier.compensation.trigger = "saga",
875 error = ?err,
876 "Saga compensation triggered"
877 );
878
879 if let Some(timeline) = bus.read_mut::<Timeline>() {
880 timeline.push(TimelineEvent::NodeEnter {
881 node_id: comp_node_id.to_string(),
882 node_label: format!("Compensate: {}", comp.label()),
883 timestamp: exit_ts,
884 });
885 }
886
887 let _ = comp.run(state, res, bus).await;
889
890 if let Some(timeline) = bus.read_mut::<Timeline>() {
891 timeline.push(TimelineEvent::NodeExit {
892 node_id: comp_node_id.to_string(),
893 outcome_type: "Compensated".to_string(),
894 duration_ms: 0,
895 timestamp: now_ms(),
896 });
897 }
898
899 if let Some(handle) = bus.read::<PersistenceHandle>() {
900 let trace_id = persistence_trace_id(bus);
901 let circuit = bus
902 .read::<ranvier_core::schematic::Schematic>()
903 .map(|s| s.name.clone())
904 .unwrap_or_default();
905 let version = bus
906 .read::<ranvier_core::schematic::Schematic>()
907 .map(|s| s.schema_version.clone())
908 .unwrap_or_default();
909
910 persist_execution_event(
911 handle,
912 &trace_id,
913 &circuit,
914 &version,
915 step_idx + 1, Some(comp_node_id.to_string()),
917 "Compensated",
918 None,
919 )
920 .await;
921 }
922 }
923 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
924 if let Some(stack) = bus.read_mut::<SagaStack>() {
926 stack.push(node_id.to_string(), label.clone(), snapshot);
927 }
928
929 if let Some(handle) = bus.read::<PersistenceHandle>() {
930 let trace_id = persistence_trace_id(bus);
931 let circuit = bus
932 .read::<ranvier_core::schematic::Schematic>()
933 .map(|s| s.name.clone())
934 .unwrap_or_default();
935 let version = bus
936 .read::<ranvier_core::schematic::Schematic>()
937 .map(|s| s.schema_version.clone())
938 .unwrap_or_default();
939
940 persist_execution_event(
941 handle,
942 &trace_id,
943 &circuit,
944 &version,
945 step_idx,
946 Some(node_id.to_string()),
947 outcome_kind_name(&result),
948 Some(result.to_json_value()),
949 )
950 .await;
951 }
952 }
953
954 if let Outcome::Fault(f) = &result
956 && let (Some(sink), Some(policy)) =
957 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
958 {
959 let should_dlq = !matches!(policy, DlqPolicy::Drop);
960 if should_dlq {
961 let trace_id = persistence_trace_id(bus);
962 let circuit = bus
963 .read::<ranvier_core::schematic::Schematic>()
964 .map(|s| s.name.clone())
965 .unwrap_or_default();
966 let _ = sink
967 .store_dead_letter(
968 &trace_id,
969 &circuit,
970 node_id,
971 &format!("{:?}", f),
972 &serde_json::to_vec(&f).unwrap_or_default(),
973 )
974 .await;
975 }
976 }
977
978 result
979}
980
981#[allow(clippy::too_many_arguments)]
982pub async fn persist_execution_event(
983 handle: &PersistenceHandle,
984 trace_id: &str,
985 circuit: &str,
986 schematic_version: &str,
987 step: u64,
988 node_id: Option<String>,
989 outcome_kind: &str,
990 payload: Option<serde_json::Value>,
991) {
992 let store = handle.store();
993 let envelope = PersistenceEnvelope {
994 trace_id: trace_id.to_string(),
995 circuit: circuit.to_string(),
996 schematic_version: schematic_version.to_string(),
997 step,
998 node_id,
999 outcome_kind: outcome_kind.to_string(),
1000 timestamp_ms: now_ms(),
1001 payload_hash: None,
1002 payload,
1003 };
1004
1005 if let Err(err) = store.append(envelope).await {
1006 tracing::warn!(
1007 trace_id = %trace_id,
1008 circuit = %circuit,
1009 step,
1010 outcome_kind = %outcome_kind,
1011 error = %err,
1012 "Failed to append persistence envelope"
1013 );
1014 }
1015}
1016
1017async fn persist_completion(
1018 handle: &PersistenceHandle,
1019 trace_id: &str,
1020 completion: CompletionState,
1021) {
1022 let store = handle.store();
1023 if let Err(err) = store.complete(trace_id, completion).await {
1024 tracing::warn!(
1025 trace_id = %trace_id,
1026 error = %err,
1027 "Failed to complete persistence trace"
1028 );
1029 }
1030}
1031
1032fn compensation_idempotency_key(context: &CompensationContext) -> String {
1033 format!(
1034 "{}:{}:{}",
1035 context.trace_id, context.circuit, context.fault_kind
1036 )
1037}
1038
1039async fn run_compensation(
1040 handle: &CompensationHandle,
1041 context: CompensationContext,
1042 retry_policy: CompensationRetryPolicy,
1043 idempotency: Option<CompensationIdempotencyHandle>,
1044) -> bool {
1045 let hook = handle.hook();
1046 let key = compensation_idempotency_key(&context);
1047
1048 if let Some(store_handle) = idempotency.as_ref() {
1049 let store = store_handle.store();
1050 match store.was_compensated(&key).await {
1051 Ok(true) => {
1052 tracing::info!(
1053 trace_id = %context.trace_id,
1054 circuit = %context.circuit,
1055 key = %key,
1056 "Compensation already recorded; skipping duplicate hook execution"
1057 );
1058 return true;
1059 }
1060 Ok(false) => {}
1061 Err(err) => {
1062 tracing::warn!(
1063 trace_id = %context.trace_id,
1064 key = %key,
1065 error = %err,
1066 "Failed to check compensation idempotency state"
1067 );
1068 }
1069 }
1070 }
1071
1072 let max_attempts = retry_policy.max_attempts.max(1);
1073 for attempt in 1..=max_attempts {
1074 match hook.compensate(context.clone()).await {
1075 Ok(()) => {
1076 if let Some(store_handle) = idempotency.as_ref() {
1077 let store = store_handle.store();
1078 if let Err(err) = store.mark_compensated(&key).await {
1079 tracing::warn!(
1080 trace_id = %context.trace_id,
1081 key = %key,
1082 error = %err,
1083 "Failed to mark compensation idempotency state"
1084 );
1085 }
1086 }
1087 return true;
1088 }
1089 Err(err) => {
1090 let is_last = attempt == max_attempts;
1091 tracing::warn!(
1092 trace_id = %context.trace_id,
1093 circuit = %context.circuit,
1094 fault_kind = %context.fault_kind,
1095 fault_step = context.fault_step,
1096 attempt,
1097 max_attempts,
1098 error = %err,
1099 "Compensation hook attempt failed"
1100 );
1101 if !is_last && retry_policy.backoff_ms > 0 {
1102 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
1103 .await;
1104 }
1105 }
1106 }
1107 }
1108 false
1109}
1110
1111fn ensure_timeline(bus: &mut Bus) -> bool {
1112 if bus.has::<Timeline>() {
1113 false
1114 } else {
1115 bus.insert(Timeline::new());
1116 true
1117 }
1118}
1119
1120fn should_attach_timeline(bus: &Bus) -> bool {
1121 if bus.has::<Timeline>() {
1123 return true;
1124 }
1125
1126 has_timeline_output_path()
1128}
1129
1130fn has_timeline_output_path() -> bool {
1131 std::env::var("RANVIER_TIMELINE_OUTPUT")
1132 .ok()
1133 .map(|v| !v.trim().is_empty())
1134 .unwrap_or(false)
1135}
1136
1137fn timeline_sample_rate() -> f64 {
1138 std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
1139 .ok()
1140 .and_then(|v| v.parse::<f64>().ok())
1141 .map(|v| v.clamp(0.0, 1.0))
1142 .unwrap_or(1.0)
1143}
1144
1145fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
1146 if rate <= 0.0 {
1147 return false;
1148 }
1149 if rate >= 1.0 {
1150 return true;
1151 }
1152 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
1153 bucket < rate
1154}
1155
1156fn timeline_adaptive_policy() -> String {
1157 std::env::var("RANVIER_TIMELINE_ADAPTIVE")
1158 .unwrap_or_else(|_| "fault_branch".to_string())
1159 .to_ascii_lowercase()
1160}
1161
1162fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
1163 match policy {
1164 "off" => false,
1165 "fault_only" => matches!(outcome, Outcome::Fault(_)),
1166 "fault_branch_emit" => {
1167 matches!(
1168 outcome,
1169 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
1170 )
1171 }
1172 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
1173 }
1174}
1175
1176#[derive(Default, Clone)]
1177struct SamplingStats {
1178 total_decisions: u64,
1179 exported: u64,
1180 skipped: u64,
1181 sampled_exports: u64,
1182 forced_exports: u64,
1183 last_mode: String,
1184 last_policy: String,
1185 last_updated_ms: u64,
1186}
1187
1188static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
1189
1190fn stats_cell() -> &'static Mutex<SamplingStats> {
1191 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
1192}
1193
1194fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
1195 let snapshot = {
1196 let mut stats = match stats_cell().lock() {
1197 Ok(guard) => guard,
1198 Err(_) => return,
1199 };
1200
1201 stats.total_decisions += 1;
1202 if exported {
1203 stats.exported += 1;
1204 } else {
1205 stats.skipped += 1;
1206 }
1207 if sampled && exported {
1208 stats.sampled_exports += 1;
1209 }
1210 if forced && exported {
1211 stats.forced_exports += 1;
1212 }
1213 stats.last_mode = mode.to_string();
1214 stats.last_policy = policy.to_string();
1215 stats.last_updated_ms = now_ms();
1216 stats.clone()
1217 };
1218
1219 tracing::debug!(
1220 ranvier.timeline.total_decisions = snapshot.total_decisions,
1221 ranvier.timeline.exported = snapshot.exported,
1222 ranvier.timeline.skipped = snapshot.skipped,
1223 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
1224 ranvier.timeline.forced_exports = snapshot.forced_exports,
1225 ranvier.timeline.mode = %snapshot.last_mode,
1226 ranvier.timeline.policy = %snapshot.last_policy,
1227 "Timeline sampling stats updated"
1228 );
1229
1230 if let Some(path) = timeline_stats_output_path() {
1231 let payload = serde_json::json!({
1232 "total_decisions": snapshot.total_decisions,
1233 "exported": snapshot.exported,
1234 "skipped": snapshot.skipped,
1235 "sampled_exports": snapshot.sampled_exports,
1236 "forced_exports": snapshot.forced_exports,
1237 "last_mode": snapshot.last_mode,
1238 "last_policy": snapshot.last_policy,
1239 "last_updated_ms": snapshot.last_updated_ms
1240 });
1241 if let Some(parent) = Path::new(&path).parent() {
1242 let _ = fs::create_dir_all(parent);
1243 }
1244 if let Err(err) = fs::write(&path, payload.to_string()) {
1245 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
1246 }
1247 }
1248}
1249
1250fn timeline_stats_output_path() -> Option<String> {
1251 std::env::var("RANVIER_TIMELINE_STATS_OUTPUT")
1252 .ok()
1253 .filter(|v| !v.trim().is_empty())
1254}
1255
1256fn write_timeline_with_policy(
1257 path: &str,
1258 mode: &str,
1259 mut timeline: Timeline,
1260) -> Result<(), String> {
1261 match mode {
1262 "append" => {
1263 if Path::new(path).exists() {
1264 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
1265 match serde_json::from_str::<Timeline>(&content) {
1266 Ok(mut existing) => {
1267 existing.events.append(&mut timeline.events);
1268 existing.sort();
1269 if let Some(max_events) = max_events_limit() {
1270 truncate_timeline_events(&mut existing, max_events);
1271 }
1272 write_timeline_json(path, &existing)
1273 }
1274 Err(_) => {
1275 if let Some(max_events) = max_events_limit() {
1277 truncate_timeline_events(&mut timeline, max_events);
1278 }
1279 write_timeline_json(path, &timeline)
1280 }
1281 }
1282 } else {
1283 if let Some(max_events) = max_events_limit() {
1284 truncate_timeline_events(&mut timeline, max_events);
1285 }
1286 write_timeline_json(path, &timeline)
1287 }
1288 }
1289 "rotate" => {
1290 let rotated_path = rotated_path(path, now_ms());
1291 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
1292 if let Some(keep) = rotate_keep_limit() {
1293 cleanup_rotated_files(path, keep)?;
1294 }
1295 Ok(())
1296 }
1297 _ => write_timeline_json(path, &timeline),
1298 }
1299}
1300
1301fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
1302 if let Some(parent) = Path::new(path).parent()
1303 && !parent.as_os_str().is_empty()
1304 {
1305 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1306 }
1307 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
1308 fs::write(path, json).map_err(|e| e.to_string())
1309}
1310
1311fn rotated_path(path: &str, suffix: u64) -> PathBuf {
1312 let p = Path::new(path);
1313 let parent = p.parent().unwrap_or_else(|| Path::new(""));
1314 let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
1315 let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
1316 parent.join(format!("{}_{}.{}", stem, suffix, ext))
1317}
1318
1319fn max_events_limit() -> Option<usize> {
1320 std::env::var("RANVIER_TIMELINE_MAX_EVENTS")
1321 .ok()
1322 .and_then(|v| v.parse::<usize>().ok())
1323 .filter(|v| *v > 0)
1324}
1325
1326fn rotate_keep_limit() -> Option<usize> {
1327 std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
1328 .ok()
1329 .and_then(|v| v.parse::<usize>().ok())
1330 .filter(|v| *v > 0)
1331}
1332
1333fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
1334 let len = timeline.events.len();
1335 if len > max_events {
1336 let keep_from = len - max_events;
1337 timeline.events = timeline.events.split_off(keep_from);
1338 }
1339}
1340
1341fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
1342 let p = Path::new(base_path);
1343 let parent = p.parent().unwrap_or_else(|| Path::new("."));
1344 let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
1345 let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
1346 let prefix = format!("{}_", stem);
1347 let suffix = format!(".{}", ext);
1348
1349 let mut files = fs::read_dir(parent)
1350 .map_err(|e| e.to_string())?
1351 .filter_map(|entry| entry.ok())
1352 .filter(|entry| {
1353 let name = entry.file_name();
1354 let name = name.to_string_lossy();
1355 name.starts_with(&prefix) && name.ends_with(&suffix)
1356 })
1357 .map(|entry| {
1358 let modified = entry
1359 .metadata()
1360 .ok()
1361 .and_then(|m| m.modified().ok())
1362 .unwrap_or(SystemTime::UNIX_EPOCH);
1363 (entry.path(), modified)
1364 })
1365 .collect::<Vec<_>>();
1366
1367 files.sort_by(|a, b| b.1.cmp(&a.1));
1368 for (path, _) in files.into_iter().skip(keep) {
1369 let _ = fs::remove_file(path);
1370 }
1371 Ok(())
1372}
1373
1374fn bus_capability_schema_from_policy(
1375 policy: Option<ranvier_core::bus::BusAccessPolicy>,
1376) -> Option<BusCapabilitySchema> {
1377 let policy = policy?;
1378
1379 let mut allow = policy
1380 .allow
1381 .unwrap_or_default()
1382 .into_iter()
1383 .map(|entry| entry.type_name.to_string())
1384 .collect::<Vec<_>>();
1385 let mut deny = policy
1386 .deny
1387 .into_iter()
1388 .map(|entry| entry.type_name.to_string())
1389 .collect::<Vec<_>>();
1390 allow.sort();
1391 deny.sort();
1392
1393 if allow.is_empty() && deny.is_empty() {
1394 return None;
1395 }
1396
1397 Some(BusCapabilitySchema { allow, deny })
1398}
1399
1400fn now_ms() -> u64 {
1401 SystemTime::now()
1402 .duration_since(UNIX_EPOCH)
1403 .map(|d| d.as_millis() as u64)
1404 .unwrap_or(0)
1405}
1406
1407#[cfg(test)]
1408mod tests {
1409 use super::{
1410 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
1411 should_force_export,
1412 };
1413 use crate::persistence::{
1414 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
1415 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
1416 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
1417 PersistenceHandle, PersistenceStore, PersistenceTraceId,
1418 };
1419 use anyhow::Result;
1420 use async_trait::async_trait;
1421 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
1422 use ranvier_core::event::{DlqPolicy, DlqSink};
1423 use ranvier_core::saga::SagaStack;
1424 use ranvier_core::timeline::{Timeline, TimelineEvent};
1425 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
1426 use serde::{Deserialize, Serialize};
1427 use std::sync::Arc;
1428 use tokio::sync::Mutex;
1429 use uuid::Uuid;
1430
1431 struct MockAuditSink {
1432 events: Arc<Mutex<Vec<AuditEvent>>>,
1433 }
1434
1435 #[async_trait]
1436 impl AuditSink for MockAuditSink {
1437 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
1438 self.events.lock().await.push(event.clone());
1439 Ok(())
1440 }
1441 }
1442
1443 #[tokio::test]
1444 async fn execute_logs_audit_events_for_intervention() {
1445 use ranvier_inspector::StateInspector;
1446
1447 let trace_id = "test-audit-trace";
1448 let store_impl = InMemoryPersistenceStore::new();
1449 let events = Arc::new(Mutex::new(Vec::new()));
1450 let sink = MockAuditSink {
1451 events: events.clone(),
1452 };
1453
1454 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
1455 .then(AddOne)
1456 .with_persistence_store(store_impl.clone())
1457 .with_audit_sink(sink);
1458
1459 let mut bus = Bus::new();
1460 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
1461 bus.insert(PersistenceTraceId::new(trace_id));
1462 let target_node_id = axon.schematic.nodes[0].id.clone();
1463
1464 store_impl
1466 .append(crate::persistence::PersistenceEnvelope {
1467 trace_id: trace_id.to_string(),
1468 circuit: "AuditTest".to_string(),
1469 schematic_version: "v1.0".to_string(),
1470 step: 0,
1471 node_id: None,
1472 outcome_kind: "Next".to_string(),
1473 timestamp_ms: 0,
1474 payload_hash: None,
1475 payload: None,
1476 })
1477 .await
1478 .unwrap();
1479
1480 axon.force_resume(trace_id, &target_node_id, None)
1482 .await
1483 .unwrap();
1484
1485 axon.execute(10, &(), &mut bus).await;
1487
1488 let recorded = events.lock().await;
1489 assert_eq!(
1490 recorded.len(),
1491 2,
1492 "Should have 2 audit events: ForceResume and ApplyIntervention"
1493 );
1494 assert_eq!(recorded[0].action, "ForceResume");
1495 assert_eq!(recorded[0].target, trace_id);
1496 assert_eq!(recorded[1].action, "ApplyIntervention");
1497 assert_eq!(recorded[1].target, trace_id);
1498 }
1499
1500 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1501 pub enum TestInfallible {}
1502
1503 #[test]
1504 fn inspector_enabled_flag_matrix() {
1505 assert!(inspector_enabled_from_value(None));
1506 assert!(inspector_enabled_from_value(Some("1")));
1507 assert!(inspector_enabled_from_value(Some("true")));
1508 assert!(inspector_enabled_from_value(Some("on")));
1509 assert!(!inspector_enabled_from_value(Some("0")));
1510 assert!(!inspector_enabled_from_value(Some("false")));
1511 }
1512
1513 #[test]
1514 fn inspector_dev_mode_matrix() {
1515 assert!(inspector_dev_mode_from_value(None));
1516 assert!(inspector_dev_mode_from_value(Some("dev")));
1517 assert!(inspector_dev_mode_from_value(Some("staging")));
1518 assert!(!inspector_dev_mode_from_value(Some("prod")));
1519 assert!(!inspector_dev_mode_from_value(Some("production")));
1520 }
1521
1522 #[test]
1523 fn adaptive_policy_force_export_matrix() {
1524 let next = Outcome::<(), &'static str>::Next(());
1525 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
1526 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
1527 let fault = Outcome::<(), &'static str>::Fault("boom");
1528
1529 assert!(!should_force_export(&next, "off"));
1530 assert!(!should_force_export(&fault, "off"));
1531
1532 assert!(!should_force_export(&branch, "fault_only"));
1533 assert!(should_force_export(&fault, "fault_only"));
1534
1535 assert!(should_force_export(&branch, "fault_branch"));
1536 assert!(!should_force_export(&emit, "fault_branch"));
1537 assert!(should_force_export(&fault, "fault_branch"));
1538
1539 assert!(should_force_export(&branch, "fault_branch_emit"));
1540 assert!(should_force_export(&emit, "fault_branch_emit"));
1541 assert!(should_force_export(&fault, "fault_branch_emit"));
1542 }
1543
1544 #[test]
1545 fn sampling_and_adaptive_combination_decisions() {
1546 let bus_id = Uuid::nil();
1547 let next = Outcome::<(), &'static str>::Next(());
1548 let fault = Outcome::<(), &'static str>::Fault("boom");
1549
1550 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
1551 assert!(!sampled_never);
1552 assert!(!(sampled_never || should_force_export(&next, "off")));
1553 assert!(sampled_never || should_force_export(&fault, "fault_only"));
1554
1555 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
1556 assert!(sampled_always);
1557 assert!(sampled_always || should_force_export(&next, "off"));
1558 assert!(sampled_always || should_force_export(&fault, "off"));
1559 }
1560
1561 #[derive(Clone)]
1562 struct AddOne;
1563
1564 #[async_trait]
1565 impl Transition<i32, i32> for AddOne {
1566 type Error = TestInfallible;
1567 type Resources = ();
1568
1569 async fn run(
1570 &self,
1571 state: i32,
1572 _resources: &Self::Resources,
1573 _bus: &mut Bus,
1574 ) -> Outcome<i32, Self::Error> {
1575 Outcome::Next(state + 1)
1576 }
1577 }
1578
1579 #[derive(Clone)]
1580 struct AlwaysFault;
1581
1582 #[async_trait]
1583 impl Transition<i32, i32> for AlwaysFault {
1584 type Error = String;
1585 type Resources = ();
1586
1587 async fn run(
1588 &self,
1589 _state: i32,
1590 _resources: &Self::Resources,
1591 _bus: &mut Bus,
1592 ) -> Outcome<i32, Self::Error> {
1593 Outcome::Fault("boom".to_string())
1594 }
1595 }
1596
1597 #[derive(Clone)]
1598 struct CapabilityGuarded;
1599
1600 #[async_trait]
1601 impl Transition<(), ()> for CapabilityGuarded {
1602 type Error = String;
1603 type Resources = ();
1604
1605 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
1606 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
1607 }
1608
1609 async fn run(
1610 &self,
1611 _state: (),
1612 _resources: &Self::Resources,
1613 bus: &mut Bus,
1614 ) -> Outcome<(), Self::Error> {
1615 match bus.get::<String>() {
1616 Ok(_) => Outcome::Next(()),
1617 Err(err) => Outcome::Fault(err.to_string()),
1618 }
1619 }
1620 }
1621
1622 #[derive(Clone)]
1623 struct RecordingCompensationHook {
1624 calls: Arc<Mutex<Vec<CompensationContext>>>,
1625 should_fail: bool,
1626 }
1627
1628 #[async_trait]
1629 impl CompensationHook for RecordingCompensationHook {
1630 async fn compensate(&self, context: CompensationContext) -> Result<()> {
1631 self.calls.lock().await.push(context);
1632 if self.should_fail {
1633 return Err(anyhow::anyhow!("compensation failed"));
1634 }
1635 Ok(())
1636 }
1637 }
1638
1639 #[derive(Clone)]
1640 struct FlakyCompensationHook {
1641 calls: Arc<Mutex<u32>>,
1642 failures_remaining: Arc<Mutex<u32>>,
1643 }
1644
1645 #[async_trait]
1646 impl CompensationHook for FlakyCompensationHook {
1647 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
1648 {
1649 let mut calls = self.calls.lock().await;
1650 *calls += 1;
1651 }
1652 let mut failures_remaining = self.failures_remaining.lock().await;
1653 if *failures_remaining > 0 {
1654 *failures_remaining -= 1;
1655 return Err(anyhow::anyhow!("transient compensation failure"));
1656 }
1657 Ok(())
1658 }
1659 }
1660
1661 #[derive(Clone)]
1662 struct FailingCompensationIdempotencyStore {
1663 read_calls: Arc<Mutex<u32>>,
1664 write_calls: Arc<Mutex<u32>>,
1665 }
1666
1667 #[async_trait]
1668 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
1669 async fn was_compensated(&self, _key: &str) -> Result<bool> {
1670 let mut read_calls = self.read_calls.lock().await;
1671 *read_calls += 1;
1672 Err(anyhow::anyhow!("forced idempotency read failure"))
1673 }
1674
1675 async fn mark_compensated(&self, _key: &str) -> Result<()> {
1676 let mut write_calls = self.write_calls.lock().await;
1677 *write_calls += 1;
1678 Err(anyhow::anyhow!("forced idempotency write failure"))
1679 }
1680 }
1681
1682 #[tokio::test]
1683 async fn execute_persists_success_trace_when_handle_exists() {
1684 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1685 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1686
1687 let mut bus = Bus::new();
1688 bus.insert(PersistenceHandle::from_arc(store_dyn));
1689 bus.insert(PersistenceTraceId::new("trace-success"));
1690
1691 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
1692 let outcome = axon.execute(41, &(), &mut bus).await;
1693 assert!(matches!(outcome, Outcome::Next(42)));
1694
1695 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
1696 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
1698 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
1701 }
1702
1703 #[tokio::test]
1704 async fn execute_persists_fault_completion_state() {
1705 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1706 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1707
1708 let mut bus = Bus::new();
1709 bus.insert(PersistenceHandle::from_arc(store_dyn));
1710 bus.insert(PersistenceTraceId::new("trace-fault"));
1711
1712 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
1713 let outcome = axon.execute(41, &(), &mut bus).await;
1714 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1715
1716 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
1717 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
1721 }
1722
1723 #[tokio::test]
1724 async fn execute_respects_persistence_auto_complete_off() {
1725 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1726 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1727
1728 let mut bus = Bus::new();
1729 bus.insert(PersistenceHandle::from_arc(store_dyn));
1730 bus.insert(PersistenceTraceId::new("trace-no-complete"));
1731 bus.insert(PersistenceAutoComplete(false));
1732
1733 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
1734 let outcome = axon.execute(1, &(), &mut bus).await;
1735 assert!(matches!(outcome, Outcome::Next(2)));
1736
1737 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
1738 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
1740 }
1741
1742 #[tokio::test]
1743 async fn fault_triggers_compensation_and_marks_compensated() {
1744 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1745 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1746 let calls = Arc::new(Mutex::new(Vec::new()));
1747 let compensation = RecordingCompensationHook {
1748 calls: calls.clone(),
1749 should_fail: false,
1750 };
1751
1752 let mut bus = Bus::new();
1753 bus.insert(PersistenceHandle::from_arc(store_dyn));
1754 bus.insert(PersistenceTraceId::new("trace-compensated"));
1755 bus.insert(CompensationHandle::from_hook(compensation));
1756
1757 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
1758 let outcome = axon.execute(7, &(), &mut bus).await;
1759 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1760
1761 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
1762 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
1764 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
1767 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1768
1769 let recorded = calls.lock().await;
1770 assert_eq!(recorded.len(), 1);
1771 assert_eq!(recorded[0].trace_id, "trace-compensated");
1772 assert_eq!(recorded[0].fault_kind, "Fault");
1773 }
1774
1775 #[tokio::test]
1776 async fn failed_compensation_keeps_fault_completion() {
1777 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1778 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1779 let calls = Arc::new(Mutex::new(Vec::new()));
1780 let compensation = RecordingCompensationHook {
1781 calls: calls.clone(),
1782 should_fail: true,
1783 };
1784
1785 let mut bus = Bus::new();
1786 bus.insert(PersistenceHandle::from_arc(store_dyn));
1787 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
1788 bus.insert(CompensationHandle::from_hook(compensation));
1789
1790 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
1791 let outcome = axon.execute(7, &(), &mut bus).await;
1792 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1793
1794 let persisted = store_impl
1795 .load("trace-compensation-failed")
1796 .await
1797 .unwrap()
1798 .unwrap();
1799 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
1802
1803 let recorded = calls.lock().await;
1804 assert_eq!(recorded.len(), 1);
1805 }
1806
1807 #[tokio::test]
1808 async fn compensation_retry_policy_succeeds_after_retries() {
1809 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1810 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1811 let calls = Arc::new(Mutex::new(0u32));
1812 let failures_remaining = Arc::new(Mutex::new(2u32));
1813 let compensation = FlakyCompensationHook {
1814 calls: calls.clone(),
1815 failures_remaining,
1816 };
1817
1818 let mut bus = Bus::new();
1819 bus.insert(PersistenceHandle::from_arc(store_dyn));
1820 bus.insert(PersistenceTraceId::new("trace-retry-success"));
1821 bus.insert(CompensationHandle::from_hook(compensation));
1822 bus.insert(CompensationRetryPolicy {
1823 max_attempts: 3,
1824 backoff_ms: 0,
1825 });
1826
1827 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
1828 let outcome = axon.execute(7, &(), &mut bus).await;
1829 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1830
1831 let persisted = store_impl
1832 .load("trace-retry-success")
1833 .await
1834 .unwrap()
1835 .unwrap();
1836 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1837 assert_eq!(
1838 persisted.events.last().map(|e| e.outcome_kind.as_str()),
1839 Some("Compensated")
1840 );
1841
1842 let attempt_count = calls.lock().await;
1843 assert_eq!(*attempt_count, 3);
1844 }
1845
1846 #[tokio::test]
1847 async fn compensation_idempotency_skips_duplicate_hook_execution() {
1848 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1849 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1850 let calls = Arc::new(Mutex::new(Vec::new()));
1851 let compensation = RecordingCompensationHook {
1852 calls: calls.clone(),
1853 should_fail: false,
1854 };
1855 let idempotency = InMemoryCompensationIdempotencyStore::new();
1856
1857 let mut bus = Bus::new();
1858 bus.insert(PersistenceHandle::from_arc(store_dyn));
1859 bus.insert(PersistenceTraceId::new("trace-idempotent"));
1860 bus.insert(PersistenceAutoComplete(false));
1861 bus.insert(CompensationHandle::from_hook(compensation));
1862 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
1863
1864 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
1865
1866 let outcome1 = axon.execute(7, &(), &mut bus).await;
1867 let outcome2 = axon.execute(8, &(), &mut bus).await;
1868 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
1869 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
1870
1871 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
1872 assert_eq!(persisted.completion, None);
1873 let compensated_count = persisted
1875 .events
1876 .iter()
1877 .filter(|e| e.outcome_kind == "Compensated")
1878 .count();
1879 assert_eq!(
1880 compensated_count, 2,
1881 "Should have 2 Compensated events (one per execution)"
1882 );
1883
1884 let recorded = calls.lock().await;
1885 assert_eq!(recorded.len(), 1);
1886 }
1887
1888 #[tokio::test]
1889 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
1890 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1891 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1892 let calls = Arc::new(Mutex::new(Vec::new()));
1893 let read_calls = Arc::new(Mutex::new(0u32));
1894 let write_calls = Arc::new(Mutex::new(0u32));
1895 let compensation = RecordingCompensationHook {
1896 calls: calls.clone(),
1897 should_fail: false,
1898 };
1899 let idempotency = FailingCompensationIdempotencyStore {
1900 read_calls: read_calls.clone(),
1901 write_calls: write_calls.clone(),
1902 };
1903
1904 let mut bus = Bus::new();
1905 bus.insert(PersistenceHandle::from_arc(store_dyn));
1906 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
1907 bus.insert(CompensationHandle::from_hook(compensation));
1908 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
1909
1910 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
1911 let outcome = axon.execute(9, &(), &mut bus).await;
1912 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
1913
1914 let persisted = store_impl
1915 .load("trace-idempotency-store-failure")
1916 .await
1917 .unwrap()
1918 .unwrap();
1919 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1920 assert_eq!(
1921 persisted.events.last().map(|e| e.outcome_kind.as_str()),
1922 Some("Compensated")
1923 );
1924
1925 let recorded = calls.lock().await;
1926 assert_eq!(recorded.len(), 1);
1927 assert_eq!(*read_calls.lock().await, 1);
1928 assert_eq!(*write_calls.lock().await, 1);
1929 }
1930
1931 #[tokio::test]
1932 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
1933 let mut bus = Bus::new();
1934 bus.insert(1_i32);
1935 bus.insert("secret".to_string());
1936
1937 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
1938 let outcome = axon.execute((), &(), &mut bus).await;
1939
1940 match outcome {
1941 Outcome::Fault(msg) => {
1942 assert!(msg.contains("Bus access denied"), "{msg}");
1943 assert!(msg.contains("CapabilityGuarded"), "{msg}");
1944 assert!(msg.contains("alloc::string::String"), "{msg}");
1945 }
1946 other => panic!("expected fault, got {other:?}"),
1947 }
1948 }
1949
1950 #[tokio::test]
1951 async fn execute_fails_on_version_mismatch_without_migration() {
1952 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1953 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1954
1955 let trace_id = "v-mismatch";
1956 let old_envelope = crate::persistence::PersistenceEnvelope {
1958 trace_id: trace_id.to_string(),
1959 circuit: "TestCircuit".to_string(),
1960 schematic_version: "0.9".to_string(),
1961 step: 0,
1962 node_id: None,
1963 outcome_kind: "Enter".to_string(),
1964 timestamp_ms: 0,
1965 payload_hash: None,
1966 payload: None,
1967 };
1968 store_impl.append(old_envelope).await.unwrap();
1969
1970 let mut bus = Bus::new();
1971 bus.insert(PersistenceHandle::from_arc(store_dyn));
1972 bus.insert(PersistenceTraceId::new(trace_id));
1973
1974 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
1976 let outcome = axon.execute(10, &(), &mut bus).await;
1977
1978 if let Outcome::Emit(kind, _) = outcome {
1979 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
1980 } else {
1981 panic!("Expected version mismatch emission, got {:?}", outcome);
1982 }
1983 }
1984
1985 #[tokio::test]
1986 async fn execute_resumes_from_start_on_migration_strategy() {
1987 let store_impl = Arc::new(InMemoryPersistenceStore::new());
1988 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1989
1990 let trace_id = "v-migration";
1991 let old_envelope = crate::persistence::PersistenceEnvelope {
1993 trace_id: trace_id.to_string(),
1994 circuit: "TestCircuit".to_string(),
1995 schematic_version: "0.9".to_string(),
1996 step: 5,
1997 node_id: None,
1998 outcome_kind: "Next".to_string(),
1999 timestamp_ms: 0,
2000 payload_hash: None,
2001 payload: None,
2002 };
2003 store_impl.append(old_envelope).await.unwrap();
2004
2005 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
2006 registry.register(ranvier_core::schematic::SnapshotMigration {
2007 name: Some("v0.9 to v1.0".to_string()),
2008 from_version: "0.9".to_string(),
2009 to_version: "1.0".to_string(),
2010 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
2011 node_mapping: std::collections::HashMap::new(),
2012 payload_mapper: None,
2013 });
2014
2015 let mut bus = Bus::new();
2016 bus.insert(PersistenceHandle::from_arc(store_dyn));
2017 bus.insert(PersistenceTraceId::new(trace_id));
2018 bus.insert(registry);
2019
2020 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
2021 let outcome = axon.execute(10, &(), &mut bus).await;
2022
2023 assert!(matches!(outcome, Outcome::Next(11)));
2025
2026 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
2028 assert_eq!(persisted.schematic_version, "1.0");
2029 }
2030
2031 #[tokio::test]
2032 async fn execute_applies_manual_intervention_jump_and_payload() {
2033 let store_impl = Arc::new(InMemoryPersistenceStore::new());
2034 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2035
2036 let trace_id = "intervention-test";
2037 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
2039 .then(AddOne)
2040 .then(AddOne);
2041
2042 let mut bus = Bus::new();
2043 bus.insert(PersistenceHandle::from_arc(store_dyn));
2044 bus.insert(PersistenceTraceId::new(trace_id));
2045
2046 let _target_node_label = "AddOne";
2051 let target_node_id = axon.schematic.nodes[2].id.clone();
2053
2054 store_impl
2056 .append(crate::persistence::PersistenceEnvelope {
2057 trace_id: trace_id.to_string(),
2058 circuit: "TestCircuit".to_string(),
2059 schematic_version: "1.0".to_string(),
2060 step: 0,
2061 node_id: None,
2062 outcome_kind: "Enter".to_string(),
2063 timestamp_ms: 0,
2064 payload_hash: None,
2065 payload: None,
2066 })
2067 .await
2068 .unwrap();
2069
2070 store_impl
2071 .save_intervention(
2072 trace_id,
2073 crate::persistence::Intervention {
2074 target_node: target_node_id.clone(),
2075 payload_override: Some(serde_json::json!(100)),
2076 timestamp_ms: 0,
2077 },
2078 )
2079 .await
2080 .unwrap();
2081
2082 let outcome = axon.execute(10, &(), &mut bus).await;
2085
2086 match outcome {
2087 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
2088 other => panic!("Expected Outcome::Next(101), got {:?}", other),
2089 }
2090
2091 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
2093 assert_eq!(persisted.interventions.len(), 1);
2095 assert_eq!(persisted.interventions[0].target_node, target_node_id);
2096 }
2097
2098 #[derive(Clone)]
2102 struct FailNThenSucceed {
2103 remaining: Arc<tokio::sync::Mutex<u32>>,
2104 }
2105
2106 #[async_trait]
2107 impl Transition<i32, i32> for FailNThenSucceed {
2108 type Error = String;
2109 type Resources = ();
2110
2111 async fn run(
2112 &self,
2113 state: i32,
2114 _resources: &Self::Resources,
2115 _bus: &mut Bus,
2116 ) -> Outcome<i32, Self::Error> {
2117 let mut rem = self.remaining.lock().await;
2118 if *rem > 0 {
2119 *rem -= 1;
2120 Outcome::Fault("transient failure".to_string())
2121 } else {
2122 Outcome::Next(state + 1)
2123 }
2124 }
2125 }
2126
2127 #[derive(Clone)]
2129 struct MockDlqSink {
2130 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
2131 }
2132
2133 #[async_trait]
2134 impl DlqSink for MockDlqSink {
2135 async fn store_dead_letter(
2136 &self,
2137 workflow_id: &str,
2138 _circuit_label: &str,
2139 node_id: &str,
2140 error_msg: &str,
2141 _payload: &[u8],
2142 ) -> Result<(), String> {
2143 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
2144 self.letters.lock().await.push(entry);
2145 Ok(())
2146 }
2147 }
2148
2149 #[tokio::test]
2150 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
2151 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
2153 let trans = FailNThenSucceed { remaining };
2154
2155 let dlq_sink = MockDlqSink {
2156 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2157 };
2158
2159 let mut bus = Bus::new();
2160 bus.insert(Timeline::new());
2161
2162 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
2163 .then(trans)
2164 .with_dlq_policy(DlqPolicy::RetryThenDlq {
2165 max_attempts: 5,
2166 backoff_ms: 1,
2167 })
2168 .with_dlq_sink(dlq_sink.clone());
2169 let outcome = axon.execute(10, &(), &mut bus).await;
2170
2171 assert!(
2173 matches!(outcome, Outcome::Next(11)),
2174 "Expected Next(11), got {:?}",
2175 outcome
2176 );
2177
2178 let letters = dlq_sink.letters.lock().await;
2180 assert!(
2181 letters.is_empty(),
2182 "Should have 0 dead letters, got {}",
2183 letters.len()
2184 );
2185
2186 let timeline = bus.read::<Timeline>().unwrap();
2188 let retry_count = timeline
2189 .events
2190 .iter()
2191 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
2192 .count();
2193 assert_eq!(retry_count, 2, "Should have 2 retry events");
2194 }
2195
2196 #[tokio::test]
2197 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
2198 let mut bus = Bus::new();
2200 bus.insert(Timeline::new());
2201
2202 let dlq_sink = MockDlqSink {
2203 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2204 };
2205
2206 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
2207 .then(AlwaysFault)
2208 .with_dlq_policy(DlqPolicy::RetryThenDlq {
2209 max_attempts: 3,
2210 backoff_ms: 1,
2211 })
2212 .with_dlq_sink(dlq_sink.clone());
2213 let outcome = axon.execute(42, &(), &mut bus).await;
2214
2215 assert!(
2216 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
2217 "Expected Fault(boom), got {:?}",
2218 outcome
2219 );
2220
2221 let letters = dlq_sink.letters.lock().await;
2223 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
2224
2225 let timeline = bus.read::<Timeline>().unwrap();
2227 let retry_count = timeline
2228 .events
2229 .iter()
2230 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
2231 .count();
2232 let dlq_count = timeline
2233 .events
2234 .iter()
2235 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
2236 .count();
2237 assert_eq!(
2238 retry_count, 2,
2239 "Should have 2 retry events (attempts 2 and 3)"
2240 );
2241 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
2242 }
2243
2244 #[tokio::test]
2245 async fn send_to_dlq_policy_sends_immediately_without_retry() {
2246 let mut bus = Bus::new();
2247 bus.insert(Timeline::new());
2248
2249 let dlq_sink = MockDlqSink {
2250 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2251 };
2252
2253 let axon = Axon::<i32, i32, String>::start("SendDlq")
2254 .then(AlwaysFault)
2255 .with_dlq_policy(DlqPolicy::SendToDlq)
2256 .with_dlq_sink(dlq_sink.clone());
2257 let outcome = axon.execute(1, &(), &mut bus).await;
2258
2259 assert!(matches!(outcome, Outcome::Fault(_)));
2260
2261 let letters = dlq_sink.letters.lock().await;
2263 assert_eq!(letters.len(), 1);
2264
2265 let timeline = bus.read::<Timeline>().unwrap();
2267 let retry_count = timeline
2268 .events
2269 .iter()
2270 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
2271 .count();
2272 assert_eq!(retry_count, 0);
2273 }
2274
2275 #[tokio::test]
2276 async fn drop_policy_does_not_send_to_dlq() {
2277 let mut bus = Bus::new();
2278
2279 let dlq_sink = MockDlqSink {
2280 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2281 };
2282
2283 let axon = Axon::<i32, i32, String>::start("DropDlq")
2284 .then(AlwaysFault)
2285 .with_dlq_policy(DlqPolicy::Drop)
2286 .with_dlq_sink(dlq_sink.clone());
2287 let outcome = axon.execute(1, &(), &mut bus).await;
2288
2289 assert!(matches!(outcome, Outcome::Fault(_)));
2290
2291 let letters = dlq_sink.letters.lock().await;
2293 assert!(letters.is_empty());
2294 }
2295
2296 #[tokio::test]
2297 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
2298 use ranvier_core::policy::DynamicPolicy;
2299
2300 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
2302 let dlq_sink = MockDlqSink {
2303 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2304 };
2305
2306 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
2307 .then(AlwaysFault)
2308 .with_dynamic_dlq_policy(dynamic)
2309 .with_dlq_sink(dlq_sink.clone());
2310
2311 let mut bus = Bus::new();
2313 let outcome = axon.execute(1, &(), &mut bus).await;
2314 assert!(matches!(outcome, Outcome::Fault(_)));
2315 assert!(
2316 dlq_sink.letters.lock().await.is_empty(),
2317 "Drop policy should produce no DLQ entries"
2318 );
2319
2320 tx.send(DlqPolicy::SendToDlq).unwrap();
2322
2323 let mut bus2 = Bus::new();
2325 let outcome2 = axon.execute(2, &(), &mut bus2).await;
2326 assert!(matches!(outcome2, Outcome::Fault(_)));
2327 assert_eq!(
2328 dlq_sink.letters.lock().await.len(),
2329 1,
2330 "SendToDlq policy should produce 1 DLQ entry"
2331 );
2332 }
2333
2334 #[tokio::test]
2335 async fn dynamic_saga_policy_hot_reload() {
2336 use ranvier_core::policy::DynamicPolicy;
2337 use ranvier_core::saga::SagaPolicy;
2338
2339 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
2341
2342 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
2343 .then(AddOne)
2344 .with_dynamic_saga_policy(dynamic);
2345
2346 let mut bus = Bus::new();
2348 let _outcome = axon.execute(1, &(), &mut bus).await;
2349 assert!(
2350 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
2351 "SagaStack should be absent or empty when disabled"
2352 );
2353
2354 tx.send(SagaPolicy::Enabled).unwrap();
2356
2357 let mut bus2 = Bus::new();
2359 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
2360 assert!(
2361 bus2.read::<SagaStack>().is_some(),
2362 "SagaStack should exist when saga is enabled"
2363 );
2364 }
2365
2366 mod iam_tests {
2369 use super::*;
2370 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
2371
2372 #[derive(Clone)]
2374 struct MockVerifier {
2375 identity: IamIdentity,
2376 should_fail: bool,
2377 }
2378
2379 #[async_trait]
2380 impl IamVerifier for MockVerifier {
2381 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
2382 if self.should_fail {
2383 Err(IamError::InvalidToken("mock verification failure".into()))
2384 } else {
2385 Ok(self.identity.clone())
2386 }
2387 }
2388 }
2389
2390 #[tokio::test]
2391 async fn iam_require_identity_passes_with_valid_token() {
2392 let verifier = MockVerifier {
2393 identity: IamIdentity::new("alice").with_role("user"),
2394 should_fail: false,
2395 };
2396
2397 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
2398 .with_iam(IamPolicy::RequireIdentity, verifier)
2399 .then(AddOne);
2400
2401 let mut bus = Bus::new();
2402 bus.insert(IamToken("valid-token".to_string()));
2403 let outcome = axon.execute(10, &(), &mut bus).await;
2404
2405 assert!(matches!(outcome, Outcome::Next(11)));
2406 let identity = bus
2408 .read::<IamIdentity>()
2409 .expect("IamIdentity should be in Bus");
2410 assert_eq!(identity.subject, "alice");
2411 }
2412
2413 #[tokio::test]
2414 async fn iam_require_identity_rejects_missing_token() {
2415 let verifier = MockVerifier {
2416 identity: IamIdentity::new("ignored"),
2417 should_fail: false,
2418 };
2419
2420 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
2421 .with_iam(IamPolicy::RequireIdentity, verifier)
2422 .then(AddOne);
2423
2424 let mut bus = Bus::new();
2425 let outcome = axon.execute(10, &(), &mut bus).await;
2427
2428 match &outcome {
2430 Outcome::Emit(label, _) => {
2431 assert_eq!(label, "iam.missing_token");
2432 }
2433 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
2434 }
2435 }
2436
2437 #[tokio::test]
2438 async fn iam_rejects_failed_verification() {
2439 let verifier = MockVerifier {
2440 identity: IamIdentity::new("ignored"),
2441 should_fail: true,
2442 };
2443
2444 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
2445 .with_iam(IamPolicy::RequireIdentity, verifier)
2446 .then(AddOne);
2447
2448 let mut bus = Bus::new();
2449 bus.insert(IamToken("bad-token".to_string()));
2450 let outcome = axon.execute(10, &(), &mut bus).await;
2451
2452 match &outcome {
2453 Outcome::Emit(label, _) => {
2454 assert_eq!(label, "iam.verification_failed");
2455 }
2456 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
2457 }
2458 }
2459
2460 #[tokio::test]
2461 async fn iam_require_role_passes_with_matching_role() {
2462 let verifier = MockVerifier {
2463 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
2464 should_fail: false,
2465 };
2466
2467 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
2468 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
2469 .then(AddOne);
2470
2471 let mut bus = Bus::new();
2472 bus.insert(IamToken("token".to_string()));
2473 let outcome = axon.execute(5, &(), &mut bus).await;
2474
2475 assert!(matches!(outcome, Outcome::Next(6)));
2476 }
2477
2478 #[tokio::test]
2479 async fn iam_require_role_denies_without_role() {
2480 let verifier = MockVerifier {
2481 identity: IamIdentity::new("carol").with_role("user"),
2482 should_fail: false,
2483 };
2484
2485 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
2486 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
2487 .then(AddOne);
2488
2489 let mut bus = Bus::new();
2490 bus.insert(IamToken("token".to_string()));
2491 let outcome = axon.execute(5, &(), &mut bus).await;
2492
2493 match &outcome {
2494 Outcome::Emit(label, _) => {
2495 assert_eq!(label, "iam.policy_denied");
2496 }
2497 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
2498 }
2499 }
2500
2501 #[tokio::test]
2502 async fn iam_policy_none_skips_verification() {
2503 let verifier = MockVerifier {
2504 identity: IamIdentity::new("ignored"),
2505 should_fail: true, };
2507
2508 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
2509 .with_iam(IamPolicy::None, verifier)
2510 .then(AddOne);
2511
2512 let mut bus = Bus::new();
2513 let outcome = axon.execute(10, &(), &mut bus).await;
2515
2516 assert!(matches!(outcome, Outcome::Next(11)));
2517 }
2518 }
2519
2520 #[derive(Clone)]
2523 struct SchemaTransition;
2524
2525 #[async_trait]
2526 impl Transition<String, String> for SchemaTransition {
2527 type Error = String;
2528 type Resources = ();
2529
2530 fn input_schema(&self) -> Option<serde_json::Value> {
2531 Some(serde_json::json!({
2532 "type": "object",
2533 "required": ["name"],
2534 "properties": {
2535 "name": { "type": "string" }
2536 }
2537 }))
2538 }
2539
2540 async fn run(
2541 &self,
2542 state: String,
2543 _resources: &Self::Resources,
2544 _bus: &mut Bus,
2545 ) -> Outcome<String, Self::Error> {
2546 Outcome::Next(state)
2547 }
2548 }
2549
2550 #[test]
2551 fn then_auto_populates_input_schema_from_transition() {
2552 let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
2553
2554 let last_node = axon.schematic.nodes.last().unwrap();
2556 assert!(last_node.input_schema.is_some());
2557 let schema = last_node.input_schema.as_ref().unwrap();
2558 assert_eq!(schema["type"], "object");
2559 assert_eq!(schema["required"][0], "name");
2560 }
2561
2562 #[test]
2563 fn then_leaves_input_schema_none_when_not_provided() {
2564 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
2565
2566 let last_node = axon.schematic.nodes.last().unwrap();
2567 assert!(last_node.input_schema.is_none());
2568 }
2569
2570 #[test]
2571 fn with_input_schema_value_sets_on_last_node() {
2572 let schema = serde_json::json!({"type": "integer"});
2573 let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
2574 .then(AddOne)
2575 .with_input_schema_value(schema.clone());
2576
2577 let last_node = axon.schematic.nodes.last().unwrap();
2578 assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
2579 }
2580
2581 #[test]
2582 fn with_output_schema_value_sets_on_last_node() {
2583 let schema = serde_json::json!({"type": "integer"});
2584 let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
2585 .then(AddOne)
2586 .with_output_schema_value(schema.clone());
2587
2588 let last_node = axon.schematic.nodes.last().unwrap();
2589 assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
2590 }
2591
2592 #[test]
2593 fn schematic_export_includes_schema_fields() {
2594 let axon = Axon::<String, String, String>::new("ExportTest")
2595 .then(SchemaTransition)
2596 .with_output_schema_value(serde_json::json!({"type": "string"}));
2597
2598 let json = serde_json::to_value(&axon.schematic).unwrap();
2599 let nodes = json["nodes"].as_array().unwrap();
2600 let last = nodes.last().unwrap();
2602 assert!(last.get("input_schema").is_some());
2603 assert_eq!(last["input_schema"]["type"], "object");
2604 assert_eq!(last["output_schema"]["type"], "string");
2605 }
2606
2607 #[test]
2608 fn schematic_export_omits_schema_fields_when_none() {
2609 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
2610
2611 let json = serde_json::to_value(&axon.schematic).unwrap();
2612 let nodes = json["nodes"].as_array().unwrap();
2613 let last = nodes.last().unwrap();
2614 let obj = last.as_object().unwrap();
2615 assert!(!obj.contains_key("input_schema"));
2616 assert!(!obj.contains_key("output_schema"));
2617 }
2618
2619 #[test]
2620 fn schematic_json_roundtrip_preserves_schemas() {
2621 let axon = Axon::<String, String, String>::new("Roundtrip")
2622 .then(SchemaTransition)
2623 .with_output_schema_value(serde_json::json!({"type": "string"}));
2624
2625 let json_str = serde_json::to_string(&axon.schematic).unwrap();
2626 let deserialized: ranvier_core::schematic::Schematic =
2627 serde_json::from_str(&json_str).unwrap();
2628
2629 let last = deserialized.nodes.last().unwrap();
2630 assert!(last.input_schema.is_some());
2631 assert!(last.output_schema.is_some());
2632 assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
2633 assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
2634 }
2635
2636 #[derive(Clone)]
2638 struct MultiplyByTwo;
2639
2640 #[async_trait]
2641 impl Transition<i32, i32> for MultiplyByTwo {
2642 type Error = TestInfallible;
2643 type Resources = ();
2644
2645 async fn run(
2646 &self,
2647 state: i32,
2648 _resources: &Self::Resources,
2649 _bus: &mut Bus,
2650 ) -> Outcome<i32, Self::Error> {
2651 Outcome::Next(state * 2)
2652 }
2653 }
2654
2655 #[derive(Clone)]
2656 struct AddTen;
2657
2658 #[async_trait]
2659 impl Transition<i32, i32> for AddTen {
2660 type Error = TestInfallible;
2661 type Resources = ();
2662
2663 async fn run(
2664 &self,
2665 state: i32,
2666 _resources: &Self::Resources,
2667 _bus: &mut Bus,
2668 ) -> Outcome<i32, Self::Error> {
2669 Outcome::Next(state + 10)
2670 }
2671 }
2672
2673 #[derive(Clone)]
2674 struct AddOneString;
2675
2676 #[async_trait]
2677 impl Transition<i32, i32> for AddOneString {
2678 type Error = String;
2679 type Resources = ();
2680
2681 async fn run(
2682 &self,
2683 state: i32,
2684 _resources: &Self::Resources,
2685 _bus: &mut Bus,
2686 ) -> Outcome<i32, Self::Error> {
2687 Outcome::Next(state + 1)
2688 }
2689 }
2690
2691 #[derive(Clone)]
2692 struct AddTenString;
2693
2694 #[async_trait]
2695 impl Transition<i32, i32> for AddTenString {
2696 type Error = String;
2697 type Resources = ();
2698
2699 async fn run(
2700 &self,
2701 state: i32,
2702 _resources: &Self::Resources,
2703 _bus: &mut Bus,
2704 ) -> Outcome<i32, Self::Error> {
2705 Outcome::Next(state + 10)
2706 }
2707 }
2708
2709 #[tokio::test]
2710 async fn axon_single_step_chain_executes_and_returns_next() {
2711 let mut bus = Bus::new();
2712 let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
2713
2714 let outcome = axon.execute(5, &(), &mut bus).await;
2715 assert!(matches!(outcome, Outcome::Next(6)));
2716 }
2717
2718 #[tokio::test]
2719 async fn axon_three_step_chain_executes_in_order() {
2720 let mut bus = Bus::new();
2721 let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
2722 .then(AddOne)
2723 .then(MultiplyByTwo)
2724 .then(AddTen);
2725
2726 let outcome = axon.execute(5, &(), &mut bus).await;
2728 assert!(matches!(outcome, Outcome::Next(22)));
2729 }
2730
2731 #[tokio::test]
2732 async fn axon_with_fault_in_middle_step_propagates_error() {
2733 let mut bus = Bus::new();
2734
2735 let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
2740 .then(AddOneString)
2741 .then(AlwaysFault)
2742 .then(AddTenString);
2743
2744 let outcome = axon.execute(5, &(), &mut bus).await;
2745 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
2746 }
2747
2748 #[tokio::test]
2749 async fn fault_injects_transition_error_context_into_bus() {
2750 let mut bus = Bus::new();
2751
2752 let axon = Axon::<i32, i32, String>::start("my-pipeline")
2754 .then(AddOneString)
2755 .then(AlwaysFault)
2756 .then(AddTenString);
2757
2758 let outcome = axon.execute(5, &(), &mut bus).await;
2759 assert!(matches!(outcome, Outcome::Fault(_)));
2760
2761 let ctx = bus
2762 .read::<ranvier_core::error::TransitionErrorContext>()
2763 .expect("TransitionErrorContext should be in Bus after fault");
2764 assert_eq!(ctx.pipeline_name, "my-pipeline");
2765 assert_eq!(ctx.transition_name, "AlwaysFault");
2766 assert_eq!(ctx.step_index, 2); }
2768
2769 #[test]
2770 fn axon_schematic_has_correct_node_count_after_chaining() {
2771 let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
2772 .then(AddOne)
2773 .then(MultiplyByTwo)
2774 .then(AddTen);
2775
2776 assert_eq!(axon.schematic.nodes.len(), 4);
2778 assert_eq!(axon.schematic.name, "NodeCount");
2779 }
2780
2781 #[tokio::test]
2782 async fn axon_execution_records_timeline_events() {
2783 let mut bus = Bus::new();
2784 bus.insert(Timeline::new());
2785
2786 let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
2787 .then(AddOne)
2788 .then(MultiplyByTwo);
2789
2790 let outcome = axon.execute(3, &(), &mut bus).await;
2791 assert!(matches!(outcome, Outcome::Next(8))); let timeline = bus.read::<Timeline>().unwrap();
2794
2795 let enter_count = timeline
2797 .events
2798 .iter()
2799 .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
2800 .count();
2801 let exit_count = timeline
2802 .events
2803 .iter()
2804 .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
2805 .count();
2806
2807 assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
2809 assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
2810 }
2811
2812 #[tokio::test]
2815 async fn parallel_all_succeed_returns_first_next() {
2816 use super::ParallelStrategy;
2817
2818 let mut bus = Bus::new();
2819 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
2820 .parallel(
2821 vec![
2822 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2823 Arc::new(MultiplyByTwo),
2824 ],
2825 ParallelStrategy::AllMustSucceed,
2826 );
2827
2828 let outcome = axon.execute(5, &(), &mut bus).await;
2831 assert!(matches!(outcome, Outcome::Next(6)));
2832 }
2833
2834 #[tokio::test]
2835 async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
2836 use super::ParallelStrategy;
2837
2838 let mut bus = Bus::new();
2839 let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
2840 .parallel(
2841 vec![
2842 Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
2843 Arc::new(AlwaysFault),
2844 ],
2845 ParallelStrategy::AllMustSucceed,
2846 );
2847
2848 let outcome = axon.execute(5, &(), &mut bus).await;
2849 assert!(
2850 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
2851 "Expected Fault(boom), got {:?}",
2852 outcome
2853 );
2854 }
2855
2856 #[tokio::test]
2857 async fn parallel_any_can_fail_returns_success_despite_fault() {
2858 use super::ParallelStrategy;
2859
2860 let mut bus = Bus::new();
2861 let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
2862 .parallel(
2863 vec![
2864 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
2865 Arc::new(AddOneString),
2866 ],
2867 ParallelStrategy::AnyCanFail,
2868 );
2869
2870 let outcome = axon.execute(5, &(), &mut bus).await;
2872 assert!(
2873 matches!(outcome, Outcome::Next(6)),
2874 "Expected Next(6), got {:?}",
2875 outcome
2876 );
2877 }
2878
2879 #[tokio::test]
2880 async fn parallel_any_can_fail_all_fault_returns_first_fault() {
2881 use super::ParallelStrategy;
2882
2883 #[derive(Clone)]
2884 struct AlwaysFault2;
2885 #[async_trait]
2886 impl Transition<i32, i32> for AlwaysFault2 {
2887 type Error = String;
2888 type Resources = ();
2889 async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
2890 Outcome::Fault("boom2".to_string())
2891 }
2892 }
2893
2894 let mut bus = Bus::new();
2895 let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
2896 .parallel(
2897 vec![
2898 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
2899 Arc::new(AlwaysFault2),
2900 ],
2901 ParallelStrategy::AnyCanFail,
2902 );
2903
2904 let outcome = axon.execute(5, &(), &mut bus).await;
2905 assert!(
2907 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
2908 "Expected Fault(boom), got {:?}",
2909 outcome
2910 );
2911 }
2912
2913 #[test]
2914 fn parallel_schematic_has_fanout_fanin_nodes() {
2915 use super::ParallelStrategy;
2916 use ranvier_core::schematic::{EdgeType, NodeKind};
2917
2918 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
2919 .parallel(
2920 vec![
2921 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2922 Arc::new(MultiplyByTwo),
2923 Arc::new(AddTen),
2924 ],
2925 ParallelStrategy::AllMustSucceed,
2926 );
2927
2928 assert_eq!(axon.schematic.nodes.len(), 6);
2930 assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
2931 assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
2932 assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
2933 assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
2934 assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
2935
2936 assert!(axon.schematic.nodes[1]
2938 .description
2939 .as_ref()
2940 .unwrap()
2941 .contains("3 branches"));
2942
2943 let parallel_edges: Vec<_> = axon
2945 .schematic
2946 .edges
2947 .iter()
2948 .filter(|e| matches!(e.kind, EdgeType::Parallel))
2949 .collect();
2950 assert_eq!(parallel_edges.len(), 6);
2952 }
2953
2954 #[tokio::test]
2955 async fn parallel_then_chain_composes_correctly() {
2956 use super::ParallelStrategy;
2957
2958 let mut bus = Bus::new();
2959 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
2960 .then(AddOne)
2961 .parallel(
2962 vec![
2963 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2964 Arc::new(MultiplyByTwo),
2965 ],
2966 ParallelStrategy::AllMustSucceed,
2967 )
2968 .then(AddTen);
2969
2970 let outcome = axon.execute(5, &(), &mut bus).await;
2972 assert!(
2973 matches!(outcome, Outcome::Next(17)),
2974 "Expected Next(17), got {:?}",
2975 outcome
2976 );
2977 }
2978
2979 #[tokio::test]
2980 async fn parallel_records_timeline_events() {
2981 use super::ParallelStrategy;
2982 use ranvier_core::timeline::TimelineEvent;
2983
2984 let mut bus = Bus::new();
2985 bus.insert(Timeline::new());
2986
2987 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
2988 .parallel(
2989 vec![
2990 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
2991 Arc::new(MultiplyByTwo),
2992 ],
2993 ParallelStrategy::AllMustSucceed,
2994 );
2995
2996 let outcome = axon.execute(3, &(), &mut bus).await;
2997 assert!(matches!(outcome, Outcome::Next(4)));
2998
2999 let timeline = bus.read::<Timeline>().unwrap();
3000
3001 let fanout_enters = timeline
3003 .events
3004 .iter()
3005 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
3006 .count();
3007 let fanin_enters = timeline
3008 .events
3009 .iter()
3010 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
3011 .count();
3012
3013 assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
3014 assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
3015 }
3016
3017 #[derive(Clone)]
3020 struct Greet;
3021
3022 #[async_trait]
3023 impl Transition<(), String> for Greet {
3024 type Error = String;
3025 type Resources = ();
3026
3027 async fn run(
3028 &self,
3029 _state: (),
3030 _resources: &Self::Resources,
3031 _bus: &mut Bus,
3032 ) -> Outcome<String, Self::Error> {
3033 Outcome::Next("Hello from simple!".to_string())
3034 }
3035 }
3036
3037 #[tokio::test]
3038 async fn axon_simple_creates_pipeline() {
3039 let axon = Axon::simple::<String>("SimpleTest").then(Greet);
3040
3041 let mut bus = Bus::new();
3042 let result = axon.execute((), &(), &mut bus).await;
3043
3044 match result {
3045 Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
3046 other => panic!("Expected Outcome::Next, got {:?}", other),
3047 }
3048 }
3049
3050 #[tokio::test]
3051 async fn axon_simple_equivalent_to_explicit() {
3052 let simple = Axon::simple::<String>("Equiv").then(Greet);
3054 let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
3055
3056 let mut bus1 = Bus::new();
3057 let mut bus2 = Bus::new();
3058
3059 let r1 = simple.execute((), &(), &mut bus1).await;
3060 let r2 = explicit.execute((), &(), &mut bus2).await;
3061
3062 match (r1, r2) {
3063 (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
3064 _ => panic!("Both should produce Outcome::Next"),
3065 }
3066 }
3067
3068 #[tokio::test]
3069 async fn then_fn_closure_transition() {
3070 let axon = Axon::simple::<String>("ClosureTest")
3071 .then_fn("to_greeting", |_input: (), _bus: &mut Bus| {
3072 Outcome::next("hello from closure".to_string())
3073 });
3074
3075 let mut bus = Bus::new();
3076 let result = axon.execute((), &(), &mut bus).await;
3077
3078 match result {
3079 Outcome::Next(msg) => assert_eq!(msg, "hello from closure"),
3080 other => panic!("Expected Outcome::Next, got {:?}", other),
3081 }
3082 }
3083
3084 #[tokio::test]
3085 async fn then_fn_reads_bus() {
3086 let axon = Axon::simple::<String>("BusReadClosure")
3087 .then_fn("check_score", |_input: (), bus: &mut Bus| {
3088 let score = bus.read::<u32>().copied().unwrap_or(0);
3089 if score > 75 {
3090 Outcome::next("REJECTED".to_string())
3091 } else {
3092 Outcome::next("APPROVED".to_string())
3093 }
3094 });
3095
3096 let mut bus = Bus::new();
3097 bus.insert(80u32);
3098 let result = axon.execute((), &(), &mut bus).await;
3099 match result {
3100 Outcome::Next(msg) => assert_eq!(msg, "REJECTED"),
3101 other => panic!("Expected REJECTED, got {:?}", other),
3102 }
3103 }
3104
3105 #[tokio::test]
3106 async fn then_fn_mixed_with_transition() {
3107 let axon = Axon::simple::<String>("MixedPipeline")
3109 .then(Greet)
3110 .then_fn("uppercase", |input: String, _bus: &mut Bus| {
3111 Outcome::next(input.to_uppercase())
3112 });
3113
3114 let mut bus = Bus::new();
3115 let result = axon.execute((), &(), &mut bus).await;
3116 match result {
3117 Outcome::Next(msg) => assert_eq!(msg, "HELLO FROM SIMPLE!"),
3118 other => panic!("Expected uppercase greeting, got {:?}", other),
3119 }
3120 }
3121
3122 #[tokio::test]
3123 async fn then_fn_schematic_label() {
3124 let axon = Axon::simple::<String>("SchematicTest")
3125 .then_fn("my_custom_label", |_: (), _: &mut Bus| {
3126 Outcome::next("ok".to_string())
3127 });
3128
3129 assert_eq!(axon.schematic.nodes.len(), 2);
3131 assert_eq!(axon.schematic.nodes[1].label, "my_custom_label");
3132 }
3133}