1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45#[derive(Clone)]
47pub enum ExecutionMode {
48 Local,
50 Singleton {
52 lock_key: String,
53 ttl_ms: u64,
54 lock_provider: Arc<dyn DistributedLock>,
55 },
56}
57
/// Failure-handling strategy passed to `Axon::parallel`.
///
/// NOTE(review): the exact semantics are implemented in `parallel`; the descriptions
/// below follow the variant names and should be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelStrategy {
    /// Presumably: the parallel step fails if any branch fails.
    AllMustSucceed,
    /// Presumably: individual branch failures are tolerated.
    AnyCanFail,
}
69
/// A pinned, heap-allocated, `Send` future that yields `T` and may borrow data for `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
72
73pub type Executor<In, Out, E, Res> =
77 Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
79#[derive(Debug, Clone)]
81pub struct ManualJump {
82 pub target_node: String,
83 pub payload_override: Option<serde_json::Value>,
84}
85
/// Bus value naming the step at which a resumed run should start.
///
/// The wrapped number is compared against a step's index in `schematic.nodes`.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
89
90#[derive(Debug, Clone)]
92struct ResumptionState {
93 payload: Option<serde_json::Value>,
94}
95
/// Returns a readable name for `T` with module paths stripped from every path segment.
///
/// Examples: `my_crate::Order` becomes `Order`, and
/// `core::option::Option<alloc::string::String>` becomes `Option<String>`. Generic
/// brackets, tuple parentheses and other punctuation are preserved.
///
/// The result is for display in schematics only: [`type_name`] output is not guaranteed
/// to be stable or unique across compiler versions.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path currently being copied began.
    let mut path_start = 0;
    let mut chars = full.chars().peekable();
    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // A `::` separator: discard the module segment copied so far.
            chars.next();
            short.truncate(path_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation (`<`, `>`, `,`, ` `, `(`, `&`, ...) ends the current path.
                path_start = short.len();
            }
        }
    }
    short
}
101
102pub struct Axon<In, Out, E, Res = ()> {
122 pub schematic: Schematic,
124 executor: Executor<In, Out, E, Res>,
126 pub execution_mode: ExecutionMode,
128 pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
130 pub audit_sink: Option<Arc<dyn AuditSink>>,
132 pub dlq_sink: Option<Arc<dyn DlqSink>>,
134 pub dlq_policy: DlqPolicy,
136 pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
138 pub saga_policy: SagaPolicy,
140 pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
142 pub saga_compensation_registry:
144 Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
145 pub iam_handle: Option<ranvier_core::iam::IamHandle>,
147}
148
/// Request to export a pipeline's schematic.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Optional destination path for the export.
    /// NOTE(review): what `None` means (e.g. stdout) is decided by the consumer — confirm.
    pub output: Option<PathBuf>,
}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157 fn clone(&self) -> Self {
158 Self {
159 schematic: self.schematic.clone(),
160 executor: self.executor.clone(),
161 execution_mode: self.execution_mode.clone(),
162 persistence_store: self.persistence_store.clone(),
163 audit_sink: self.audit_sink.clone(),
164 dlq_sink: self.dlq_sink.clone(),
165 dlq_policy: self.dlq_policy.clone(),
166 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167 saga_policy: self.saga_policy.clone(),
168 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169 saga_compensation_registry: self.saga_compensation_registry.clone(),
170 iam_handle: self.iam_handle.clone(),
171 }
172 }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177 In: Send + Sync + Serialize + DeserializeOwned + 'static,
178 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179 Res: ranvier_core::transition::ResourceRequirement,
180{
181 #[track_caller]
184 pub fn new(label: &str) -> Self {
185 let caller = Location::caller();
186 Self::start_with_source(label, caller)
187 }
188
189 #[track_caller]
192 pub fn start(label: &str) -> Self {
193 let caller = Location::caller();
194 Self::start_with_source(label, caller)
195 }
196
197 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198 let node_id = uuid::Uuid::new_v4().to_string();
199 let node = Node {
200 id: node_id,
201 kind: NodeKind::Ingress,
202 label: label.to_string(),
203 description: None,
204 input_type: "void".to_string(),
205 output_type: type_name_of::<In>(),
206 resource_type: type_name_of::<Res>(),
207 metadata: Default::default(),
208 bus_capability: None,
209 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210 position: None,
211 compensation_node_id: None,
212 input_schema: None,
213 output_schema: None,
214 };
215
216 let mut schematic = Schematic::new(label);
217 schematic.nodes.push(node);
218
219 let executor: Executor<In, In, E, Res> =
220 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222 Self {
223 schematic,
224 executor,
225 execution_mode: ExecutionMode::Local,
226 persistence_store: None,
227 audit_sink: None,
228 dlq_sink: None,
229 dlq_policy: DlqPolicy::default(),
230 dynamic_dlq_policy: None,
231 saga_policy: SagaPolicy::default(),
232 dynamic_saga_policy: None,
233 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234 ranvier_core::saga::SagaCompensationRegistry::new(),
235 )),
236 iam_handle: None,
237 }
238 }
239}
240
241impl Axon<(), (), (), ()> {
242 #[track_caller]
257 pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
258 where
259 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
260 {
261 let caller = Location::caller();
262 <Axon<(), (), E, ()>>::start_with_source(label, caller)
263 }
264}
265
266impl<In, Out, E, Res> Axon<In, Out, E, Res>
267where
268 In: Send + Sync + Serialize + DeserializeOwned + 'static,
269 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
270 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
271 Res: ranvier_core::transition::ResourceRequirement,
272{
273 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
275 self.execution_mode = mode;
276 self
277 }
278
279 pub fn with_version(mut self, version: impl Into<String>) -> Self {
281 self.schematic.schema_version = version.into();
282 self
283 }
284
285 pub fn with_persistence_store<S>(mut self, store: S) -> Self
287 where
288 S: crate::persistence::PersistenceStore + 'static,
289 {
290 self.persistence_store = Some(Arc::new(store));
291 self
292 }
293
294 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
296 where
297 S: AuditSink + 'static,
298 {
299 self.audit_sink = Some(Arc::new(sink));
300 self
301 }
302
303 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
305 where
306 S: DlqSink + 'static,
307 {
308 self.dlq_sink = Some(Arc::new(sink));
309 self
310 }
311
312 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
314 self.dlq_policy = policy;
315 self
316 }
317
318 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
320 self.saga_policy = policy;
321 self
322 }
323
324 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
327 self.dynamic_dlq_policy = Some(dynamic);
328 self
329 }
330
331 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
334 self.dynamic_saga_policy = Some(dynamic);
335 self
336 }
337
338 pub fn with_iam(
346 mut self,
347 policy: ranvier_core::iam::IamPolicy,
348 verifier: impl ranvier_core::iam::IamVerifier + 'static,
349 ) -> Self {
350 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
351 policy,
352 Arc::new(verifier),
353 ));
354 self
355 }
356
357 #[cfg(feature = "schema")]
368 pub fn with_input_schema<T>(mut self) -> Self
369 where
370 T: schemars::JsonSchema,
371 {
372 if let Some(last_node) = self.schematic.nodes.last_mut() {
373 let schema = schemars::schema_for!(T);
374 last_node.input_schema =
375 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
376 }
377 self
378 }
379
380 #[cfg(feature = "schema")]
384 pub fn with_output_schema<T>(mut self) -> Self
385 where
386 T: schemars::JsonSchema,
387 {
388 if let Some(last_node) = self.schematic.nodes.last_mut() {
389 let schema = schemars::schema_for!(T);
390 last_node.output_schema =
391 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
392 }
393 self
394 }
395
396 pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
400 if let Some(last_node) = self.schematic.nodes.last_mut() {
401 last_node.input_schema = Some(schema);
402 }
403 self
404 }
405
406 pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
408 if let Some(last_node) = self.schematic.nodes.last_mut() {
409 last_node.output_schema = Some(schema);
410 }
411 self
412 }
413}
414
415#[async_trait]
416impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
417where
418 In: Send + Sync + Serialize + DeserializeOwned + 'static,
419 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
420 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
421 Res: ranvier_core::transition::ResourceRequirement,
422{
423 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
424 let store = self.persistence_store.as_ref()?;
425 let trace = store.load(trace_id).await.ok().flatten()?;
426 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
427 }
428
429 async fn force_resume(
430 &self,
431 trace_id: &str,
432 target_node: &str,
433 payload_override: Option<Value>,
434 ) -> Result<(), String> {
435 let store = self
436 .persistence_store
437 .as_ref()
438 .ok_or("No persistence store attached to Axon")?;
439
440 let intervention = crate::persistence::Intervention {
441 target_node: target_node.to_string(),
442 payload_override,
443 timestamp_ms: now_ms(),
444 };
445
446 store
447 .save_intervention(trace_id, intervention)
448 .await
449 .map_err(|e| format!("Failed to save intervention: {}", e))?;
450
451 if let Some(sink) = self.audit_sink.as_ref() {
452 let event = AuditEvent::new(
453 uuid::Uuid::new_v4().to_string(),
454 "Inspector".to_string(),
455 "ForceResume".to_string(),
456 trace_id.to_string(),
457 )
458 .with_metadata("target_node", target_node);
459
460 let _ = sink.append(&event).await;
461 }
462
463 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
464 Ok(())
465 }
466}
467
468impl<In, Out, E, Res> Axon<In, Out, E, Res>
469where
470 In: Send + Sync + Serialize + DeserializeOwned + 'static,
471 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
472 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
473 Res: ranvier_core::transition::ResourceRequirement,
474{
475 #[track_caller]
479 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
480 where
481 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
482 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
483 {
484 let caller = Location::caller();
485 let Axon {
487 mut schematic,
488 executor: prev_executor,
489 execution_mode,
490 persistence_store,
491 audit_sink,
492 dlq_sink,
493 dlq_policy,
494 dynamic_dlq_policy,
495 saga_policy,
496 dynamic_saga_policy,
497 saga_compensation_registry,
498 iam_handle,
499 } = self;
500
501 let next_node_id = uuid::Uuid::new_v4().to_string();
503 let next_node = Node {
504 id: next_node_id.clone(),
505 kind: NodeKind::Atom,
506 label: transition.label(),
507 description: transition.description(),
508 input_type: type_name_of::<Out>(),
509 output_type: type_name_of::<Next>(),
510 resource_type: type_name_of::<Res>(),
511 metadata: Default::default(),
512 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
513 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
514 position: transition
515 .position()
516 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
517 compensation_node_id: None,
518 input_schema: transition.input_schema(),
519 output_schema: None,
520 };
521
522 let last_node_id = schematic
523 .nodes
524 .last()
525 .map(|n| n.id.clone())
526 .unwrap_or_default();
527
528 schematic.nodes.push(next_node);
529 schematic.edges.push(Edge {
530 from: last_node_id,
531 to: next_node_id.clone(),
532 kind: EdgeType::Linear,
533 label: Some("Next".to_string()),
534 });
535
536 let node_id_for_exec = next_node_id.clone();
538 let node_label_for_exec = transition.label();
539 let bus_policy_for_exec = transition.bus_access_policy();
540 let bus_policy_clone = bus_policy_for_exec.clone();
541 let current_step_idx = schematic.nodes.len() as u64 - 1;
542 let next_executor: Executor<In, Next, E, Res> = Arc::new(
543 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
544 let prev = prev_executor.clone();
545 let trans = transition.clone();
546 let timeline_node_id = node_id_for_exec.clone();
547 let timeline_node_label = node_label_for_exec.clone();
548 let transition_bus_policy = bus_policy_clone.clone();
549 let step_idx = current_step_idx;
550
551 Box::pin(async move {
552 if let Some(jump) = bus.read::<ManualJump>()
554 && (jump.target_node == timeline_node_id
555 || jump.target_node == timeline_node_label)
556 {
557 tracing::info!(
558 node_id = %timeline_node_id,
559 node_label = %timeline_node_label,
560 "Manual jump target reached; skipping previous steps"
561 );
562
563 let state = if let Some(ow) = jump.payload_override.clone() {
564 match serde_json::from_value::<Out>(ow) {
565 Ok(s) => s,
566 Err(e) => {
567 tracing::error!(
568 "Payload override deserialization failed: {}",
569 e
570 );
571 return Outcome::emit(
572 "execution.jump.payload_error",
573 Some(serde_json::json!({"error": e.to_string()})),
574 )
575 .map(|_: ()| unreachable!());
576 }
577 }
578 } else {
579 return Outcome::emit(
583 "execution.jump.missing_payload",
584 Some(serde_json::json!({"node_id": timeline_node_id})),
585 );
586 };
587
588 return run_this_step::<Out, Next, E, Res>(
590 &trans,
591 state,
592 res,
593 bus,
594 &timeline_node_id,
595 &timeline_node_label,
596 &transition_bus_policy,
597 step_idx,
598 )
599 .await;
600 }
601
602 if let Some(start) = bus.read::<StartStep>()
604 && step_idx == start.0
605 && bus.read::<ResumptionState>().is_some()
606 {
607 let fresh_state = serde_json::to_value(&input)
611 .ok()
612 .and_then(|v| serde_json::from_value::<Out>(v).ok());
613 let persisted_state = bus
614 .read::<ResumptionState>()
615 .and_then(|r| r.payload.clone())
616 .and_then(|p| serde_json::from_value::<Out>(p).ok());
617
618 if let Some(s) = fresh_state.or(persisted_state) {
619 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
620 return run_this_step::<Out, Next, E, Res>(
621 &trans,
622 s,
623 res,
624 bus,
625 &timeline_node_id,
626 &timeline_node_label,
627 &transition_bus_policy,
628 step_idx,
629 )
630 .await;
631 }
632
633 return Outcome::emit(
634 "execution.resumption.payload_error",
635 Some(serde_json::json!({"error": "no compatible resumption state"})),
636 )
637 .map(|_: ()| unreachable!());
638 }
639
640 let prev_result = prev(input, res, bus).await;
642
643 let state = match prev_result {
645 Outcome::Next(t) => t,
646 other => return other.map(|_| unreachable!()),
647 };
648
649 run_this_step::<Out, Next, E, Res>(
650 &trans,
651 state,
652 res,
653 bus,
654 &timeline_node_id,
655 &timeline_node_label,
656 &transition_bus_policy,
657 step_idx,
658 )
659 .await
660 })
661 },
662 );
663 Axon {
664 schematic,
665 executor: next_executor,
666 execution_mode,
667 persistence_store,
668 audit_sink,
669 dlq_sink,
670 dlq_policy,
671 dynamic_dlq_policy,
672 saga_policy,
673 dynamic_saga_policy,
674 saga_compensation_registry,
675 iam_handle,
676 }
677 }
678
679 #[track_caller]
695 pub fn then_with_retry<Next, Trans>(
696 self,
697 transition: Trans,
698 policy: crate::retry::RetryPolicy,
699 ) -> Axon<In, Next, E, Res>
700 where
701 Out: Clone,
702 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
703 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
704 {
705 let caller = Location::caller();
706 let Axon {
707 mut schematic,
708 executor: prev_executor,
709 execution_mode,
710 persistence_store,
711 audit_sink,
712 dlq_sink,
713 dlq_policy,
714 dynamic_dlq_policy,
715 saga_policy,
716 dynamic_saga_policy,
717 saga_compensation_registry,
718 iam_handle,
719 } = self;
720
721 let next_node_id = uuid::Uuid::new_v4().to_string();
722 let next_node = Node {
723 id: next_node_id.clone(),
724 kind: NodeKind::Atom,
725 label: transition.label(),
726 description: transition.description(),
727 input_type: type_name_of::<Out>(),
728 output_type: type_name_of::<Next>(),
729 resource_type: type_name_of::<Res>(),
730 metadata: Default::default(),
731 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
732 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
733 position: transition
734 .position()
735 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
736 compensation_node_id: None,
737 input_schema: transition.input_schema(),
738 output_schema: None,
739 };
740
741 let last_node_id = schematic
742 .nodes
743 .last()
744 .map(|n| n.id.clone())
745 .unwrap_or_default();
746
747 schematic.nodes.push(next_node);
748 schematic.edges.push(Edge {
749 from: last_node_id,
750 to: next_node_id.clone(),
751 kind: EdgeType::Linear,
752 label: Some("Next (retryable)".to_string()),
753 });
754
755 let node_id_for_exec = next_node_id.clone();
756 let node_label_for_exec = transition.label();
757 let bus_policy_for_exec = transition.bus_access_policy();
758 let bus_policy_clone = bus_policy_for_exec.clone();
759 let current_step_idx = schematic.nodes.len() as u64 - 1;
760 let next_executor: Executor<In, Next, E, Res> = Arc::new(
761 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
762 let prev = prev_executor.clone();
763 let trans = transition.clone();
764 let timeline_node_id = node_id_for_exec.clone();
765 let timeline_node_label = node_label_for_exec.clone();
766 let transition_bus_policy = bus_policy_clone.clone();
767 let step_idx = current_step_idx;
768 let retry_policy = policy.clone();
769
770 Box::pin(async move {
771 let prev_result = prev(input, res, bus).await;
773 let state = match prev_result {
774 Outcome::Next(t) => t,
775 other => return other.map(|_| unreachable!()),
776 };
777
778 let mut last_result = None;
780 for attempt in 0..=retry_policy.max_retries {
781 let attempt_state = state.clone();
782
783 let result = run_this_step::<Out, Next, E, Res>(
784 &trans,
785 attempt_state,
786 res,
787 bus,
788 &timeline_node_id,
789 &timeline_node_label,
790 &transition_bus_policy,
791 step_idx,
792 )
793 .await;
794
795 match &result {
796 Outcome::Next(_) => return result,
797 Outcome::Fault(_) if attempt < retry_policy.max_retries => {
798 let delay = retry_policy.delay_for_attempt(attempt);
799 tracing::warn!(
800 node_id = %timeline_node_id,
801 attempt = attempt + 1,
802 max = retry_policy.max_retries,
803 delay_ms = delay.as_millis() as u64,
804 "Transition failed, retrying"
805 );
806 if let Some(timeline) = bus.read_mut::<Timeline>() {
807 timeline.push(TimelineEvent::NodeRetry {
808 node_id: timeline_node_id.clone(),
809 attempt: attempt + 1,
810 max_attempts: retry_policy.max_retries,
811 backoff_ms: delay.as_millis() as u64,
812 timestamp: now_ms(),
813 });
814 }
815 tokio::time::sleep(delay).await;
816 }
817 _ => {
818 last_result = Some(result);
819 break;
820 }
821 }
822 }
823
824 last_result.unwrap_or_else(|| {
825 Outcome::emit("execution.retry.exhausted", None)
826 })
827 })
828 },
829 );
830 Axon {
831 schematic,
832 executor: next_executor,
833 execution_mode,
834 persistence_store,
835 audit_sink,
836 dlq_sink,
837 dlq_policy,
838 dynamic_dlq_policy,
839 saga_policy,
840 dynamic_saga_policy,
841 saga_compensation_registry,
842 iam_handle,
843 }
844 }
845
846 #[track_caller]
865 pub fn then_with_timeout<Next, Trans, F>(
866 self,
867 transition: Trans,
868 duration: std::time::Duration,
869 make_timeout_error: F,
870 ) -> Axon<In, Next, E, Res>
871 where
872 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
873 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
874 F: Fn() -> E + Clone + Send + Sync + 'static,
875 {
876 let caller = Location::caller();
877 let Axon {
878 mut schematic,
879 executor: prev_executor,
880 execution_mode,
881 persistence_store,
882 audit_sink,
883 dlq_sink,
884 dlq_policy,
885 dynamic_dlq_policy,
886 saga_policy,
887 dynamic_saga_policy,
888 saga_compensation_registry,
889 iam_handle,
890 } = self;
891
892 let next_node_id = uuid::Uuid::new_v4().to_string();
893 let next_node = Node {
894 id: next_node_id.clone(),
895 kind: NodeKind::Atom,
896 label: transition.label(),
897 description: transition.description(),
898 input_type: type_name_of::<Out>(),
899 output_type: type_name_of::<Next>(),
900 resource_type: type_name_of::<Res>(),
901 metadata: Default::default(),
902 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
903 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
904 position: transition
905 .position()
906 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
907 compensation_node_id: None,
908 input_schema: transition.input_schema(),
909 output_schema: None,
910 };
911
912 let last_node_id = schematic
913 .nodes
914 .last()
915 .map(|n| n.id.clone())
916 .unwrap_or_default();
917
918 schematic.nodes.push(next_node);
919 schematic.edges.push(Edge {
920 from: last_node_id,
921 to: next_node_id.clone(),
922 kind: EdgeType::Linear,
923 label: Some("Next (timeout-guarded)".to_string()),
924 });
925
926 let node_id_for_exec = next_node_id.clone();
927 let node_label_for_exec = transition.label();
928 let bus_policy_for_exec = transition.bus_access_policy();
929 let bus_policy_clone = bus_policy_for_exec.clone();
930 let current_step_idx = schematic.nodes.len() as u64 - 1;
931 let next_executor: Executor<In, Next, E, Res> = Arc::new(
932 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
933 let prev = prev_executor.clone();
934 let trans = transition.clone();
935 let timeline_node_id = node_id_for_exec.clone();
936 let timeline_node_label = node_label_for_exec.clone();
937 let transition_bus_policy = bus_policy_clone.clone();
938 let step_idx = current_step_idx;
939 let timeout_duration = duration;
940 let error_factory = make_timeout_error.clone();
941
942 Box::pin(async move {
943 let prev_result = prev(input, res, bus).await;
945 let state = match prev_result {
946 Outcome::Next(t) => t,
947 other => return other.map(|_| unreachable!()),
948 };
949
950 match tokio::time::timeout(
952 timeout_duration,
953 run_this_step::<Out, Next, E, Res>(
954 &trans,
955 state,
956 res,
957 bus,
958 &timeline_node_id,
959 &timeline_node_label,
960 &transition_bus_policy,
961 step_idx,
962 ),
963 )
964 .await
965 {
966 Ok(result) => result,
967 Err(_elapsed) => {
968 tracing::warn!(
969 node_id = %timeline_node_id,
970 timeout_ms = timeout_duration.as_millis() as u64,
971 "Transition timed out"
972 );
973 if let Some(timeline) = bus.read_mut::<Timeline>() {
974 timeline.push(TimelineEvent::NodeTimeout {
975 node_id: timeline_node_id.clone(),
976 timeout_ms: timeout_duration.as_millis() as u64,
977 timestamp: now_ms(),
978 });
979 }
980 Outcome::Fault(error_factory())
981 }
982 }
983 })
984 },
985 );
986 Axon {
987 schematic,
988 executor: next_executor,
989 execution_mode,
990 persistence_store,
991 audit_sink,
992 dlq_sink,
993 dlq_policy,
994 dynamic_dlq_policy,
995 saga_policy,
996 dynamic_saga_policy,
997 saga_compensation_registry,
998 iam_handle,
999 }
1000 }
1001
1002 #[track_caller]
1007 pub fn then_compensated<Next, Trans, Comp>(
1008 self,
1009 transition: Trans,
1010 compensation: Comp,
1011 ) -> Axon<In, Next, E, Res>
1012 where
1013 Out: Clone,
1014 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
1015 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1016 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1017 {
1018 let caller = Location::caller();
1019 let Axon {
1020 mut schematic,
1021 executor: prev_executor,
1022 execution_mode,
1023 persistence_store,
1024 audit_sink,
1025 dlq_sink,
1026 dlq_policy,
1027 dynamic_dlq_policy,
1028 saga_policy,
1029 dynamic_saga_policy,
1030 saga_compensation_registry,
1031 iam_handle,
1032 } = self;
1033
1034 let next_node_id = uuid::Uuid::new_v4().to_string();
1036 let comp_node_id = uuid::Uuid::new_v4().to_string();
1037
1038 let next_node = Node {
1039 id: next_node_id.clone(),
1040 kind: NodeKind::Atom,
1041 label: transition.label(),
1042 description: transition.description(),
1043 input_type: type_name_of::<Out>(),
1044 output_type: type_name_of::<Next>(),
1045 resource_type: type_name_of::<Res>(),
1046 metadata: Default::default(),
1047 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
1048 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1049 position: transition
1050 .position()
1051 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1052 compensation_node_id: Some(comp_node_id.clone()),
1053 input_schema: None,
1054 output_schema: None,
1055 };
1056
1057 let comp_node = Node {
1059 id: comp_node_id.clone(),
1060 kind: NodeKind::Atom,
1061 label: format!("Compensate: {}", compensation.label()),
1062 description: compensation.description(),
1063 input_type: type_name_of::<Out>(),
1064 output_type: "void".to_string(),
1065 resource_type: type_name_of::<Res>(),
1066 metadata: Default::default(),
1067 bus_capability: None,
1068 source_location: None,
1069 position: compensation
1070 .position()
1071 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1072 compensation_node_id: None,
1073 input_schema: None,
1074 output_schema: None,
1075 };
1076
1077 let last_node_id = schematic
1078 .nodes
1079 .last()
1080 .map(|n| n.id.clone())
1081 .unwrap_or_default();
1082
1083 schematic.nodes.push(next_node);
1084 schematic.nodes.push(comp_node);
1085 schematic.edges.push(Edge {
1086 from: last_node_id,
1087 to: next_node_id.clone(),
1088 kind: EdgeType::Linear,
1089 label: Some("Next".to_string()),
1090 });
1091
1092 let node_id_for_exec = next_node_id.clone();
1094 let comp_id_for_exec = comp_node_id.clone();
1095 let node_label_for_exec = transition.label();
1096 let bus_policy_for_exec = transition.bus_access_policy();
1097 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
1098 let comp_for_exec = compensation.clone();
1099 let bus_policy_for_executor = bus_policy_for_exec.clone();
1100 let bus_policy_for_registry = bus_policy_for_exec.clone();
1101 let next_executor: Executor<In, Next, E, Res> = Arc::new(
1102 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
1103 let prev = prev_executor.clone();
1104 let trans = transition.clone();
1105 let comp = comp_for_exec.clone();
1106 let timeline_node_id = node_id_for_exec.clone();
1107 let timeline_comp_id = comp_id_for_exec.clone();
1108 let timeline_node_label = node_label_for_exec.clone();
1109 let transition_bus_policy = bus_policy_for_executor.clone();
1110 let step_idx = step_idx_for_exec;
1111
1112 Box::pin(async move {
1113 if let Some(jump) = bus.read::<ManualJump>()
1115 && (jump.target_node == timeline_node_id
1116 || jump.target_node == timeline_node_label)
1117 {
1118 tracing::info!(
1119 node_id = %timeline_node_id,
1120 node_label = %timeline_node_label,
1121 "Manual jump target reached (compensated); skipping previous steps"
1122 );
1123
1124 let state = if let Some(ow) = jump.payload_override.clone() {
1125 match serde_json::from_value::<Out>(ow) {
1126 Ok(s) => s,
1127 Err(e) => {
1128 tracing::error!(
1129 "Payload override deserialization failed: {}",
1130 e
1131 );
1132 return Outcome::emit(
1133 "execution.jump.payload_error",
1134 Some(serde_json::json!({"error": e.to_string()})),
1135 )
1136 .map(|_: ()| unreachable!());
1137 }
1138 }
1139 } else {
1140 return Outcome::emit(
1141 "execution.jump.missing_payload",
1142 Some(serde_json::json!({"node_id": timeline_node_id})),
1143 )
1144 .map(|_: ()| unreachable!());
1145 };
1146
1147 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1149 &trans,
1150 &comp,
1151 state,
1152 res,
1153 bus,
1154 &timeline_node_id,
1155 &timeline_comp_id,
1156 &timeline_node_label,
1157 &transition_bus_policy,
1158 step_idx,
1159 )
1160 .await;
1161 }
1162
1163 if let Some(start) = bus.read::<StartStep>()
1165 && step_idx == start.0
1166 && bus.read::<ResumptionState>().is_some()
1167 {
1168 let fresh_state = serde_json::to_value(&input)
1169 .ok()
1170 .and_then(|v| serde_json::from_value::<Out>(v).ok());
1171 let persisted_state = bus
1172 .read::<ResumptionState>()
1173 .and_then(|r| r.payload.clone())
1174 .and_then(|p| serde_json::from_value::<Out>(p).ok());
1175
1176 if let Some(s) = fresh_state.or(persisted_state) {
1177 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
1178 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1179 &trans,
1180 &comp,
1181 s,
1182 res,
1183 bus,
1184 &timeline_node_id,
1185 &timeline_comp_id,
1186 &timeline_node_label,
1187 &transition_bus_policy,
1188 step_idx,
1189 )
1190 .await;
1191 }
1192
1193 return Outcome::emit(
1194 "execution.resumption.payload_error",
1195 Some(serde_json::json!({"error": "no compatible resumption state"})),
1196 )
1197 .map(|_: ()| unreachable!());
1198 }
1199
1200 let prev_result = prev(input, res, bus).await;
1202
1203 let state = match prev_result {
1205 Outcome::Next(t) => t,
1206 other => return other.map(|_| unreachable!()),
1207 };
1208
1209 run_this_compensated_step::<Out, Next, E, Res, Comp>(
1210 &trans,
1211 &comp,
1212 state,
1213 res,
1214 bus,
1215 &timeline_node_id,
1216 &timeline_comp_id,
1217 &timeline_node_label,
1218 &transition_bus_policy,
1219 step_idx,
1220 )
1221 .await
1222 })
1223 },
1224 );
1225 {
1227 let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1228 let comp_fn = compensation.clone();
1229 let transition_bus_policy = bus_policy_for_registry.clone();
1230
1231 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1232 Arc::new(move |input_data, res, bus| {
1233 let comp = comp_fn.clone();
1234 let bus_policy = transition_bus_policy.clone();
1235 Box::pin(async move {
1236 let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1237 bus.set_access_policy(comp.label(), bus_policy);
1238 let res = comp.run(input, res, bus).await;
1239 bus.clear_access_policy();
1240 res
1241 })
1242 });
1243 registry.register(next_node_id.clone(), handler);
1244 }
1245
1246 Axon {
1247 schematic,
1248 executor: next_executor,
1249 execution_mode,
1250 persistence_store,
1251 audit_sink,
1252 dlq_sink,
1253 dlq_policy,
1254 dynamic_dlq_policy,
1255 saga_policy,
1256 dynamic_saga_policy,
1257 saga_compensation_registry,
1258 iam_handle,
1259 }
1260 }
1261
1262 #[track_caller]
1265 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1266 where
1267 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1268 {
1269 let caller = Location::caller();
1272 let comp_node_id = uuid::Uuid::new_v4().to_string();
1273
1274 let comp_node = Node {
1275 id: comp_node_id.clone(),
1276 kind: NodeKind::Atom,
1277 label: transition.label(),
1278 description: transition.description(),
1279 input_type: type_name_of::<Out>(),
1280 output_type: "void".to_string(),
1281 resource_type: type_name_of::<Res>(),
1282 metadata: Default::default(),
1283 bus_capability: None,
1284 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1285 position: transition
1286 .position()
1287 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1288 compensation_node_id: None,
1289 input_schema: None,
1290 output_schema: None,
1291 };
1292
1293 if let Some(last_node) = self.schematic.nodes.last_mut() {
1294 last_node.compensation_node_id = Some(comp_node_id.clone());
1295 }
1296
1297 self.schematic.nodes.push(comp_node);
1298 self
1299 }
1300
1301 #[track_caller]
1303 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1304 let caller = Location::caller();
1305 let branch_id_str = branch_id.into();
1306 let last_node_id = self
1307 .schematic
1308 .nodes
1309 .last()
1310 .map(|n| n.id.clone())
1311 .unwrap_or_default();
1312
1313 let branch_node = Node {
1314 id: uuid::Uuid::new_v4().to_string(),
1315 kind: NodeKind::Synapse,
1316 label: label.to_string(),
1317 description: None,
1318 input_type: type_name_of::<Out>(),
1319 output_type: type_name_of::<Out>(),
1320 resource_type: type_name_of::<Res>(),
1321 metadata: Default::default(),
1322 bus_capability: None,
1323 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1324 position: None,
1325 compensation_node_id: None,
1326 input_schema: None,
1327 output_schema: None,
1328 };
1329
1330 self.schematic.nodes.push(branch_node);
1331 self.schematic.edges.push(Edge {
1332 from: last_node_id,
1333 to: branch_id_str.clone(),
1334 kind: EdgeType::Branch(branch_id_str),
1335 label: Some("Branch".to_string()),
1336 });
1337
1338 self
1339 }
1340
1341 #[track_caller]
1378 pub fn parallel(
1379 self,
1380 transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1381 strategy: ParallelStrategy,
1382 ) -> Axon<In, Out, E, Res>
1383 where
1384 Out: Clone,
1385 {
1386 let caller = Location::caller();
1387 let Axon {
1388 mut schematic,
1389 executor: prev_executor,
1390 execution_mode,
1391 persistence_store,
1392 audit_sink,
1393 dlq_sink,
1394 dlq_policy,
1395 dynamic_dlq_policy,
1396 saga_policy,
1397 dynamic_saga_policy,
1398 saga_compensation_registry,
1399 iam_handle,
1400 } = self;
1401
1402 let fanout_id = uuid::Uuid::new_v4().to_string();
1404 let fanin_id = uuid::Uuid::new_v4().to_string();
1405
1406 let last_node_id = schematic
1407 .nodes
1408 .last()
1409 .map(|n| n.id.clone())
1410 .unwrap_or_default();
1411
1412 let fanout_node = Node {
1413 id: fanout_id.clone(),
1414 kind: NodeKind::FanOut,
1415 label: "FanOut".to_string(),
1416 description: Some(format!(
1417 "Parallel split ({} branches, {:?})",
1418 transitions.len(),
1419 strategy
1420 )),
1421 input_type: type_name_of::<Out>(),
1422 output_type: type_name_of::<Out>(),
1423 resource_type: type_name_of::<Res>(),
1424 metadata: Default::default(),
1425 bus_capability: None,
1426 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1427 position: None,
1428 compensation_node_id: None,
1429 input_schema: None,
1430 output_schema: None,
1431 };
1432
1433 schematic.nodes.push(fanout_node);
1434 schematic.edges.push(Edge {
1435 from: last_node_id,
1436 to: fanout_id.clone(),
1437 kind: EdgeType::Linear,
1438 label: Some("Next".to_string()),
1439 });
1440
1441 let mut branch_node_ids = Vec::with_capacity(transitions.len());
1443 for (i, trans) in transitions.iter().enumerate() {
1444 let branch_id = uuid::Uuid::new_v4().to_string();
1445 let branch_node = Node {
1446 id: branch_id.clone(),
1447 kind: NodeKind::Atom,
1448 label: trans.label(),
1449 description: trans.description(),
1450 input_type: type_name_of::<Out>(),
1451 output_type: type_name_of::<Out>(),
1452 resource_type: type_name_of::<Res>(),
1453 metadata: Default::default(),
1454 bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1455 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1456 position: None,
1457 compensation_node_id: None,
1458 input_schema: trans.input_schema(),
1459 output_schema: None,
1460 };
1461 schematic.nodes.push(branch_node);
1462 schematic.edges.push(Edge {
1463 from: fanout_id.clone(),
1464 to: branch_id.clone(),
1465 kind: EdgeType::Parallel,
1466 label: Some(format!("Branch {}", i)),
1467 });
1468 branch_node_ids.push(branch_id);
1469 }
1470
1471 let fanin_node = Node {
1473 id: fanin_id.clone(),
1474 kind: NodeKind::FanIn,
1475 label: "FanIn".to_string(),
1476 description: Some(format!("Parallel join ({:?})", strategy)),
1477 input_type: type_name_of::<Out>(),
1478 output_type: type_name_of::<Out>(),
1479 resource_type: type_name_of::<Res>(),
1480 metadata: Default::default(),
1481 bus_capability: None,
1482 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1483 position: None,
1484 compensation_node_id: None,
1485 input_schema: None,
1486 output_schema: None,
1487 };
1488
1489 schematic.nodes.push(fanin_node);
1490 for branch_id in &branch_node_ids {
1491 schematic.edges.push(Edge {
1492 from: branch_id.clone(),
1493 to: fanin_id.clone(),
1494 kind: EdgeType::Parallel,
1495 label: Some("Join".to_string()),
1496 });
1497 }
1498
1499 let fanout_node_id = fanout_id.clone();
1501 let fanin_node_id = fanin_id.clone();
1502 let branch_ids_for_exec = branch_node_ids.clone();
1503 let step_idx = schematic.nodes.len() as u64 - 1;
1504
1505 let next_executor: Executor<In, Out, E, Res> = Arc::new(
1506 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1507 let prev = prev_executor.clone();
1508 let branches = transitions.clone();
1509 let fanout_id = fanout_node_id.clone();
1510 let fanin_id = fanin_node_id.clone();
1511 let branch_ids = branch_ids_for_exec.clone();
1512
1513 Box::pin(async move {
1514 let prev_result = prev(input, res, bus).await;
1516 let state = match prev_result {
1517 Outcome::Next(t) => t,
1518 other => return other.map(|_| unreachable!()),
1519 };
1520
1521 let fanout_enter_ts = now_ms();
1523 if let Some(timeline) = bus.read_mut::<Timeline>() {
1524 timeline.push(TimelineEvent::NodeEnter {
1525 node_id: fanout_id.clone(),
1526 node_label: "FanOut".to_string(),
1527 timestamp: fanout_enter_ts,
1528 });
1529 }
1530
1531 let futs: Vec<_> = branches
1535 .iter()
1536 .enumerate()
1537 .map(|(i, trans)| {
1538 let branch_state = state.clone();
1539 let branch_node_id = branch_ids[i].clone();
1540 let trans = trans.clone();
1541
1542 async move {
1543 let mut branch_bus = Bus::new();
1544 let label = trans.label();
1545 let bus_policy = trans.bus_access_policy();
1546
1547 branch_bus.set_access_policy(label.clone(), bus_policy);
1548 let result = trans.run(branch_state, res, &mut branch_bus).await;
1549 branch_bus.clear_access_policy();
1550
1551 (i, branch_node_id, label, result)
1552 }
1553 })
1554 .collect();
1555
1556 let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1558 futures_util::future::join_all(futs).await;
1559
1560 for (_, branch_node_id, branch_label, outcome) in &results {
1562 if let Some(timeline) = bus.read_mut::<Timeline>() {
1563 let ts = now_ms();
1564 timeline.push(TimelineEvent::NodeEnter {
1565 node_id: branch_node_id.clone(),
1566 node_label: branch_label.clone(),
1567 timestamp: ts,
1568 });
1569 timeline.push(TimelineEvent::NodeExit {
1570 node_id: branch_node_id.clone(),
1571 outcome_type: outcome_type_name(outcome),
1572 duration_ms: 0,
1573 timestamp: ts,
1574 });
1575 }
1576 }
1577
1578 if let Some(timeline) = bus.read_mut::<Timeline>() {
1580 timeline.push(TimelineEvent::NodeExit {
1581 node_id: fanout_id.clone(),
1582 outcome_type: "Next".to_string(),
1583 duration_ms: 0,
1584 timestamp: now_ms(),
1585 });
1586 }
1587
1588 let combined = match strategy {
1590 ParallelStrategy::AllMustSucceed => {
1591 let mut first_fault = None;
1592 let mut first_success = None;
1593
1594 for (_, _, _, outcome) in results {
1595 match outcome {
1596 Outcome::Fault(e) => {
1597 if first_fault.is_none() {
1598 first_fault = Some(Outcome::Fault(e));
1599 }
1600 }
1601 Outcome::Next(val) => {
1602 if first_success.is_none() {
1603 first_success = Some(Outcome::Next(val));
1604 }
1605 }
1606 other => {
1607 if first_fault.is_none() {
1610 first_fault = Some(other);
1611 }
1612 }
1613 }
1614 }
1615
1616 if let Some(fault) = first_fault {
1617 fault
1618 } else {
1619 first_success.unwrap_or_else(|| {
1620 Outcome::emit("execution.parallel.no_results", None)
1621 })
1622 }
1623 }
1624 ParallelStrategy::AnyCanFail => {
1625 let mut first_success = None;
1626 let mut first_fault = None;
1627
1628 for (_, _, _, outcome) in results {
1629 match outcome {
1630 Outcome::Next(val) => {
1631 if first_success.is_none() {
1632 first_success = Some(Outcome::Next(val));
1633 }
1634 }
1635 Outcome::Fault(e) => {
1636 if first_fault.is_none() {
1637 first_fault = Some(Outcome::Fault(e));
1638 }
1639 }
1640 _ => {}
1641 }
1642 }
1643
1644 first_success.unwrap_or_else(|| {
1645 first_fault.unwrap_or_else(|| {
1646 Outcome::emit("execution.parallel.no_results", None)
1647 })
1648 })
1649 }
1650 };
1651
1652 let fanin_enter_ts = now_ms();
1654 if let Some(timeline) = bus.read_mut::<Timeline>() {
1655 timeline.push(TimelineEvent::NodeEnter {
1656 node_id: fanin_id.clone(),
1657 node_label: "FanIn".to_string(),
1658 timestamp: fanin_enter_ts,
1659 });
1660 timeline.push(TimelineEvent::NodeExit {
1661 node_id: fanin_id.clone(),
1662 outcome_type: outcome_type_name(&combined),
1663 duration_ms: 0,
1664 timestamp: fanin_enter_ts,
1665 });
1666 }
1667
1668 if let Some(handle) = bus.read::<PersistenceHandle>() {
1670 let trace_id = persistence_trace_id(bus);
1671 let circuit = bus
1672 .read::<ranvier_core::schematic::Schematic>()
1673 .map(|s| s.name.clone())
1674 .unwrap_or_default();
1675 let version = bus
1676 .read::<ranvier_core::schematic::Schematic>()
1677 .map(|s| s.schema_version.clone())
1678 .unwrap_or_default();
1679
1680 persist_execution_event(
1681 handle,
1682 &trace_id,
1683 &circuit,
1684 &version,
1685 step_idx,
1686 Some(fanin_id.clone()),
1687 outcome_kind_name(&combined),
1688 Some(combined.to_json_value()),
1689 )
1690 .await;
1691 }
1692
1693 combined
1694 })
1695 },
1696 );
1697
1698 Axon {
1699 schematic,
1700 executor: next_executor,
1701 execution_mode,
1702 persistence_store,
1703 audit_sink,
1704 dlq_sink,
1705 dlq_policy,
1706 dynamic_dlq_policy,
1707 saga_policy,
1708 dynamic_saga_policy,
1709 saga_compensation_registry,
1710 iam_handle,
1711 }
1712 }
1713
1714 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1716 if let ExecutionMode::Singleton {
1717 lock_key,
1718 ttl_ms,
1719 lock_provider,
1720 } = &self.execution_mode
1721 {
1722 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1723 let _enter = trace_span.enter();
1724 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1725 Ok(true) => {
1726 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1727 }
1728 Ok(false) => {
1729 tracing::debug!(
1730 "Singleton lock {} already held, aborting execution.",
1731 lock_key
1732 );
1733 return Outcome::emit(
1735 "execution.skipped.lock_held",
1736 Some(serde_json::json!({
1737 "lock_key": lock_key
1738 })),
1739 );
1740 }
1741 Err(e) => {
1742 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1743 return Outcome::emit(
1744 "execution.skipped.lock_error",
1745 Some(serde_json::json!({
1746 "error": e.to_string()
1747 })),
1748 );
1749 }
1750 }
1751 }
1752
1753 if let Some(iam) = &self.iam_handle {
1755 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1756
1757 if matches!(iam.policy, IamPolicy::None) {
1758 } else {
1760 let token = bus.read::<IamToken>().map(|t| t.0.clone());
1761
1762 match token {
1763 Some(raw_token) => {
1764 match iam.verifier.verify(&raw_token).await {
1765 Ok(identity) => {
1766 if let Err(e) = enforce_policy(&iam.policy, &identity) {
1767 tracing::warn!(
1768 policy = ?iam.policy,
1769 subject = %identity.subject,
1770 "IAM policy enforcement failed: {}",
1771 e
1772 );
1773 return Outcome::emit(
1774 "iam.policy_denied",
1775 Some(serde_json::json!({
1776 "error": e.to_string(),
1777 "subject": identity.subject,
1778 })),
1779 );
1780 }
1781 bus.insert(identity);
1783 }
1784 Err(e) => {
1785 tracing::warn!("IAM token verification failed: {}", e);
1786 return Outcome::emit(
1787 "iam.verification_failed",
1788 Some(serde_json::json!({
1789 "error": e.to_string()
1790 })),
1791 );
1792 }
1793 }
1794 }
1795 None => {
1796 tracing::warn!("IAM policy requires token but none found in Bus");
1797 return Outcome::emit("iam.missing_token", None);
1798 }
1799 }
1800 }
1801 }
1802
1803 let trace_id = persistence_trace_id(bus);
1804 let label = self.schematic.name.clone();
1805
1806 if let Some(sink) = &self.dlq_sink {
1808 bus.insert(sink.clone());
1809 }
1810 let effective_dlq_policy = self
1812 .dynamic_dlq_policy
1813 .as_ref()
1814 .map(|d| d.current())
1815 .unwrap_or_else(|| self.dlq_policy.clone());
1816 bus.insert(effective_dlq_policy);
1817 bus.insert(self.schematic.clone());
1818 let effective_saga_policy = self
1819 .dynamic_saga_policy
1820 .as_ref()
1821 .map(|d| d.current())
1822 .unwrap_or_else(|| self.saga_policy.clone());
1823 bus.insert(effective_saga_policy.clone());
1824
1825 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1827 bus.insert(SagaStack::new());
1828 }
1829
1830 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1831 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1832 let compensation_retry_policy = compensation_retry_policy(bus);
1833 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1834 let version = self.schematic.schema_version.clone();
1835 let migration_registry = bus
1836 .read::<ranvier_core::schematic::MigrationRegistry>()
1837 .cloned();
1838
1839 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1840 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1841 load_persistence_version(handle, &trace_id).await;
1842
1843 if let Some(interv) = intervention {
1844 tracing::info!(
1845 trace_id = %trace_id,
1846 target_node = %interv.target_node,
1847 "Applying manual intervention command"
1848 );
1849
1850 if let Some(target_idx) = self
1852 .schematic
1853 .nodes
1854 .iter()
1855 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1856 {
1857 tracing::info!(
1858 trace_id = %trace_id,
1859 target_node = %interv.target_node,
1860 target_step = target_idx,
1861 "Intervention: Jumping to target node"
1862 );
1863 start_step = target_idx as u64;
1864
1865 bus.insert(ManualJump {
1867 target_node: interv.target_node.clone(),
1868 payload_override: interv.payload_override.clone(),
1869 });
1870
1871 if let Some(sink) = self.audit_sink.as_ref() {
1873 let event = AuditEvent::new(
1874 uuid::Uuid::new_v4().to_string(),
1875 "System".to_string(),
1876 "ApplyIntervention".to_string(),
1877 trace_id.to_string(),
1878 )
1879 .with_metadata("target_node", interv.target_node.clone())
1880 .with_metadata("target_step", target_idx);
1881
1882 let _ = sink.append(&event).await;
1883 }
1884 } else {
1885 tracing::warn!(
1886 trace_id = %trace_id,
1887 target_node = %interv.target_node,
1888 "Intervention target node not found in schematic; ignoring jump"
1889 );
1890 }
1891 }
1892
1893 if let Some(old_version) = trace_version
1894 && old_version != version
1895 {
1896 tracing::info!(
1897 trace_id = %trace_id,
1898 old_version = %old_version,
1899 current_version = %version,
1900 "Version mismatch detected during resumption"
1901 );
1902
1903 let migration_path = migration_registry
1905 .as_ref()
1906 .and_then(|r| r.find_migration_path(&old_version, &version));
1907
1908 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1909 if path.is_empty() {
1910 (None, last_payload.clone())
1911 } else {
1912 let mut payload = last_payload.clone();
1914 for hop in &path {
1915 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1916 {
1917 match mapper.map_state(p) {
1918 Ok(mapped) => payload = Some(mapped),
1919 Err(e) => {
1920 tracing::error!(
1921 trace_id = %trace_id,
1922 from = %hop.from_version,
1923 to = %hop.to_version,
1924 error = %e,
1925 "Payload migration mapper failed"
1926 );
1927 return Outcome::emit(
1928 "execution.resumption.payload_migration_failed",
1929 Some(serde_json::json!({
1930 "trace_id": trace_id,
1931 "from": hop.from_version,
1932 "to": hop.to_version,
1933 "error": e.to_string()
1934 })),
1935 );
1936 }
1937 }
1938 }
1939 }
1940 let hops: Vec<String> = path
1941 .iter()
1942 .map(|h| format!("{}->{}", h.from_version, h.to_version))
1943 .collect();
1944 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1945 (path.last().copied(), payload)
1946 }
1947 } else {
1948 (None, last_payload.clone())
1949 };
1950
1951 let migration = final_migration.or_else(|| {
1953 migration_registry
1954 .as_ref()
1955 .and_then(|r| r.find_migration(&old_version, &version))
1956 });
1957
1958 if mapped_payload.is_some() {
1960 last_payload = mapped_payload;
1961 }
1962
1963 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1964 {
1965 m.node_mapping
1966 .get(node_id)
1967 .cloned()
1968 .unwrap_or(m.default_strategy.clone())
1969 } else {
1970 migration
1971 .map(|m| m.default_strategy.clone())
1972 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1973 };
1974
1975 match strategy {
1976 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1977 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1978 start_step = 0;
1979 }
1980 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1981 new_node_id,
1982 ..
1983 } => {
1984 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1985 if let Some(target_idx) = self
1986 .schematic
1987 .nodes
1988 .iter()
1989 .position(|n| n.id == new_node_id || n.label == new_node_id)
1990 {
1991 start_step = target_idx as u64;
1992 } else {
1993 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1994 return Outcome::emit(
1995 "execution.resumption.migration_target_not_found",
1996 Some(serde_json::json!({ "node_id": new_node_id })),
1997 );
1998 }
1999 }
2000 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
2001 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
2002 if let Some(target_idx) = self
2003 .schematic
2004 .nodes
2005 .iter()
2006 .position(|n| n.id == node_id || n.label == node_id)
2007 {
2008 start_step = target_idx as u64;
2009 } else {
2010 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
2011 return Outcome::emit(
2012 "execution.resumption.migration_target_not_found",
2013 Some(serde_json::json!({ "node_id": node_id })),
2014 );
2015 }
2016 }
2017 ranvier_core::schematic::MigrationStrategy::Fail => {
2018 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
2019 return Outcome::emit(
2020 "execution.resumption.version_mismatch_failed",
2021 Some(serde_json::json!({
2022 "trace_id": trace_id,
2023 "old_version": old_version,
2024 "current_version": version
2025 })),
2026 );
2027 }
2028 _ => {
2029 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
2030 return Outcome::emit(
2031 "execution.resumption.unsupported_migration",
2032 Some(serde_json::json!({
2033 "trace_id": trace_id,
2034 "strategy": format!("{:?}", strategy)
2035 })),
2036 );
2037 }
2038 }
2039 }
2040
2041 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
2042 persist_execution_event(
2043 handle,
2044 &trace_id,
2045 &label,
2046 &version,
2047 start_step,
2048 ingress_node_id,
2049 "Enter",
2050 None,
2051 )
2052 .await;
2053
2054 bus.insert(StartStep(start_step));
2055 if start_step > 0 {
2056 bus.insert(ResumptionState {
2057 payload: last_payload,
2058 });
2059 }
2060
2061 Some(start_step)
2062 } else {
2063 None
2064 };
2065
2066 let should_capture = should_attach_timeline(bus);
2067 let inserted_timeline = if should_capture {
2068 ensure_timeline(bus)
2069 } else {
2070 false
2071 };
2072 let ingress_started = std::time::Instant::now();
2073 let ingress_enter_ts = now_ms();
2074 if should_capture
2075 && let (Some(timeline), Some(ingress)) =
2076 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2077 {
2078 timeline.push(TimelineEvent::NodeEnter {
2079 node_id: ingress.id.clone(),
2080 node_label: ingress.label.clone(),
2081 timestamp: ingress_enter_ts,
2082 });
2083 }
2084
2085 let circuit_span = tracing::info_span!(
2086 "Circuit",
2087 ranvier.circuit = %label,
2088 ranvier.outcome_kind = tracing::field::Empty,
2089 ranvier.outcome_target = tracing::field::Empty
2090 );
2091 let outcome = (self.executor)(input, resources, bus)
2092 .instrument(circuit_span.clone())
2093 .await;
2094 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
2095 if let Some(target) = outcome_target(&outcome) {
2096 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
2097 }
2098
2099 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
2101 while let Some(task) = {
2102 let mut stack = bus.read_mut::<SagaStack>();
2103 stack.as_mut().and_then(|s| s.pop())
2104 } {
2105 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
2106
2107 let handler = {
2108 let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
2109 registry.get(&task.node_id)
2110 };
2111 if let Some(handler) = handler {
2112 let res = handler(task.input_snapshot, resources, bus).await;
2113 if let Outcome::Fault(e) = res {
2114 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
2115 }
2116 } else {
2117 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
2118 }
2119 }
2120 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
2121 }
2122
2123 let ingress_exit_ts = now_ms();
2124 if should_capture
2125 && let (Some(timeline), Some(ingress)) =
2126 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2127 {
2128 timeline.push(TimelineEvent::NodeExit {
2129 node_id: ingress.id.clone(),
2130 outcome_type: outcome_type_name(&outcome),
2131 duration_ms: ingress_started.elapsed().as_millis() as u64,
2132 timestamp: ingress_exit_ts,
2133 });
2134 }
2135
2136 if let Some(handle) = persistence_handle.as_ref() {
2137 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
2138 persist_execution_event(
2139 handle,
2140 &trace_id,
2141 &label,
2142 &version,
2143 fault_step,
2144 None, outcome_kind_name(&outcome),
2146 Some(outcome.to_json_value()),
2147 )
2148 .await;
2149
2150 let mut completion = completion_from_outcome(&outcome);
2151 if matches!(outcome, Outcome::Fault(_))
2152 && let Some(compensation) = compensation_handle.as_ref()
2153 && compensation_auto_trigger(bus)
2154 {
2155 let context = CompensationContext {
2156 trace_id: trace_id.clone(),
2157 circuit: label.clone(),
2158 fault_kind: outcome_kind_name(&outcome).to_string(),
2159 fault_step,
2160 timestamp_ms: now_ms(),
2161 };
2162
2163 if run_compensation(
2164 compensation,
2165 context,
2166 compensation_retry_policy,
2167 compensation_idempotency.clone(),
2168 )
2169 .await
2170 {
2171 persist_execution_event(
2172 handle,
2173 &trace_id,
2174 &label,
2175 &version,
2176 fault_step.saturating_add(1),
2177 None,
2178 "Compensated",
2179 None,
2180 )
2181 .await;
2182 completion = CompletionState::Compensated;
2183 }
2184 }
2185
2186 if persistence_auto_complete(bus) {
2187 persist_completion(handle, &trace_id, completion).await;
2188 }
2189 }
2190
2191 if should_capture {
2192 maybe_export_timeline(bus, &outcome);
2193 }
2194 if inserted_timeline {
2195 let _ = bus.remove::<Timeline>();
2196 }
2197
2198 outcome
2199 }
2200
2201 pub fn serve_inspector(self, port: u16) -> Self {
2204 if !inspector_dev_mode_from_env() {
2205 tracing::info!("Inspector disabled because RANVIER_MODE is production");
2206 return self;
2207 }
2208 if !inspector_enabled_from_env() {
2209 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2210 return self;
2211 }
2212
2213 let schematic = self.schematic.clone();
2214 let axon_inspector = Arc::new(self.clone());
2215 tokio::spawn(async move {
2216 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2217 .with_projection_files_from_env()
2218 .with_mode_from_env()
2219 .with_auth_policy_from_env()
2220 .with_state_inspector(axon_inspector)
2221 .serve()
2222 .await
2223 {
2224 tracing::error!("Inspector server failed: {}", e);
2225 }
2226 });
2227 self
2228 }
2229
2230 pub fn schematic(&self) -> &Schematic {
2232 &self.schematic
2233 }
2234
2235 pub fn into_schematic(self) -> Schematic {
2237 self.schematic
2238 }
2239
2240 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2251 schematic_export_request_from_process()
2252 }
2253
2254 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2266 self.maybe_export_and_exit_with(|_| ())
2267 }
2268
2269 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2274 where
2275 F: FnOnce(&SchematicExportRequest),
2276 {
2277 let Some(request) = self.schematic_export_request() else {
2278 return Ok(false);
2279 };
2280 on_before_exit(&request);
2281 self.export_schematic(&request)?;
2282 Ok(true)
2283 }
2284
2285 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2287 let json = serde_json::to_string_pretty(self.schematic())?;
2288 if let Some(path) = &request.output {
2289 if let Some(parent) = path.parent()
2290 && !parent.as_os_str().is_empty()
2291 {
2292 fs::create_dir_all(parent)?;
2293 }
2294 fs::write(path, json.as_bytes())?;
2295 return Ok(());
2296 }
2297 println!("{}", json);
2298 Ok(())
2299 }
2300}
2301
2302fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2303 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2304 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2305 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2306
2307 let mut i = 0;
2308 while i < args.len() {
2309 let arg = args[i].to_string_lossy();
2310
2311 if arg == "--schematic" {
2312 enabled = true;
2313 i += 1;
2314 continue;
2315 }
2316
2317 if arg == "--schematic-output" || arg == "--output" {
2318 if let Some(next) = args.get(i + 1) {
2319 output = Some(PathBuf::from(next));
2320 i += 2;
2321 continue;
2322 }
2323 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2324 output = Some(PathBuf::from(value));
2325 } else if let Some(value) = arg.strip_prefix("--output=") {
2326 output = Some(PathBuf::from(value));
2327 }
2328
2329 i += 1;
2330 }
2331
2332 if enabled {
2333 Some(SchematicExportRequest { output })
2334 } else {
2335 None
2336 }
2337}
2338
/// Reports whether environment variable `key` holds a truthy value
/// (`1`, `true`, `on` or `yes`, case-insensitive).
///
/// An unset variable, or one whose value is not valid Unicode, counts as false.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key).is_ok_and(|raw| {
        let lowered = raw.to_ascii_lowercase();
        ["1", "true", "on", "yes"].contains(&lowered.as_str())
    })
}
2345
2346fn inspector_enabled_from_env() -> bool {
2347 let raw = std::env::var("RANVIER_INSPECTOR").ok();
2348 inspector_enabled_from_value(raw.as_deref())
2349}
2350
/// Interprets a `RANVIER_INSPECTOR` value: unset means enabled; otherwise the value
/// must be `1`, `true`, `on` or `yes` (ASCII case-insensitive).
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    ["1", "true", "on", "yes"]
        .iter()
        .any(|truthy| raw.eq_ignore_ascii_case(truthy))
}
2357
2358fn inspector_dev_mode_from_env() -> bool {
2359 let raw = std::env::var("RANVIER_MODE").ok();
2360 inspector_dev_mode_from_value(raw.as_deref())
2361}
2362
/// Returns `false` only when the mode is `prod` or `production` (ASCII
/// case-insensitive); any other value, or no value, counts as development mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    let is_production = value.is_some_and(|mode| {
        mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production")
    });
    !is_production
}
2369
2370fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2371 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2372 Ok(v) if !v.trim().is_empty() => v,
2373 _ => return,
2374 };
2375
2376 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2377 let policy = timeline_adaptive_policy();
2378 let forced = should_force_export(outcome, &policy);
2379 let should_export = sampled || forced;
2380 if !should_export {
2381 record_sampling_stats(false, sampled, forced, "none", &policy);
2382 return;
2383 }
2384
2385 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2386 timeline.sort();
2387
2388 let mode = std::env::var("RANVIER_TIMELINE_MODE")
2389 .unwrap_or_else(|_| "overwrite".to_string())
2390 .to_ascii_lowercase();
2391
2392 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2393 tracing::warn!(
2394 "Failed to persist timeline file {} (mode={}): {}",
2395 path,
2396 mode,
2397 err
2398 );
2399 record_sampling_stats(false, sampled, forced, &mode, &policy);
2400 } else {
2401 record_sampling_stats(true, sampled, forced, &mode, &policy);
2402 }
2403}
2404
2405fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2406 match outcome {
2407 Outcome::Next(_) => "Next".to_string(),
2408 Outcome::Branch(id, _) => format!("Branch:{}", id),
2409 Outcome::Jump(id, _) => format!("Jump:{}", id),
2410 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2411 Outcome::Fault(_) => "Fault".to_string(),
2412 }
2413}
2414
2415fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2416 match outcome {
2417 Outcome::Next(_) => "Next",
2418 Outcome::Branch(_, _) => "Branch",
2419 Outcome::Jump(_, _) => "Jump",
2420 Outcome::Emit(_, _) => "Emit",
2421 Outcome::Fault(_) => "Fault",
2422 }
2423}
2424
2425fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2426 match outcome {
2427 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2428 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2429 Outcome::Emit(event_type, _) => Some(event_type.clone()),
2430 Outcome::Next(_) | Outcome::Fault(_) => None,
2431 }
2432}
2433
2434fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2435 match outcome {
2436 Outcome::Fault(_) => CompletionState::Fault,
2437 _ => CompletionState::Success,
2438 }
2439}
2440
2441fn persistence_trace_id(bus: &Bus) -> String {
2442 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2443 explicit.0.clone()
2444 } else {
2445 format!("{}:{}", bus.id, now_ms())
2446 }
2447}
2448
2449fn persistence_auto_complete(bus: &Bus) -> bool {
2450 bus.read::<PersistenceAutoComplete>()
2451 .map(|v| v.0)
2452 .unwrap_or(true)
2453}
2454
2455fn compensation_auto_trigger(bus: &Bus) -> bool {
2456 bus.read::<CompensationAutoTrigger>()
2457 .map(|v| v.0)
2458 .unwrap_or(true)
2459}
2460
2461fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2462 bus.read::<CompensationRetryPolicy>()
2463 .copied()
2464 .unwrap_or_default()
2465}
2466
2467fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2473 payload.map(|p| {
2474 p.get("Next")
2475 .or_else(|| p.get("Branch"))
2476 .or_else(|| p.get("Jump"))
2477 .cloned()
2478 .unwrap_or_else(|| p.clone())
2479 })
2480}
2481
2482async fn load_persistence_version(
2483 handle: &PersistenceHandle,
2484 trace_id: &str,
2485) -> (
2486 u64,
2487 Option<String>,
2488 Option<crate::persistence::Intervention>,
2489 Option<String>,
2490 Option<serde_json::Value>,
2491) {
2492 let store = handle.store();
2493 match store.load(trace_id).await {
2494 Ok(Some(trace)) => {
2495 let (next_step, last_node_id, last_payload) =
2496 if let Some(resume_from_step) = trace.resumed_from_step {
2497 let anchor_event = trace
2498 .events
2499 .iter()
2500 .rev()
2501 .find(|event| {
2502 event.step <= resume_from_step
2503 && event.outcome_kind == "Next"
2504 && event.payload.is_some()
2505 })
2506 .or_else(|| {
2507 trace.events.iter().rev().find(|event| {
2508 event.step <= resume_from_step
2509 && event.outcome_kind != "Fault"
2510 && event.payload.is_some()
2511 })
2512 })
2513 .or_else(|| {
2514 trace.events.iter().rev().find(|event| {
2515 event.step <= resume_from_step && event.payload.is_some()
2516 })
2517 })
2518 .or_else(|| trace.events.last());
2519
2520 (
2521 resume_from_step.saturating_add(1),
2522 anchor_event.and_then(|event| event.node_id.clone()),
2523 anchor_event.and_then(|event| {
2524 unwrap_outcome_payload(event.payload.as_ref())
2525 }),
2526 )
2527 } else {
2528 let last_event = trace.events.last();
2529 (
2530 last_event
2531 .map(|event| event.step.saturating_add(1))
2532 .unwrap_or(0),
2533 last_event.and_then(|event| event.node_id.clone()),
2534 last_event.and_then(|event| {
2535 unwrap_outcome_payload(event.payload.as_ref())
2536 }),
2537 )
2538 };
2539
2540 (
2541 next_step,
2542 Some(trace.schematic_version),
2543 trace.interventions.last().cloned(),
2544 last_node_id,
2545 last_payload,
2546 )
2547 }
2548 Ok(None) => (0, None, None, None, None),
2549 Err(err) => {
2550 tracing::warn!(
2551 trace_id = %trace_id,
2552 error = %err,
2553 "Failed to load persistence trace; falling back to step=0"
2554 );
2555 (0, None, None, None, None)
2556 }
2557 }
2558}
2559
2560#[allow(clippy::too_many_arguments)]
2561async fn run_this_step<In, Out, E, Res>(
2562 trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
2563 state: In,
2564 res: &Res,
2565 bus: &mut Bus,
2566 node_id: &str,
2567 node_label: &str,
2568 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2569 step_idx: u64,
2570) -> Outcome<Out, E>
2571where
2572 In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2573 Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2574 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2575 Res: ranvier_core::transition::ResourceRequirement,
2576{
2577 let label = trans.label();
2578 let res_type = std::any::type_name::<Res>()
2579 .split("::")
2580 .last()
2581 .unwrap_or("unknown");
2582
2583 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2585 debug.should_pause(node_id)
2586 } else {
2587 false
2588 };
2589
2590 if should_pause {
2591 let trace_id = persistence_trace_id(bus);
2592 tracing::event!(
2593 target: "ranvier.debugger",
2594 tracing::Level::INFO,
2595 trace_id = %trace_id,
2596 node_id = %node_id,
2597 "Node paused"
2598 );
2599
2600 if let Some(timeline) = bus.read_mut::<Timeline>() {
2601 timeline.push(TimelineEvent::NodePaused {
2602 node_id: node_id.to_string(),
2603 timestamp: now_ms(),
2604 });
2605 }
2606 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2607 debug.wait().await;
2608 }
2609 }
2610
2611 let enter_ts = now_ms();
2612 if let Some(timeline) = bus.read_mut::<Timeline>() {
2613 timeline.push(TimelineEvent::NodeEnter {
2614 node_id: node_id.to_string(),
2615 node_label: node_label.to_string(),
2616 timestamp: enter_ts,
2617 });
2618 }
2619
2620 let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2622 if let DlqPolicy::RetryThenDlq {
2623 max_attempts,
2624 backoff_ms,
2625 } = p
2626 {
2627 Some((*max_attempts, *backoff_ms))
2628 } else {
2629 None
2630 }
2631 });
2632 let retry_state_snapshot = if dlq_retry_config.is_some() {
2633 serde_json::to_value(&state).ok()
2634 } else {
2635 None
2636 };
2637
2638 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2640 Some(serde_json::to_vec(&state).unwrap_or_default())
2641 } else {
2642 None
2643 };
2644
2645 let node_span = tracing::info_span!(
2646 "Node",
2647 ranvier.node = %label,
2648 ranvier.resource_type = %res_type,
2649 ranvier.outcome_kind = tracing::field::Empty,
2650 ranvier.outcome_target = tracing::field::Empty
2651 );
2652 let started = std::time::Instant::now();
2653 bus.set_access_policy(label.clone(), bus_policy.clone());
2654 let result = trans
2655 .run(state, res, bus)
2656 .instrument(node_span.clone())
2657 .await;
2658 bus.clear_access_policy();
2659
2660 let result = if let Outcome::Fault(_) = &result {
2663 if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2664 (dlq_retry_config, &retry_state_snapshot)
2665 {
2666 let mut final_result = result;
2667 for attempt in 2..=max_attempts {
2669 let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2670
2671 tracing::info!(
2672 ranvier.node = %label,
2673 attempt = attempt,
2674 max_attempts = max_attempts,
2675 backoff_ms = delay,
2676 "Retrying faulted node"
2677 );
2678
2679 if let Some(timeline) = bus.read_mut::<Timeline>() {
2680 timeline.push(TimelineEvent::NodeRetry {
2681 node_id: node_id.to_string(),
2682 attempt,
2683 max_attempts,
2684 backoff_ms: delay,
2685 timestamp: now_ms(),
2686 });
2687 }
2688
2689 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2690
2691 if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2692 bus.set_access_policy(label.clone(), bus_policy.clone());
2693 let retry_result = trans
2694 .run(retry_state, res, bus)
2695 .instrument(tracing::info_span!(
2696 "NodeRetry",
2697 ranvier.node = %label,
2698 attempt = attempt
2699 ))
2700 .await;
2701 bus.clear_access_policy();
2702
2703 match &retry_result {
2704 Outcome::Fault(_) => {
2705 final_result = retry_result;
2706 }
2707 _ => {
2708 final_result = retry_result;
2709 break;
2710 }
2711 }
2712 }
2713 }
2714 final_result
2715 } else {
2716 result
2717 }
2718 } else {
2719 result
2720 };
2721
2722 node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2723 if let Some(target) = outcome_target(&result) {
2724 node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2725 }
2726 let duration_ms = started.elapsed().as_millis() as u64;
2727 let exit_ts = now_ms();
2728
2729 if let Some(timeline) = bus.read_mut::<Timeline>() {
2730 timeline.push(TimelineEvent::NodeExit {
2731 node_id: node_id.to_string(),
2732 outcome_type: outcome_type_name(&result),
2733 duration_ms,
2734 timestamp: exit_ts,
2735 });
2736
2737 if let Outcome::Branch(branch_id, _) = &result {
2738 timeline.push(TimelineEvent::Branchtaken {
2739 branch_id: branch_id.clone(),
2740 timestamp: exit_ts,
2741 });
2742 }
2743 }
2744
2745 if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2747 && let Some(stack) = bus.read_mut::<SagaStack>()
2748 {
2749 stack.push(node_id.to_string(), label.clone(), snapshot);
2750 }
2751
2752 if let Some(handle) = bus.read::<PersistenceHandle>() {
2753 let trace_id = persistence_trace_id(bus);
2754 let circuit = bus
2755 .read::<ranvier_core::schematic::Schematic>()
2756 .map(|s| s.name.clone())
2757 .unwrap_or_default();
2758 let version = bus
2759 .read::<ranvier_core::schematic::Schematic>()
2760 .map(|s| s.schema_version.clone())
2761 .unwrap_or_default();
2762
2763 persist_execution_event(
2764 handle,
2765 &trace_id,
2766 &circuit,
2767 &version,
2768 step_idx,
2769 Some(node_id.to_string()),
2770 outcome_kind_name(&result),
2771 Some(result.to_json_value()),
2772 )
2773 .await;
2774 }
2775
2776 if let Outcome::Fault(f) = &result {
2779 let dlq_action = {
2781 let policy = bus.read::<DlqPolicy>();
2782 let sink = bus.read::<Arc<dyn DlqSink>>();
2783 match (sink, policy) {
2784 (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2785 _ => None,
2786 }
2787 };
2788
2789 if let Some(sink) = dlq_action {
2790 if let Some((max_attempts, _)) = dlq_retry_config
2791 && let Some(timeline) = bus.read_mut::<Timeline>()
2792 {
2793 timeline.push(TimelineEvent::DlqExhausted {
2794 node_id: node_id.to_string(),
2795 total_attempts: max_attempts,
2796 timestamp: now_ms(),
2797 });
2798 }
2799
2800 let trace_id = persistence_trace_id(bus);
2801 let circuit = bus
2802 .read::<ranvier_core::schematic::Schematic>()
2803 .map(|s| s.name.clone())
2804 .unwrap_or_default();
2805 let _ = sink
2806 .store_dead_letter(
2807 &trace_id,
2808 &circuit,
2809 node_id,
2810 &format!("{:?}", f),
2811 &serde_json::to_vec(&f).unwrap_or_default(),
2812 )
2813 .await;
2814 }
2815 }
2816
2817 result
2818}
2819
2820#[allow(clippy::too_many_arguments)]
2821async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2822 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2823 comp: &Comp,
2824 state: Out,
2825 res: &Res,
2826 bus: &mut Bus,
2827 node_id: &str,
2828 comp_node_id: &str,
2829 node_label: &str,
2830 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2831 step_idx: u64,
2832) -> Outcome<Next, E>
2833where
2834 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2835 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2836 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2837 Res: ranvier_core::transition::ResourceRequirement,
2838 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2839{
2840 let label = trans.label();
2841
2842 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2844 debug.should_pause(node_id)
2845 } else {
2846 false
2847 };
2848
2849 if should_pause {
2850 let trace_id = persistence_trace_id(bus);
2851 tracing::event!(
2852 target: "ranvier.debugger",
2853 tracing::Level::INFO,
2854 trace_id = %trace_id,
2855 node_id = %node_id,
2856 "Node paused (compensated)"
2857 );
2858
2859 if let Some(timeline) = bus.read_mut::<Timeline>() {
2860 timeline.push(TimelineEvent::NodePaused {
2861 node_id: node_id.to_string(),
2862 timestamp: now_ms(),
2863 });
2864 }
2865 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2866 debug.wait().await;
2867 }
2868 }
2869
2870 let enter_ts = now_ms();
2871 if let Some(timeline) = bus.read_mut::<Timeline>() {
2872 timeline.push(TimelineEvent::NodeEnter {
2873 node_id: node_id.to_string(),
2874 node_label: node_label.to_string(),
2875 timestamp: enter_ts,
2876 });
2877 }
2878
2879 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2881 Some(serde_json::to_vec(&state).unwrap_or_default())
2882 } else {
2883 None
2884 };
2885
2886 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2887 bus.set_access_policy(label.clone(), bus_policy.clone());
2888 let result = trans
2889 .run(state.clone(), res, bus)
2890 .instrument(node_span)
2891 .await;
2892 bus.clear_access_policy();
2893
2894 let duration_ms = 0; let exit_ts = now_ms();
2896
2897 if let Some(timeline) = bus.read_mut::<Timeline>() {
2898 timeline.push(TimelineEvent::NodeExit {
2899 node_id: node_id.to_string(),
2900 outcome_type: outcome_type_name(&result),
2901 duration_ms,
2902 timestamp: exit_ts,
2903 });
2904 }
2905
2906 if let Outcome::Fault(ref err) = result {
2908 if compensation_auto_trigger(bus) {
2909 tracing::info!(
2910 ranvier.node = %label,
2911 ranvier.compensation.trigger = "saga",
2912 error = ?err,
2913 "Saga compensation triggered"
2914 );
2915
2916 if let Some(timeline) = bus.read_mut::<Timeline>() {
2917 timeline.push(TimelineEvent::NodeEnter {
2918 node_id: comp_node_id.to_string(),
2919 node_label: format!("Compensate: {}", comp.label()),
2920 timestamp: exit_ts,
2921 });
2922 }
2923
2924 let _ = comp.run(state, res, bus).await;
2926
2927 if let Some(timeline) = bus.read_mut::<Timeline>() {
2928 timeline.push(TimelineEvent::NodeExit {
2929 node_id: comp_node_id.to_string(),
2930 outcome_type: "Compensated".to_string(),
2931 duration_ms: 0,
2932 timestamp: now_ms(),
2933 });
2934 }
2935
2936 if let Some(handle) = bus.read::<PersistenceHandle>() {
2937 let trace_id = persistence_trace_id(bus);
2938 let circuit = bus
2939 .read::<ranvier_core::schematic::Schematic>()
2940 .map(|s| s.name.clone())
2941 .unwrap_or_default();
2942 let version = bus
2943 .read::<ranvier_core::schematic::Schematic>()
2944 .map(|s| s.schema_version.clone())
2945 .unwrap_or_default();
2946
2947 persist_execution_event(
2948 handle,
2949 &trace_id,
2950 &circuit,
2951 &version,
2952 step_idx + 1, Some(comp_node_id.to_string()),
2954 "Compensated",
2955 None,
2956 )
2957 .await;
2958 }
2959 }
2960 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2961 if let Some(stack) = bus.read_mut::<SagaStack>() {
2963 stack.push(node_id.to_string(), label.clone(), snapshot);
2964 }
2965
2966 if let Some(handle) = bus.read::<PersistenceHandle>() {
2967 let trace_id = persistence_trace_id(bus);
2968 let circuit = bus
2969 .read::<ranvier_core::schematic::Schematic>()
2970 .map(|s| s.name.clone())
2971 .unwrap_or_default();
2972 let version = bus
2973 .read::<ranvier_core::schematic::Schematic>()
2974 .map(|s| s.schema_version.clone())
2975 .unwrap_or_default();
2976
2977 persist_execution_event(
2978 handle,
2979 &trace_id,
2980 &circuit,
2981 &version,
2982 step_idx,
2983 Some(node_id.to_string()),
2984 outcome_kind_name(&result),
2985 Some(result.to_json_value()),
2986 )
2987 .await;
2988 }
2989 }
2990
2991 if let Outcome::Fault(f) = &result
2993 && let (Some(sink), Some(policy)) =
2994 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2995 {
2996 let should_dlq = !matches!(policy, DlqPolicy::Drop);
2997 if should_dlq {
2998 let trace_id = persistence_trace_id(bus);
2999 let circuit = bus
3000 .read::<ranvier_core::schematic::Schematic>()
3001 .map(|s| s.name.clone())
3002 .unwrap_or_default();
3003 let _ = sink
3004 .store_dead_letter(
3005 &trace_id,
3006 &circuit,
3007 node_id,
3008 &format!("{:?}", f),
3009 &serde_json::to_vec(&f).unwrap_or_default(),
3010 )
3011 .await;
3012 }
3013 }
3014
3015 result
3016}
3017
3018#[allow(clippy::too_many_arguments)]
3019pub async fn persist_execution_event(
3020 handle: &PersistenceHandle,
3021 trace_id: &str,
3022 circuit: &str,
3023 schematic_version: &str,
3024 step: u64,
3025 node_id: Option<String>,
3026 outcome_kind: &str,
3027 payload: Option<serde_json::Value>,
3028) {
3029 let store = handle.store();
3030 let envelope = PersistenceEnvelope {
3031 trace_id: trace_id.to_string(),
3032 circuit: circuit.to_string(),
3033 schematic_version: schematic_version.to_string(),
3034 step,
3035 node_id,
3036 outcome_kind: outcome_kind.to_string(),
3037 timestamp_ms: now_ms(),
3038 payload_hash: None,
3039 payload,
3040 };
3041
3042 if let Err(err) = store.append(envelope).await {
3043 tracing::warn!(
3044 trace_id = %trace_id,
3045 circuit = %circuit,
3046 step,
3047 outcome_kind = %outcome_kind,
3048 error = %err,
3049 "Failed to append persistence envelope"
3050 );
3051 }
3052}
3053
3054async fn persist_completion(
3055 handle: &PersistenceHandle,
3056 trace_id: &str,
3057 completion: CompletionState,
3058) {
3059 let store = handle.store();
3060 if let Err(err) = store.complete(trace_id, completion).await {
3061 tracing::warn!(
3062 trace_id = %trace_id,
3063 error = %err,
3064 "Failed to complete persistence trace"
3065 );
3066 }
3067}
3068
3069fn compensation_idempotency_key(context: &CompensationContext) -> String {
3070 format!(
3071 "{}:{}:{}",
3072 context.trace_id, context.circuit, context.fault_kind
3073 )
3074}
3075
3076async fn run_compensation(
3077 handle: &CompensationHandle,
3078 context: CompensationContext,
3079 retry_policy: CompensationRetryPolicy,
3080 idempotency: Option<CompensationIdempotencyHandle>,
3081) -> bool {
3082 let hook = handle.hook();
3083 let key = compensation_idempotency_key(&context);
3084
3085 if let Some(store_handle) = idempotency.as_ref() {
3086 let store = store_handle.store();
3087 match store.was_compensated(&key).await {
3088 Ok(true) => {
3089 tracing::info!(
3090 trace_id = %context.trace_id,
3091 circuit = %context.circuit,
3092 key = %key,
3093 "Compensation already recorded; skipping duplicate hook execution"
3094 );
3095 return true;
3096 }
3097 Ok(false) => {}
3098 Err(err) => {
3099 tracing::warn!(
3100 trace_id = %context.trace_id,
3101 key = %key,
3102 error = %err,
3103 "Failed to check compensation idempotency state"
3104 );
3105 }
3106 }
3107 }
3108
3109 let max_attempts = retry_policy.max_attempts.max(1);
3110 for attempt in 1..=max_attempts {
3111 match hook.compensate(context.clone()).await {
3112 Ok(()) => {
3113 if let Some(store_handle) = idempotency.as_ref() {
3114 let store = store_handle.store();
3115 if let Err(err) = store.mark_compensated(&key).await {
3116 tracing::warn!(
3117 trace_id = %context.trace_id,
3118 key = %key,
3119 error = %err,
3120 "Failed to mark compensation idempotency state"
3121 );
3122 }
3123 }
3124 return true;
3125 }
3126 Err(err) => {
3127 let is_last = attempt == max_attempts;
3128 tracing::warn!(
3129 trace_id = %context.trace_id,
3130 circuit = %context.circuit,
3131 fault_kind = %context.fault_kind,
3132 fault_step = context.fault_step,
3133 attempt,
3134 max_attempts,
3135 error = %err,
3136 "Compensation hook attempt failed"
3137 );
3138 if !is_last && retry_policy.backoff_ms > 0 {
3139 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
3140 .await;
3141 }
3142 }
3143 }
3144 }
3145 false
3146}
3147
3148fn ensure_timeline(bus: &mut Bus) -> bool {
3149 if bus.has::<Timeline>() {
3150 false
3151 } else {
3152 bus.insert(Timeline::new());
3153 true
3154 }
3155}
3156
3157fn should_attach_timeline(bus: &Bus) -> bool {
3158 if bus.has::<Timeline>() {
3160 return true;
3161 }
3162
3163 has_timeline_output_path()
3165}
3166
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
3173
/// Reads the timeline export sample rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// Parsed values are clamped to `[0.0, 1.0]`. An unset, unparsable or NaN value falls back
/// to `1.0`. NaN has to be rejected explicitly because `f64::clamp` passes NaN through,
/// and a NaN rate makes every `bucket < rate` comparison false, which silently disables
/// sampled exports.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|v| v.parse::<f64>().ok())
        .filter(|v| !v.is_nan())
        .map(|v| v.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
3181
3182fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3183 if rate <= 0.0 {
3184 return false;
3185 }
3186 if rate >= 1.0 {
3187 return true;
3188 }
3189 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3190 bucket < rate
3191}
3192
/// Returns the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, lower-cased.
///
/// Defaults to `"fault_branch"` when the variable is unset or not valid Unicode.
fn timeline_adaptive_policy() -> String {
    let raw = std::env::var("RANVIER_TIMELINE_ADAPTIVE")
        .unwrap_or_else(|_| String::from("fault_branch"));
    raw.to_ascii_lowercase()
}
3198
3199fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3200 match policy {
3201 "off" => false,
3202 "fault_only" => matches!(outcome, Outcome::Fault(_)),
3203 "fault_branch_emit" => {
3204 matches!(
3205 outcome,
3206 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3207 )
3208 }
3209 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3210 }
3211}
3212
/// Counters describing timeline export sampling decisions.
///
/// A single instance is shared through `TIMELINE_SAMPLING_STATS`.
#[derive(Clone, Default)]
struct SamplingStats {
    // How many export decisions were made, and how they split.
    total_decisions: u64,
    exported: u64,
    skipped: u64,
    // Exports broken down by reason: chosen by rate sampling vs. forced by policy.
    sampled_exports: u64,
    forced_exports: u64,
    // Context of the most recent decision.
    last_mode: String,
    last_policy: String,
    last_updated_ms: u64,
}
3224
3225static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3226
3227fn stats_cell() -> &'static Mutex<SamplingStats> {
3228 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3229}
3230
3231fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3232 let snapshot = {
3233 let mut stats = match stats_cell().lock() {
3234 Ok(guard) => guard,
3235 Err(_) => return,
3236 };
3237
3238 stats.total_decisions += 1;
3239 if exported {
3240 stats.exported += 1;
3241 } else {
3242 stats.skipped += 1;
3243 }
3244 if sampled && exported {
3245 stats.sampled_exports += 1;
3246 }
3247 if forced && exported {
3248 stats.forced_exports += 1;
3249 }
3250 stats.last_mode = mode.to_string();
3251 stats.last_policy = policy.to_string();
3252 stats.last_updated_ms = now_ms();
3253 stats.clone()
3254 };
3255
3256 tracing::debug!(
3257 ranvier.timeline.total_decisions = snapshot.total_decisions,
3258 ranvier.timeline.exported = snapshot.exported,
3259 ranvier.timeline.skipped = snapshot.skipped,
3260 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3261 ranvier.timeline.forced_exports = snapshot.forced_exports,
3262 ranvier.timeline.mode = %snapshot.last_mode,
3263 ranvier.timeline.policy = %snapshot.last_policy,
3264 "Timeline sampling stats updated"
3265 );
3266
3267 if let Some(path) = timeline_stats_output_path() {
3268 let payload = serde_json::json!({
3269 "total_decisions": snapshot.total_decisions,
3270 "exported": snapshot.exported,
3271 "skipped": snapshot.skipped,
3272 "sampled_exports": snapshot.sampled_exports,
3273 "forced_exports": snapshot.forced_exports,
3274 "last_mode": snapshot.last_mode,
3275 "last_policy": snapshot.last_policy,
3276 "last_updated_ms": snapshot.last_updated_ms
3277 });
3278 if let Some(parent) = Path::new(&path).parent() {
3279 let _ = fs::create_dir_all(parent);
3280 }
3281 if let Err(err) = fs::write(&path, payload.to_string()) {
3282 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3283 }
3284 }
3285}
3286
/// Returns the sampling-stats output path from `RANVIER_TIMELINE_STATS_OUTPUT`.
///
/// Returns `None` when the variable is unset, not valid Unicode, or blank. The value is
/// returned untrimmed.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
3292
3293fn write_timeline_with_policy(
3294 path: &str,
3295 mode: &str,
3296 mut timeline: Timeline,
3297) -> Result<(), String> {
3298 match mode {
3299 "append" => {
3300 if Path::new(path).exists() {
3301 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3302 match serde_json::from_str::<Timeline>(&content) {
3303 Ok(mut existing) => {
3304 existing.events.append(&mut timeline.events);
3305 existing.sort();
3306 if let Some(max_events) = max_events_limit() {
3307 truncate_timeline_events(&mut existing, max_events);
3308 }
3309 write_timeline_json(path, &existing)
3310 }
3311 Err(_) => {
3312 if let Some(max_events) = max_events_limit() {
3314 truncate_timeline_events(&mut timeline, max_events);
3315 }
3316 write_timeline_json(path, &timeline)
3317 }
3318 }
3319 } else {
3320 if let Some(max_events) = max_events_limit() {
3321 truncate_timeline_events(&mut timeline, max_events);
3322 }
3323 write_timeline_json(path, &timeline)
3324 }
3325 }
3326 "rotate" => {
3327 let rotated_path = rotated_path(path, now_ms());
3328 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3329 if let Some(keep) = rotate_keep_limit() {
3330 cleanup_rotated_files(path, keep)?;
3331 }
3332 Ok(())
3333 }
3334 _ => write_timeline_json(path, &timeline),
3335 }
3336}
3337
3338fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3339 if let Some(parent) = Path::new(path).parent()
3340 && !parent.as_os_str().is_empty()
3341 {
3342 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3343 }
3344 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3345 fs::write(path, json).map_err(|e| e.to_string())
3346}
3347
/// Builds the path of a rotated timeline file: `<dir>/<stem>_<suffix>.<ext>`.
///
/// The stem defaults to `timeline` and the extension to `json` when they are missing from
/// `path` or are not valid UTF-8.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    let file_name = format!("{stem}_{suffix}.{ext}");
    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
3355
/// Returns the positive event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// An unset, unparsable or zero value means "no limit" and yields `None`.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(limit) => Some(limit),
    }
}
3362
/// Returns how many rotated timeline files to keep, from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// An unset, unparsable or zero value disables pruning and yields `None`.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(keep) => Some(keep),
    }
}
3369
3370fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3371 let len = timeline.events.len();
3372 if len > max_events {
3373 let keep_from = len - max_events;
3374 timeline.events = timeline.events.split_off(keep_from);
3375 }
3376}
3377
/// Deletes old rotated timeline files for `base_path`, keeping the `keep` newest.
///
/// Rotated files are the ones `rotated_path` produces: `<stem>_<digits>.<ext>` in the same
/// directory as `base_path`. Only names whose middle part consists solely of ASCII digits
/// are considered. Unrelated files that merely share the `<stem>_` prefix (for example
/// `timeline_stats.json`) are therefore never removed. Candidates are ordered
/// newest-first by their numeric suffix (the rotation timestamp). Failures to remove an
/// individual file are ignored.
///
/// # Errors
/// Returns the stringified error if the directory cannot be read.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and `read_dir("")` fails.
    // Map that case to the current directory, which is where the rotated files land.
    let dir = base
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{stem}_");
    let suffix = format!(".{ext}");

    let mut rotated: Vec<(u64, PathBuf)> = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(Result::ok)
        .filter_map(|entry| {
            let name = entry.file_name();
            let stamp = name
                .to_str()?
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // Require pure digits: `parse` alone would also accept a leading `+`.
            if !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            Some((stamp, entry.path()))
        })
        .collect();

    // Newest (largest timestamp) first; ties broken by path for determinism.
    rotated.sort_unstable_by(|a, b| b.cmp(a));
    for (_, path) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
3410
3411fn bus_capability_schema_from_policy(
3412 policy: Option<ranvier_core::bus::BusAccessPolicy>,
3413) -> Option<BusCapabilitySchema> {
3414 let policy = policy?;
3415
3416 let mut allow = policy
3417 .allow
3418 .unwrap_or_default()
3419 .into_iter()
3420 .map(|entry| entry.type_name.to_string())
3421 .collect::<Vec<_>>();
3422 let mut deny = policy
3423 .deny
3424 .into_iter()
3425 .map(|entry| entry.type_name.to_string())
3426 .collect::<Vec<_>>();
3427 allow.sort();
3428 deny.sort();
3429
3430 if allow.is_empty() && deny.is_empty() {
3431 return None;
3432 }
3433
3434 Some(BusCapabilitySchema { allow, deny })
3435}
3436
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3443
3444#[cfg(test)]
3445mod tests {
3446 use super::{
3447 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3448 should_force_export,
3449 };
3450 use crate::persistence::{
3451 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3452 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3453 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3454 PersistenceHandle, PersistenceStore, PersistenceTraceId,
3455 };
3456 use anyhow::Result;
3457 use async_trait::async_trait;
3458 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3459 use ranvier_core::event::{DlqPolicy, DlqSink};
3460 use ranvier_core::saga::SagaStack;
3461 use ranvier_core::timeline::{Timeline, TimelineEvent};
3462 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3463 use serde::{Deserialize, Serialize};
3464 use std::sync::Arc;
3465 use tokio::sync::Mutex;
3466 use uuid::Uuid;
3467
3468 struct MockAuditSink {
3469 events: Arc<Mutex<Vec<AuditEvent>>>,
3470 }
3471
3472 #[async_trait]
3473 impl AuditSink for MockAuditSink {
3474 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3475 self.events.lock().await.push(event.clone());
3476 Ok(())
3477 }
3478 }
3479
3480 #[tokio::test]
3481 async fn execute_logs_audit_events_for_intervention() {
3482 use ranvier_inspector::StateInspector;
3483
3484 let trace_id = "test-audit-trace";
3485 let store_impl = InMemoryPersistenceStore::new();
3486 let events = Arc::new(Mutex::new(Vec::new()));
3487 let sink = MockAuditSink {
3488 events: events.clone(),
3489 };
3490
3491 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3492 .then(AddOne)
3493 .with_persistence_store(store_impl.clone())
3494 .with_audit_sink(sink);
3495
3496 let mut bus = Bus::new();
3497 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3498 bus.insert(PersistenceTraceId::new(trace_id));
3499 let target_node_id = axon.schematic.nodes[0].id.clone();
3500
3501 store_impl
3503 .append(crate::persistence::PersistenceEnvelope {
3504 trace_id: trace_id.to_string(),
3505 circuit: "AuditTest".to_string(),
3506 schematic_version: "v1.0".to_string(),
3507 step: 0,
3508 node_id: None,
3509 outcome_kind: "Next".to_string(),
3510 timestamp_ms: 0,
3511 payload_hash: None,
3512 payload: None,
3513 })
3514 .await
3515 .unwrap();
3516
3517 axon.force_resume(trace_id, &target_node_id, None)
3519 .await
3520 .unwrap();
3521
3522 axon.execute(10, &(), &mut bus).await;
3524
3525 let recorded = events.lock().await;
3526 assert_eq!(
3527 recorded.len(),
3528 2,
3529 "Should have 2 audit events: ForceResume and ApplyIntervention"
3530 );
3531 assert_eq!(recorded[0].action, "ForceResume");
3532 assert_eq!(recorded[0].target, trace_id);
3533 assert_eq!(recorded[1].action, "ApplyIntervention");
3534 assert_eq!(recorded[1].target, trace_id);
3535 }
3536
3537 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3538 pub enum TestInfallible {}
3539
3540 #[test]
3541 fn inspector_enabled_flag_matrix() {
3542 assert!(inspector_enabled_from_value(None));
3543 assert!(inspector_enabled_from_value(Some("1")));
3544 assert!(inspector_enabled_from_value(Some("true")));
3545 assert!(inspector_enabled_from_value(Some("on")));
3546 assert!(!inspector_enabled_from_value(Some("0")));
3547 assert!(!inspector_enabled_from_value(Some("false")));
3548 }
3549
3550 #[test]
3551 fn inspector_dev_mode_matrix() {
3552 assert!(inspector_dev_mode_from_value(None));
3553 assert!(inspector_dev_mode_from_value(Some("dev")));
3554 assert!(inspector_dev_mode_from_value(Some("staging")));
3555 assert!(!inspector_dev_mode_from_value(Some("prod")));
3556 assert!(!inspector_dev_mode_from_value(Some("production")));
3557 }
3558
3559 #[test]
3560 fn adaptive_policy_force_export_matrix() {
3561 let next = Outcome::<(), &'static str>::Next(());
3562 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3563 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3564 let fault = Outcome::<(), &'static str>::Fault("boom");
3565
3566 assert!(!should_force_export(&next, "off"));
3567 assert!(!should_force_export(&fault, "off"));
3568
3569 assert!(!should_force_export(&branch, "fault_only"));
3570 assert!(should_force_export(&fault, "fault_only"));
3571
3572 assert!(should_force_export(&branch, "fault_branch"));
3573 assert!(!should_force_export(&emit, "fault_branch"));
3574 assert!(should_force_export(&fault, "fault_branch"));
3575
3576 assert!(should_force_export(&branch, "fault_branch_emit"));
3577 assert!(should_force_export(&emit, "fault_branch_emit"));
3578 assert!(should_force_export(&fault, "fault_branch_emit"));
3579 }
3580
3581 #[test]
3582 fn sampling_and_adaptive_combination_decisions() {
3583 let bus_id = Uuid::nil();
3584 let next = Outcome::<(), &'static str>::Next(());
3585 let fault = Outcome::<(), &'static str>::Fault("boom");
3586
3587 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3588 assert!(!sampled_never);
3589 assert!(!(sampled_never || should_force_export(&next, "off")));
3590 assert!(sampled_never || should_force_export(&fault, "fault_only"));
3591
3592 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3593 assert!(sampled_always);
3594 assert!(sampled_always || should_force_export(&next, "off"));
3595 assert!(sampled_always || should_force_export(&fault, "off"));
3596 }
3597
3598 #[derive(Clone)]
3599 struct AddOne;
3600
3601 #[async_trait]
3602 impl Transition<i32, i32> for AddOne {
3603 type Error = TestInfallible;
3604 type Resources = ();
3605
3606 async fn run(
3607 &self,
3608 state: i32,
3609 _resources: &Self::Resources,
3610 _bus: &mut Bus,
3611 ) -> Outcome<i32, Self::Error> {
3612 Outcome::Next(state + 1)
3613 }
3614 }
3615
3616 #[derive(Clone)]
3617 struct AlwaysFault;
3618
3619 #[async_trait]
3620 impl Transition<i32, i32> for AlwaysFault {
3621 type Error = String;
3622 type Resources = ();
3623
3624 async fn run(
3625 &self,
3626 _state: i32,
3627 _resources: &Self::Resources,
3628 _bus: &mut Bus,
3629 ) -> Outcome<i32, Self::Error> {
3630 Outcome::Fault("boom".to_string())
3631 }
3632 }
3633
3634 #[derive(Clone)]
3635 struct CapabilityGuarded;
3636
3637 #[async_trait]
3638 impl Transition<(), ()> for CapabilityGuarded {
3639 type Error = String;
3640 type Resources = ();
3641
3642 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3643 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3644 }
3645
3646 async fn run(
3647 &self,
3648 _state: (),
3649 _resources: &Self::Resources,
3650 bus: &mut Bus,
3651 ) -> Outcome<(), Self::Error> {
3652 match bus.get::<String>() {
3653 Ok(_) => Outcome::Next(()),
3654 Err(err) => Outcome::Fault(err.to_string()),
3655 }
3656 }
3657 }
3658
3659 #[derive(Clone)]
3660 struct RecordingCompensationHook {
3661 calls: Arc<Mutex<Vec<CompensationContext>>>,
3662 should_fail: bool,
3663 }
3664
3665 #[async_trait]
3666 impl CompensationHook for RecordingCompensationHook {
3667 async fn compensate(&self, context: CompensationContext) -> Result<()> {
3668 self.calls.lock().await.push(context);
3669 if self.should_fail {
3670 return Err(anyhow::anyhow!("compensation failed"));
3671 }
3672 Ok(())
3673 }
3674 }
3675
3676 #[derive(Clone)]
3677 struct FlakyCompensationHook {
3678 calls: Arc<Mutex<u32>>,
3679 failures_remaining: Arc<Mutex<u32>>,
3680 }
3681
3682 #[async_trait]
3683 impl CompensationHook for FlakyCompensationHook {
3684 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3685 {
3686 let mut calls = self.calls.lock().await;
3687 *calls += 1;
3688 }
3689 let mut failures_remaining = self.failures_remaining.lock().await;
3690 if *failures_remaining > 0 {
3691 *failures_remaining -= 1;
3692 return Err(anyhow::anyhow!("transient compensation failure"));
3693 }
3694 Ok(())
3695 }
3696 }
3697
3698 #[derive(Clone)]
3699 struct FailingCompensationIdempotencyStore {
3700 read_calls: Arc<Mutex<u32>>,
3701 write_calls: Arc<Mutex<u32>>,
3702 }
3703
3704 #[async_trait]
3705 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3706 async fn was_compensated(&self, _key: &str) -> Result<bool> {
3707 let mut read_calls = self.read_calls.lock().await;
3708 *read_calls += 1;
3709 Err(anyhow::anyhow!("forced idempotency read failure"))
3710 }
3711
3712 async fn mark_compensated(&self, _key: &str) -> Result<()> {
3713 let mut write_calls = self.write_calls.lock().await;
3714 *write_calls += 1;
3715 Err(anyhow::anyhow!("forced idempotency write failure"))
3716 }
3717 }
3718
3719 #[tokio::test]
3720 async fn execute_persists_success_trace_when_handle_exists() {
3721 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3722 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3723
3724 let mut bus = Bus::new();
3725 bus.insert(PersistenceHandle::from_arc(store_dyn));
3726 bus.insert(PersistenceTraceId::new("trace-success"));
3727
3728 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3729 let outcome = axon.execute(41, &(), &mut bus).await;
3730 assert!(matches!(outcome, Outcome::Next(42)));
3731
3732 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3733 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3735 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
3738 }
3739
3740 #[tokio::test]
3741 async fn execute_persists_fault_completion_state() {
3742 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3743 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3744
3745 let mut bus = Bus::new();
3746 bus.insert(PersistenceHandle::from_arc(store_dyn));
3747 bus.insert(PersistenceTraceId::new("trace-fault"));
3748
3749 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3750 let outcome = axon.execute(41, &(), &mut bus).await;
3751 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3752
3753 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3754 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3758 }
3759
3760 #[tokio::test]
3761 async fn execute_respects_persistence_auto_complete_off() {
3762 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3763 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3764
3765 let mut bus = Bus::new();
3766 bus.insert(PersistenceHandle::from_arc(store_dyn));
3767 bus.insert(PersistenceTraceId::new("trace-no-complete"));
3768 bus.insert(PersistenceAutoComplete(false));
3769
3770 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3771 let outcome = axon.execute(1, &(), &mut bus).await;
3772 assert!(matches!(outcome, Outcome::Next(2)));
3773
3774 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3775 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
3777 }
3778
3779 #[tokio::test]
3780 async fn fault_triggers_compensation_and_marks_compensated() {
3781 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3782 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3783 let calls = Arc::new(Mutex::new(Vec::new()));
3784 let compensation = RecordingCompensationHook {
3785 calls: calls.clone(),
3786 should_fail: false,
3787 };
3788
3789 let mut bus = Bus::new();
3790 bus.insert(PersistenceHandle::from_arc(store_dyn));
3791 bus.insert(PersistenceTraceId::new("trace-compensated"));
3792 bus.insert(CompensationHandle::from_hook(compensation));
3793
3794 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3795 let outcome = axon.execute(7, &(), &mut bus).await;
3796 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3797
3798 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3799 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3801 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3804 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3805
3806 let recorded = calls.lock().await;
3807 assert_eq!(recorded.len(), 1);
3808 assert_eq!(recorded[0].trace_id, "trace-compensated");
3809 assert_eq!(recorded[0].fault_kind, "Fault");
3810 }
3811
3812 #[tokio::test]
3813 async fn failed_compensation_keeps_fault_completion() {
3814 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3815 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3816 let calls = Arc::new(Mutex::new(Vec::new()));
3817 let compensation = RecordingCompensationHook {
3818 calls: calls.clone(),
3819 should_fail: true,
3820 };
3821
3822 let mut bus = Bus::new();
3823 bus.insert(PersistenceHandle::from_arc(store_dyn));
3824 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3825 bus.insert(CompensationHandle::from_hook(compensation));
3826
3827 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3828 let outcome = axon.execute(7, &(), &mut bus).await;
3829 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3830
3831 let persisted = store_impl
3832 .load("trace-compensation-failed")
3833 .await
3834 .unwrap()
3835 .unwrap();
3836 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3839
3840 let recorded = calls.lock().await;
3841 assert_eq!(recorded.len(), 1);
3842 }
3843
3844 #[tokio::test]
3845 async fn compensation_retry_policy_succeeds_after_retries() {
3846 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3847 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3848 let calls = Arc::new(Mutex::new(0u32));
3849 let failures_remaining = Arc::new(Mutex::new(2u32));
3850 let compensation = FlakyCompensationHook {
3851 calls: calls.clone(),
3852 failures_remaining,
3853 };
3854
3855 let mut bus = Bus::new();
3856 bus.insert(PersistenceHandle::from_arc(store_dyn));
3857 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3858 bus.insert(CompensationHandle::from_hook(compensation));
3859 bus.insert(CompensationRetryPolicy {
3860 max_attempts: 3,
3861 backoff_ms: 0,
3862 });
3863
3864 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3865 let outcome = axon.execute(7, &(), &mut bus).await;
3866 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3867
3868 let persisted = store_impl
3869 .load("trace-retry-success")
3870 .await
3871 .unwrap()
3872 .unwrap();
3873 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3874 assert_eq!(
3875 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3876 Some("Compensated")
3877 );
3878
3879 let attempt_count = calls.lock().await;
3880 assert_eq!(*attempt_count, 3);
3881 }
3882
3883 #[tokio::test]
3884 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3885 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3886 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3887 let calls = Arc::new(Mutex::new(Vec::new()));
3888 let compensation = RecordingCompensationHook {
3889 calls: calls.clone(),
3890 should_fail: false,
3891 };
3892 let idempotency = InMemoryCompensationIdempotencyStore::new();
3893
3894 let mut bus = Bus::new();
3895 bus.insert(PersistenceHandle::from_arc(store_dyn));
3896 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3897 bus.insert(PersistenceAutoComplete(false));
3898 bus.insert(CompensationHandle::from_hook(compensation));
3899 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3900
3901 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3902
3903 let outcome1 = axon.execute(7, &(), &mut bus).await;
3904 let outcome2 = axon.execute(8, &(), &mut bus).await;
3905 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3906 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3907
3908 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3909 assert_eq!(persisted.completion, None);
3910 let compensated_count = persisted
3912 .events
3913 .iter()
3914 .filter(|e| e.outcome_kind == "Compensated")
3915 .count();
3916 assert_eq!(
3917 compensated_count, 2,
3918 "Should have 2 Compensated events (one per execution)"
3919 );
3920
3921 let recorded = calls.lock().await;
3922 assert_eq!(recorded.len(), 1);
3923 }
3924
3925 #[tokio::test]
3926 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3927 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3928 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3929 let calls = Arc::new(Mutex::new(Vec::new()));
3930 let read_calls = Arc::new(Mutex::new(0u32));
3931 let write_calls = Arc::new(Mutex::new(0u32));
3932 let compensation = RecordingCompensationHook {
3933 calls: calls.clone(),
3934 should_fail: false,
3935 };
3936 let idempotency = FailingCompensationIdempotencyStore {
3937 read_calls: read_calls.clone(),
3938 write_calls: write_calls.clone(),
3939 };
3940
3941 let mut bus = Bus::new();
3942 bus.insert(PersistenceHandle::from_arc(store_dyn));
3943 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3944 bus.insert(CompensationHandle::from_hook(compensation));
3945 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3946
3947 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3948 let outcome = axon.execute(9, &(), &mut bus).await;
3949 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3950
3951 let persisted = store_impl
3952 .load("trace-idempotency-store-failure")
3953 .await
3954 .unwrap()
3955 .unwrap();
3956 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3957 assert_eq!(
3958 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3959 Some("Compensated")
3960 );
3961
3962 let recorded = calls.lock().await;
3963 assert_eq!(recorded.len(), 1);
3964 assert_eq!(*read_calls.lock().await, 1);
3965 assert_eq!(*write_calls.lock().await, 1);
3966 }
3967
3968 #[tokio::test]
3969 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3970 let mut bus = Bus::new();
3971 bus.insert(1_i32);
3972 bus.insert("secret".to_string());
3973
3974 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3975 let outcome = axon.execute((), &(), &mut bus).await;
3976
3977 match outcome {
3978 Outcome::Fault(msg) => {
3979 assert!(msg.contains("Bus access denied"), "{msg}");
3980 assert!(msg.contains("CapabilityGuarded"), "{msg}");
3981 assert!(msg.contains("alloc::string::String"), "{msg}");
3982 }
3983 other => panic!("expected fault, got {other:?}"),
3984 }
3985 }
3986
3987 #[tokio::test]
3988 async fn execute_fails_on_version_mismatch_without_migration() {
3989 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3990 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3991
3992 let trace_id = "v-mismatch";
3993 let old_envelope = crate::persistence::PersistenceEnvelope {
3995 trace_id: trace_id.to_string(),
3996 circuit: "TestCircuit".to_string(),
3997 schematic_version: "0.9".to_string(),
3998 step: 0,
3999 node_id: None,
4000 outcome_kind: "Enter".to_string(),
4001 timestamp_ms: 0,
4002 payload_hash: None,
4003 payload: None,
4004 };
4005 store_impl.append(old_envelope).await.unwrap();
4006
4007 let mut bus = Bus::new();
4008 bus.insert(PersistenceHandle::from_arc(store_dyn));
4009 bus.insert(PersistenceTraceId::new(trace_id));
4010
4011 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4013 let outcome = axon.execute(10, &(), &mut bus).await;
4014
4015 if let Outcome::Emit(kind, _) = outcome {
4016 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
4017 } else {
4018 panic!("Expected version mismatch emission, got {:?}", outcome);
4019 }
4020 }
4021
4022 #[tokio::test]
4023 async fn execute_resumes_from_start_on_migration_strategy() {
4024 let store_impl = Arc::new(InMemoryPersistenceStore::new());
4025 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4026
4027 let trace_id = "v-migration";
4028 let old_envelope = crate::persistence::PersistenceEnvelope {
4030 trace_id: trace_id.to_string(),
4031 circuit: "TestCircuit".to_string(),
4032 schematic_version: "0.9".to_string(),
4033 step: 5,
4034 node_id: None,
4035 outcome_kind: "Next".to_string(),
4036 timestamp_ms: 0,
4037 payload_hash: None,
4038 payload: None,
4039 };
4040 store_impl.append(old_envelope).await.unwrap();
4041
4042 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
4043 registry.register(ranvier_core::schematic::SnapshotMigration {
4044 name: Some("v0.9 to v1.0".to_string()),
4045 from_version: "0.9".to_string(),
4046 to_version: "1.0".to_string(),
4047 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
4048 node_mapping: std::collections::HashMap::new(),
4049 payload_mapper: None,
4050 });
4051
4052 let mut bus = Bus::new();
4053 bus.insert(PersistenceHandle::from_arc(store_dyn));
4054 bus.insert(PersistenceTraceId::new(trace_id));
4055 bus.insert(registry);
4056
4057 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4058 let outcome = axon.execute(10, &(), &mut bus).await;
4059
4060 assert!(matches!(outcome, Outcome::Next(11)));
4062
4063 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4065 assert_eq!(persisted.schematic_version, "1.0");
4066 }
4067
4068 #[tokio::test]
4069 async fn execute_applies_manual_intervention_jump_and_payload() {
4070 let store_impl = Arc::new(InMemoryPersistenceStore::new());
4071 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4072
4073 let trace_id = "intervention-test";
4074 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
4076 .then(AddOne)
4077 .then(AddOne);
4078
4079 let mut bus = Bus::new();
4080 bus.insert(PersistenceHandle::from_arc(store_dyn));
4081 bus.insert(PersistenceTraceId::new(trace_id));
4082
4083 let _target_node_label = "AddOne";
4088 let target_node_id = axon.schematic.nodes[2].id.clone();
4090
4091 store_impl
4093 .append(crate::persistence::PersistenceEnvelope {
4094 trace_id: trace_id.to_string(),
4095 circuit: "TestCircuit".to_string(),
4096 schematic_version: "1.0".to_string(),
4097 step: 0,
4098 node_id: None,
4099 outcome_kind: "Enter".to_string(),
4100 timestamp_ms: 0,
4101 payload_hash: None,
4102 payload: None,
4103 })
4104 .await
4105 .unwrap();
4106
4107 store_impl
4108 .save_intervention(
4109 trace_id,
4110 crate::persistence::Intervention {
4111 target_node: target_node_id.clone(),
4112 payload_override: Some(serde_json::json!(100)),
4113 timestamp_ms: 0,
4114 },
4115 )
4116 .await
4117 .unwrap();
4118
4119 let outcome = axon.execute(10, &(), &mut bus).await;
4122
4123 match outcome {
4124 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
4125 other => panic!("Expected Outcome::Next(101), got {:?}", other),
4126 }
4127
4128 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4130 assert_eq!(persisted.interventions.len(), 1);
4132 assert_eq!(persisted.interventions[0].target_node, target_node_id);
4133 }
4134
4135 #[derive(Clone)]
4139 struct FailNThenSucceed {
4140 remaining: Arc<tokio::sync::Mutex<u32>>,
4141 }
4142
4143 #[async_trait]
4144 impl Transition<i32, i32> for FailNThenSucceed {
4145 type Error = String;
4146 type Resources = ();
4147
4148 async fn run(
4149 &self,
4150 state: i32,
4151 _resources: &Self::Resources,
4152 _bus: &mut Bus,
4153 ) -> Outcome<i32, Self::Error> {
4154 let mut rem = self.remaining.lock().await;
4155 if *rem > 0 {
4156 *rem -= 1;
4157 Outcome::Fault("transient failure".to_string())
4158 } else {
4159 Outcome::Next(state + 1)
4160 }
4161 }
4162 }
4163
4164 #[derive(Clone)]
4166 struct MockDlqSink {
4167 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
4168 }
4169
4170 #[async_trait]
4171 impl DlqSink for MockDlqSink {
4172 async fn store_dead_letter(
4173 &self,
4174 workflow_id: &str,
4175 _circuit_label: &str,
4176 node_id: &str,
4177 error_msg: &str,
4178 _payload: &[u8],
4179 ) -> Result<(), String> {
4180 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4181 self.letters.lock().await.push(entry);
4182 Ok(())
4183 }
4184 }
4185
4186 #[tokio::test]
4187 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4188 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4190 let trans = FailNThenSucceed { remaining };
4191
4192 let dlq_sink = MockDlqSink {
4193 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4194 };
4195
4196 let mut bus = Bus::new();
4197 bus.insert(Timeline::new());
4198
4199 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4200 .then(trans)
4201 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4202 max_attempts: 5,
4203 backoff_ms: 1,
4204 })
4205 .with_dlq_sink(dlq_sink.clone());
4206 let outcome = axon.execute(10, &(), &mut bus).await;
4207
4208 assert!(
4210 matches!(outcome, Outcome::Next(11)),
4211 "Expected Next(11), got {:?}",
4212 outcome
4213 );
4214
4215 let letters = dlq_sink.letters.lock().await;
4217 assert!(
4218 letters.is_empty(),
4219 "Should have 0 dead letters, got {}",
4220 letters.len()
4221 );
4222
4223 let timeline = bus.read::<Timeline>().unwrap();
4225 let retry_count = timeline
4226 .events
4227 .iter()
4228 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4229 .count();
4230 assert_eq!(retry_count, 2, "Should have 2 retry events");
4231 }
4232
4233 #[tokio::test]
4234 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4235 let mut bus = Bus::new();
4237 bus.insert(Timeline::new());
4238
4239 let dlq_sink = MockDlqSink {
4240 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4241 };
4242
4243 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4244 .then(AlwaysFault)
4245 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4246 max_attempts: 3,
4247 backoff_ms: 1,
4248 })
4249 .with_dlq_sink(dlq_sink.clone());
4250 let outcome = axon.execute(42, &(), &mut bus).await;
4251
4252 assert!(
4253 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4254 "Expected Fault(boom), got {:?}",
4255 outcome
4256 );
4257
4258 let letters = dlq_sink.letters.lock().await;
4260 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4261
4262 let timeline = bus.read::<Timeline>().unwrap();
4264 let retry_count = timeline
4265 .events
4266 .iter()
4267 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4268 .count();
4269 let dlq_count = timeline
4270 .events
4271 .iter()
4272 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4273 .count();
4274 assert_eq!(
4275 retry_count, 2,
4276 "Should have 2 retry events (attempts 2 and 3)"
4277 );
4278 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4279 }
4280
4281 #[tokio::test]
4282 async fn send_to_dlq_policy_sends_immediately_without_retry() {
4283 let mut bus = Bus::new();
4284 bus.insert(Timeline::new());
4285
4286 let dlq_sink = MockDlqSink {
4287 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4288 };
4289
4290 let axon = Axon::<i32, i32, String>::start("SendDlq")
4291 .then(AlwaysFault)
4292 .with_dlq_policy(DlqPolicy::SendToDlq)
4293 .with_dlq_sink(dlq_sink.clone());
4294 let outcome = axon.execute(1, &(), &mut bus).await;
4295
4296 assert!(matches!(outcome, Outcome::Fault(_)));
4297
4298 let letters = dlq_sink.letters.lock().await;
4300 assert_eq!(letters.len(), 1);
4301
4302 let timeline = bus.read::<Timeline>().unwrap();
4304 let retry_count = timeline
4305 .events
4306 .iter()
4307 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4308 .count();
4309 assert_eq!(retry_count, 0);
4310 }
4311
4312 #[tokio::test]
4313 async fn drop_policy_does_not_send_to_dlq() {
4314 let mut bus = Bus::new();
4315
4316 let dlq_sink = MockDlqSink {
4317 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4318 };
4319
4320 let axon = Axon::<i32, i32, String>::start("DropDlq")
4321 .then(AlwaysFault)
4322 .with_dlq_policy(DlqPolicy::Drop)
4323 .with_dlq_sink(dlq_sink.clone());
4324 let outcome = axon.execute(1, &(), &mut bus).await;
4325
4326 assert!(matches!(outcome, Outcome::Fault(_)));
4327
4328 let letters = dlq_sink.letters.lock().await;
4330 assert!(letters.is_empty());
4331 }
4332
4333 #[tokio::test]
4334 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4335 use ranvier_core::policy::DynamicPolicy;
4336
4337 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4339 let dlq_sink = MockDlqSink {
4340 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4341 };
4342
4343 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4344 .then(AlwaysFault)
4345 .with_dynamic_dlq_policy(dynamic)
4346 .with_dlq_sink(dlq_sink.clone());
4347
4348 let mut bus = Bus::new();
4350 let outcome = axon.execute(1, &(), &mut bus).await;
4351 assert!(matches!(outcome, Outcome::Fault(_)));
4352 assert!(
4353 dlq_sink.letters.lock().await.is_empty(),
4354 "Drop policy should produce no DLQ entries"
4355 );
4356
4357 tx.send(DlqPolicy::SendToDlq).unwrap();
4359
4360 let mut bus2 = Bus::new();
4362 let outcome2 = axon.execute(2, &(), &mut bus2).await;
4363 assert!(matches!(outcome2, Outcome::Fault(_)));
4364 assert_eq!(
4365 dlq_sink.letters.lock().await.len(),
4366 1,
4367 "SendToDlq policy should produce 1 DLQ entry"
4368 );
4369 }
4370
4371 #[tokio::test]
4372 async fn dynamic_saga_policy_hot_reload() {
4373 use ranvier_core::policy::DynamicPolicy;
4374 use ranvier_core::saga::SagaPolicy;
4375
4376 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4378
4379 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4380 .then(AddOne)
4381 .with_dynamic_saga_policy(dynamic);
4382
4383 let mut bus = Bus::new();
4385 let _outcome = axon.execute(1, &(), &mut bus).await;
4386 assert!(
4387 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4388 "SagaStack should be absent or empty when disabled"
4389 );
4390
4391 tx.send(SagaPolicy::Enabled).unwrap();
4393
4394 let mut bus2 = Bus::new();
4396 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4397 assert!(
4398 bus2.read::<SagaStack>().is_some(),
4399 "SagaStack should exist when saga is enabled"
4400 );
4401 }
4402
4403 mod iam_tests {
4406 use super::*;
4407 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4408
4409 #[derive(Clone)]
4411 struct MockVerifier {
4412 identity: IamIdentity,
4413 should_fail: bool,
4414 }
4415
4416 #[async_trait]
4417 impl IamVerifier for MockVerifier {
4418 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4419 if self.should_fail {
4420 Err(IamError::InvalidToken("mock verification failure".into()))
4421 } else {
4422 Ok(self.identity.clone())
4423 }
4424 }
4425 }
4426
4427 #[tokio::test]
4428 async fn iam_require_identity_passes_with_valid_token() {
4429 let verifier = MockVerifier {
4430 identity: IamIdentity::new("alice").with_role("user"),
4431 should_fail: false,
4432 };
4433
4434 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4435 .with_iam(IamPolicy::RequireIdentity, verifier)
4436 .then(AddOne);
4437
4438 let mut bus = Bus::new();
4439 bus.insert(IamToken("valid-token".to_string()));
4440 let outcome = axon.execute(10, &(), &mut bus).await;
4441
4442 assert!(matches!(outcome, Outcome::Next(11)));
4443 let identity = bus
4445 .read::<IamIdentity>()
4446 .expect("IamIdentity should be in Bus");
4447 assert_eq!(identity.subject, "alice");
4448 }
4449
4450 #[tokio::test]
4451 async fn iam_require_identity_rejects_missing_token() {
4452 let verifier = MockVerifier {
4453 identity: IamIdentity::new("ignored"),
4454 should_fail: false,
4455 };
4456
4457 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4458 .with_iam(IamPolicy::RequireIdentity, verifier)
4459 .then(AddOne);
4460
4461 let mut bus = Bus::new();
4462 let outcome = axon.execute(10, &(), &mut bus).await;
4464
4465 match &outcome {
4467 Outcome::Emit(label, _) => {
4468 assert_eq!(label, "iam.missing_token");
4469 }
4470 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4471 }
4472 }
4473
4474 #[tokio::test]
4475 async fn iam_rejects_failed_verification() {
4476 let verifier = MockVerifier {
4477 identity: IamIdentity::new("ignored"),
4478 should_fail: true,
4479 };
4480
4481 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4482 .with_iam(IamPolicy::RequireIdentity, verifier)
4483 .then(AddOne);
4484
4485 let mut bus = Bus::new();
4486 bus.insert(IamToken("bad-token".to_string()));
4487 let outcome = axon.execute(10, &(), &mut bus).await;
4488
4489 match &outcome {
4490 Outcome::Emit(label, _) => {
4491 assert_eq!(label, "iam.verification_failed");
4492 }
4493 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4494 }
4495 }
4496
4497 #[tokio::test]
4498 async fn iam_require_role_passes_with_matching_role() {
4499 let verifier = MockVerifier {
4500 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4501 should_fail: false,
4502 };
4503
4504 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4505 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4506 .then(AddOne);
4507
4508 let mut bus = Bus::new();
4509 bus.insert(IamToken("token".to_string()));
4510 let outcome = axon.execute(5, &(), &mut bus).await;
4511
4512 assert!(matches!(outcome, Outcome::Next(6)));
4513 }
4514
4515 #[tokio::test]
4516 async fn iam_require_role_denies_without_role() {
4517 let verifier = MockVerifier {
4518 identity: IamIdentity::new("carol").with_role("user"),
4519 should_fail: false,
4520 };
4521
4522 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4523 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4524 .then(AddOne);
4525
4526 let mut bus = Bus::new();
4527 bus.insert(IamToken("token".to_string()));
4528 let outcome = axon.execute(5, &(), &mut bus).await;
4529
4530 match &outcome {
4531 Outcome::Emit(label, _) => {
4532 assert_eq!(label, "iam.policy_denied");
4533 }
4534 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4535 }
4536 }
4537
4538 #[tokio::test]
4539 async fn iam_policy_none_skips_verification() {
4540 let verifier = MockVerifier {
4541 identity: IamIdentity::new("ignored"),
4542 should_fail: true, };
4544
4545 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4546 .with_iam(IamPolicy::None, verifier)
4547 .then(AddOne);
4548
4549 let mut bus = Bus::new();
4550 let outcome = axon.execute(10, &(), &mut bus).await;
4552
4553 assert!(matches!(outcome, Outcome::Next(11)));
4554 }
4555 }
4556
4557 #[derive(Clone)]
4560 struct SchemaTransition;
4561
4562 #[async_trait]
4563 impl Transition<String, String> for SchemaTransition {
4564 type Error = String;
4565 type Resources = ();
4566
4567 fn input_schema(&self) -> Option<serde_json::Value> {
4568 Some(serde_json::json!({
4569 "type": "object",
4570 "required": ["name"],
4571 "properties": {
4572 "name": { "type": "string" }
4573 }
4574 }))
4575 }
4576
4577 async fn run(
4578 &self,
4579 state: String,
4580 _resources: &Self::Resources,
4581 _bus: &mut Bus,
4582 ) -> Outcome<String, Self::Error> {
4583 Outcome::Next(state)
4584 }
4585 }
4586
/// `then` copies the transition's `input_schema()` onto the node it appends.
#[test]
fn then_auto_populates_input_schema_from_transition() {
    let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);

    let last_node = axon
        .schematic
        .nodes
        .last()
        .expect("schematic should contain the node appended by `then`");
    // Use a single `expect` instead of `is_some()` followed by `unwrap()`, so a
    // failure says what was missing.
    let schema = last_node
        .input_schema
        .as_ref()
        .expect("`then` should copy the transition's input schema onto its node");
    assert_eq!(schema["type"], "object");
    assert_eq!(schema["required"][0], "name");
}
4598
/// A transition without an input schema leaves its node's `input_schema` unset.
#[test]
fn then_leaves_input_schema_none_when_not_provided() {
    let circuit = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
    let node = circuit.schematic.nodes.last().unwrap();
    assert_eq!(node.input_schema, None);
}
4606
/// `with_input_schema_value` stores the given schema on the most recently
/// added node.
#[test]
fn with_input_schema_value_sets_on_last_node() {
    let expected = serde_json::json!({"type": "integer"});
    let circuit = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
        .then(AddOne)
        .with_input_schema_value(expected.clone());

    let node = circuit.schematic.nodes.last().unwrap();
    assert_eq!(node.input_schema.as_ref(), Some(&expected));
}
4617
/// `with_output_schema_value` stores the given schema on the most recently
/// added node.
#[test]
fn with_output_schema_value_sets_on_last_node() {
    let expected = serde_json::json!({"type": "integer"});
    let circuit = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
        .then(AddOne)
        .with_output_schema_value(expected.clone());

    let node = circuit.schematic.nodes.last().unwrap();
    assert_eq!(node.output_schema.as_ref(), Some(&expected));
}
4628
/// Serializing the schematic to JSON includes both schema fields on the node
/// that has them.
#[test]
fn schematic_export_includes_schema_fields() {
    let circuit = Axon::<String, String, String>::new("ExportTest")
        .then(SchemaTransition)
        .with_output_schema_value(serde_json::json!({"type": "string"}));

    let exported = serde_json::to_value(&circuit.schematic).unwrap();
    let last = exported["nodes"]
        .as_array()
        .and_then(|nodes| nodes.last())
        .unwrap();

    assert!(last.get("input_schema").is_some());
    assert_eq!(last["input_schema"]["type"], "object");
    assert_eq!(last["output_schema"]["type"], "string");
}
4643
/// When no schema is known, the serialized node must not contain the schema
/// keys at all (rather than, say, emitting them as `null`).
#[test]
fn schematic_export_omits_schema_fields_when_none() {
    let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);

    let exported = serde_json::to_value(&axon.schematic).unwrap();
    let fields = exported["nodes"]
        .as_array()
        .and_then(|nodes| nodes.last())
        .and_then(|node| node.as_object())
        .unwrap();

    for key in ["input_schema", "output_schema"] {
        assert!(!fields.contains_key(key));
    }
}
4655
/// A schematic serialized to a JSON string and parsed back must keep the
/// per-node input and output schemas intact.
#[test]
fn schematic_json_roundtrip_preserves_schemas() {
    let axon = Axon::<String, String, String>::new("Roundtrip")
        .then(SchemaTransition)
        .with_output_schema_value(serde_json::json!({"type": "string"}));

    let json_str = serde_json::to_string(&axon.schematic).unwrap();
    let deserialized: ranvier_core::schematic::Schematic =
        serde_json::from_str(&json_str).unwrap();

    let original = axon
        .schematic
        .nodes
        .last()
        .expect("schematic should contain at least one node");
    let last = deserialized
        .nodes
        .last()
        .expect("roundtripped schematic should contain at least one node");

    let input_schema = last
        .input_schema
        .as_ref()
        .expect("input_schema should survive the roundtrip");
    let output_schema = last
        .output_schema
        .as_ref()
        .expect("output_schema should survive the roundtrip");

    assert_eq!(input_schema["required"][0], "name");
    assert_eq!(output_schema["type"], "string");
    // Spot checks alone would miss a partially-lost schema; require the
    // roundtripped schemas to be structurally equal to the originals.
    assert_eq!(Some(input_schema), original.input_schema.as_ref());
    assert_eq!(Some(output_schema), original.output_schema.as_ref());
}
4672
4673 #[derive(Clone)]
4675 struct MultiplyByTwo;
4676
4677 #[async_trait]
4678 impl Transition<i32, i32> for MultiplyByTwo {
4679 type Error = TestInfallible;
4680 type Resources = ();
4681
4682 async fn run(
4683 &self,
4684 state: i32,
4685 _resources: &Self::Resources,
4686 _bus: &mut Bus,
4687 ) -> Outcome<i32, Self::Error> {
4688 Outcome::Next(state * 2)
4689 }
4690 }
4691
4692 #[derive(Clone)]
4693 struct AddTen;
4694
4695 #[async_trait]
4696 impl Transition<i32, i32> for AddTen {
4697 type Error = TestInfallible;
4698 type Resources = ();
4699
4700 async fn run(
4701 &self,
4702 state: i32,
4703 _resources: &Self::Resources,
4704 _bus: &mut Bus,
4705 ) -> Outcome<i32, Self::Error> {
4706 Outcome::Next(state + 10)
4707 }
4708 }
4709
4710 #[derive(Clone)]
4711 struct AddOneString;
4712
4713 #[async_trait]
4714 impl Transition<i32, i32> for AddOneString {
4715 type Error = String;
4716 type Resources = ();
4717
4718 async fn run(
4719 &self,
4720 state: i32,
4721 _resources: &Self::Resources,
4722 _bus: &mut Bus,
4723 ) -> Outcome<i32, Self::Error> {
4724 Outcome::Next(state + 1)
4725 }
4726 }
4727
4728 #[derive(Clone)]
4729 struct AddTenString;
4730
4731 #[async_trait]
4732 impl Transition<i32, i32> for AddTenString {
4733 type Error = String;
4734 type Resources = ();
4735
4736 async fn run(
4737 &self,
4738 state: i32,
4739 _resources: &Self::Resources,
4740 _bus: &mut Bus,
4741 ) -> Outcome<i32, Self::Error> {
4742 Outcome::Next(state + 10)
4743 }
4744 }
4745
4746 #[tokio::test]
4747 async fn axon_single_step_chain_executes_and_returns_next() {
4748 let mut bus = Bus::new();
4749 let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4750
4751 let outcome = axon.execute(5, &(), &mut bus).await;
4752 assert!(matches!(outcome, Outcome::Next(6)));
4753 }
4754
4755 #[tokio::test]
4756 async fn axon_three_step_chain_executes_in_order() {
4757 let mut bus = Bus::new();
4758 let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4759 .then(AddOne)
4760 .then(MultiplyByTwo)
4761 .then(AddTen);
4762
4763 let outcome = axon.execute(5, &(), &mut bus).await;
4765 assert!(matches!(outcome, Outcome::Next(22)));
4766 }
4767
4768 #[tokio::test]
4769 async fn axon_with_fault_in_middle_step_propagates_error() {
4770 let mut bus = Bus::new();
4771
4772 let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4777 .then(AddOneString)
4778 .then(AlwaysFault)
4779 .then(AddTenString);
4780
4781 let outcome = axon.execute(5, &(), &mut bus).await;
4782 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4783 }
4784
/// Chaining three transitions yields four schematic nodes: one per `then`
/// plus one more (presumably the entry node created by `start`). The
/// schematic must also keep the name given to `start`.
#[test]
fn axon_schematic_has_correct_node_count_after_chaining() {
    let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
        .then(AddOne)
        .then(MultiplyByTwo)
        .then(AddTen);
    let schematic = &axon.schematic;

    let expected_nodes = 1 + 3;
    assert_eq!(schematic.nodes.len(), expected_nodes);
    assert_eq!(schematic.name, "NodeCount");
}
4796
4797 #[tokio::test]
4798 async fn axon_execution_records_timeline_events() {
4799 let mut bus = Bus::new();
4800 bus.insert(Timeline::new());
4801
4802 let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4803 .then(AddOne)
4804 .then(MultiplyByTwo);
4805
4806 let outcome = axon.execute(3, &(), &mut bus).await;
4807 assert!(matches!(outcome, Outcome::Next(8))); let timeline = bus.read::<Timeline>().unwrap();
4810
4811 let enter_count = timeline
4813 .events
4814 .iter()
4815 .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4816 .count();
4817 let exit_count = timeline
4818 .events
4819 .iter()
4820 .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4821 .count();
4822
4823 assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4825 assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4826 }
4827
4828 #[tokio::test]
4831 async fn parallel_all_succeed_returns_first_next() {
4832 use super::ParallelStrategy;
4833
4834 let mut bus = Bus::new();
4835 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4836 .parallel(
4837 vec![
4838 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4839 Arc::new(MultiplyByTwo),
4840 ],
4841 ParallelStrategy::AllMustSucceed,
4842 );
4843
4844 let outcome = axon.execute(5, &(), &mut bus).await;
4847 assert!(matches!(outcome, Outcome::Next(6)));
4848 }
4849
4850 #[tokio::test]
4851 async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4852 use super::ParallelStrategy;
4853
4854 let mut bus = Bus::new();
4855 let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4856 .parallel(
4857 vec![
4858 Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4859 Arc::new(AlwaysFault),
4860 ],
4861 ParallelStrategy::AllMustSucceed,
4862 );
4863
4864 let outcome = axon.execute(5, &(), &mut bus).await;
4865 assert!(
4866 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4867 "Expected Fault(boom), got {:?}",
4868 outcome
4869 );
4870 }
4871
4872 #[tokio::test]
4873 async fn parallel_any_can_fail_returns_success_despite_fault() {
4874 use super::ParallelStrategy;
4875
4876 let mut bus = Bus::new();
4877 let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4878 .parallel(
4879 vec![
4880 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4881 Arc::new(AddOneString),
4882 ],
4883 ParallelStrategy::AnyCanFail,
4884 );
4885
4886 let outcome = axon.execute(5, &(), &mut bus).await;
4888 assert!(
4889 matches!(outcome, Outcome::Next(6)),
4890 "Expected Next(6), got {:?}",
4891 outcome
4892 );
4893 }
4894
4895 #[tokio::test]
4896 async fn parallel_any_can_fail_all_fault_returns_first_fault() {
4897 use super::ParallelStrategy;
4898
4899 #[derive(Clone)]
4900 struct AlwaysFault2;
4901 #[async_trait]
4902 impl Transition<i32, i32> for AlwaysFault2 {
4903 type Error = String;
4904 type Resources = ();
4905 async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
4906 Outcome::Fault("boom2".to_string())
4907 }
4908 }
4909
4910 let mut bus = Bus::new();
4911 let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
4912 .parallel(
4913 vec![
4914 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4915 Arc::new(AlwaysFault2),
4916 ],
4917 ParallelStrategy::AnyCanFail,
4918 );
4919
4920 let outcome = axon.execute(5, &(), &mut bus).await;
4921 assert!(
4923 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4924 "Expected Fault(boom), got {:?}",
4925 outcome
4926 );
4927 }
4928
/// A three-way `parallel` step is laid out in the schematic in this order:
/// the preceding node, a `FanOut`, one `Atom` per branch, then a `FanIn`.
/// Each branch gets two `Parallel` edges (one from the fan-out and one into
/// the fan-in).
#[test]
fn parallel_schematic_has_fanout_fanin_nodes() {
    use super::ParallelStrategy;
    use ranvier_core::schematic::{EdgeType, NodeKind};

    type Branch =
        Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>;

    let branches: Vec<Branch> = vec![Arc::new(AddOne), Arc::new(MultiplyByTwo), Arc::new(AddTen)];
    let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
        .parallel(branches, ParallelStrategy::AllMustSucceed);
    let nodes = &axon.schematic.nodes;

    assert_eq!(nodes.len(), 6);
    assert!(matches!(nodes[1].kind, NodeKind::FanOut));
    for branch_node in &nodes[2..5] {
        assert!(matches!(branch_node.kind, NodeKind::Atom));
    }
    assert!(matches!(nodes[5].kind, NodeKind::FanIn));

    let fanout_description = nodes[1].description.as_ref().unwrap();
    assert!(fanout_description.contains("3 branches"));

    let parallel_edge_count = axon
        .schematic
        .edges
        .iter()
        .filter(|edge| matches!(edge.kind, EdgeType::Parallel))
        .count();
    assert_eq!(parallel_edge_count, 6);
}
4969
4970 #[tokio::test]
4971 async fn parallel_then_chain_composes_correctly() {
4972 use super::ParallelStrategy;
4973
4974 let mut bus = Bus::new();
4975 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
4976 .then(AddOne)
4977 .parallel(
4978 vec![
4979 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4980 Arc::new(MultiplyByTwo),
4981 ],
4982 ParallelStrategy::AllMustSucceed,
4983 )
4984 .then(AddTen);
4985
4986 let outcome = axon.execute(5, &(), &mut bus).await;
4988 assert!(
4989 matches!(outcome, Outcome::Next(17)),
4990 "Expected Next(17), got {:?}",
4991 outcome
4992 );
4993 }
4994
4995 #[tokio::test]
4996 async fn parallel_records_timeline_events() {
4997 use super::ParallelStrategy;
4998 use ranvier_core::timeline::TimelineEvent;
4999
5000 let mut bus = Bus::new();
5001 bus.insert(Timeline::new());
5002
5003 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
5004 .parallel(
5005 vec![
5006 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5007 Arc::new(MultiplyByTwo),
5008 ],
5009 ParallelStrategy::AllMustSucceed,
5010 );
5011
5012 let outcome = axon.execute(3, &(), &mut bus).await;
5013 assert!(matches!(outcome, Outcome::Next(4)));
5014
5015 let timeline = bus.read::<Timeline>().unwrap();
5016
5017 let fanout_enters = timeline
5019 .events
5020 .iter()
5021 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
5022 .count();
5023 let fanin_enters = timeline
5024 .events
5025 .iter()
5026 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
5027 .count();
5028
5029 assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
5030 assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
5031 }
5032
5033 #[derive(Clone)]
5036 struct Greet;
5037
5038 #[async_trait]
5039 impl Transition<(), String> for Greet {
5040 type Error = String;
5041 type Resources = ();
5042
5043 async fn run(
5044 &self,
5045 _state: (),
5046 _resources: &Self::Resources,
5047 _bus: &mut Bus,
5048 ) -> Outcome<String, Self::Error> {
5049 Outcome::Next("Hello from simple!".to_string())
5050 }
5051 }
5052
5053 #[tokio::test]
5054 async fn axon_simple_creates_pipeline() {
5055 let axon = Axon::simple::<String>("SimpleTest").then(Greet);
5056
5057 let mut bus = Bus::new();
5058 let result = axon.execute((), &(), &mut bus).await;
5059
5060 match result {
5061 Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
5062 other => panic!("Expected Outcome::Next, got {:?}", other),
5063 }
5064 }
5065
5066 #[tokio::test]
5067 async fn axon_simple_equivalent_to_explicit() {
5068 let simple = Axon::simple::<String>("Equiv").then(Greet);
5070 let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
5071
5072 let mut bus1 = Bus::new();
5073 let mut bus2 = Bus::new();
5074
5075 let r1 = simple.execute((), &(), &mut bus1).await;
5076 let r2 = explicit.execute((), &(), &mut bus2).await;
5077
5078 match (r1, r2) {
5079 (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
5080 _ => panic!("Both should produce Outcome::Next"),
5081 }
5082 }
5083}