1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45#[derive(Clone)]
47pub enum ExecutionMode {
48 Local,
50 Singleton {
52 lock_key: String,
53 ttl_ms: u64,
54 lock_provider: Arc<dyn DistributedLock>,
55 },
56}
57
/// Join policy recorded for a parallel fan-out / fan-in section.
///
/// NOTE(review): the join logic itself is outside the visible part of the
/// file; variant semantics below follow the variant names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ParallelStrategy {
    /// Presumably: the join fails unless every branch succeeds.
    AllMustSucceed,
    /// Presumably: individual branches are allowed to fail.
    AnyCanFail,
}
69
/// A pinned, boxed, `Send` future that yields `T` and may borrow for `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
72
73pub type Executor<In, Out, E, Res> =
77 Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
79#[derive(Debug, Clone)]
81pub struct ManualJump {
82 pub target_node: String,
83 pub payload_override: Option<serde_json::Value>,
84}
85
/// Bus entry holding the index of the step at which a resumed run restarts.
///
/// Step executors compare their own step index against the wrapped value.
#[derive(Clone, Copy, Debug)]
struct StartStep(u64);
89
90#[derive(Debug, Clone)]
92struct ResumptionState {
93 payload: Option<serde_json::Value>,
94}
95
/// Returns a short, human-readable name for `T` with module paths removed.
///
/// `std::any::type_name` yields fully qualified names such as
/// `alloc::vec::Vec<alloc::string::String>`. Module prefixes are stripped
/// from every path in the name, including paths nested inside generic
/// arguments, tuples, references and slices, so the example becomes
/// `Vec<String>`. Non-generic names (`i32`, `String`, `()`) are returned
/// exactly as before; previously a generic name was cut at its last `::`
/// and came out garbled, e.g. `String>`.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path currently being read begins.
    let mut path_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // `::` separator: everything read for this path so far was a
            // module prefix, so discard it.
            chars.next();
            short.truncate(path_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation (`<`, `,`, space, `&`, ...) ends one path and
                // starts the next.
                path_start = short.len();
            }
        }
    }

    short
}
101
102pub struct Axon<In, Out, E, Res = ()> {
122 pub schematic: Schematic,
124 executor: Executor<In, Out, E, Res>,
126 pub execution_mode: ExecutionMode,
128 pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
130 pub audit_sink: Option<Arc<dyn AuditSink>>,
132 pub dlq_sink: Option<Arc<dyn DlqSink>>,
134 pub dlq_policy: DlqPolicy,
136 pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
138 pub saga_policy: SagaPolicy,
140 pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
142 pub saga_compensation_registry:
144 Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
145 pub iam_handle: Option<ranvier_core::iam::IamHandle>,
147}
148
/// Parameters for a schematic export.
#[derive(Clone, Debug)]
pub struct SchematicExportRequest {
    /// Optional output path.
    /// NOTE(review): the meaning of `None` is decided by export code that is
    /// not visible here.
    pub output: Option<PathBuf>,
}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157 fn clone(&self) -> Self {
158 Self {
159 schematic: self.schematic.clone(),
160 executor: self.executor.clone(),
161 execution_mode: self.execution_mode.clone(),
162 persistence_store: self.persistence_store.clone(),
163 audit_sink: self.audit_sink.clone(),
164 dlq_sink: self.dlq_sink.clone(),
165 dlq_policy: self.dlq_policy.clone(),
166 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167 saga_policy: self.saga_policy.clone(),
168 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169 saga_compensation_registry: self.saga_compensation_registry.clone(),
170 iam_handle: self.iam_handle.clone(),
171 }
172 }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177 In: Send + Sync + Serialize + DeserializeOwned + 'static,
178 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179 Res: ranvier_core::transition::ResourceRequirement,
180{
181 #[track_caller]
184 pub fn new(label: &str) -> Self {
185 let caller = Location::caller();
186 Self::start_with_source(label, caller)
187 }
188
189 #[track_caller]
192 pub fn start(label: &str) -> Self {
193 let caller = Location::caller();
194 Self::start_with_source(label, caller)
195 }
196
197 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198 let node_id = uuid::Uuid::new_v4().to_string();
199 let node = Node {
200 id: node_id,
201 kind: NodeKind::Ingress,
202 label: label.to_string(),
203 description: None,
204 input_type: "void".to_string(),
205 output_type: type_name_of::<In>(),
206 resource_type: type_name_of::<Res>(),
207 metadata: Default::default(),
208 bus_capability: None,
209 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210 position: None,
211 compensation_node_id: None,
212 input_schema: None,
213 output_schema: None,
214 };
215
216 let mut schematic = Schematic::new(label);
217 schematic.nodes.push(node);
218
219 let executor: Executor<In, In, E, Res> =
220 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222 Self {
223 schematic,
224 executor,
225 execution_mode: ExecutionMode::Local,
226 persistence_store: None,
227 audit_sink: None,
228 dlq_sink: None,
229 dlq_policy: DlqPolicy::default(),
230 dynamic_dlq_policy: None,
231 saga_policy: SagaPolicy::default(),
232 dynamic_saga_policy: None,
233 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234 ranvier_core::saga::SagaCompensationRegistry::new(),
235 )),
236 iam_handle: None,
237 }
238 }
239}
240
241impl Axon<(), (), (), ()> {
242 #[track_caller]
257 pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
258 where
259 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
260 {
261 let caller = Location::caller();
262 <Axon<(), (), E, ()>>::start_with_source(label, caller)
263 }
264}
265
266impl<In, Out, E, Res> Axon<In, Out, E, Res>
267where
268 In: Send + Sync + Serialize + DeserializeOwned + 'static,
269 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
270 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
271 Res: ranvier_core::transition::ResourceRequirement,
272{
273 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
275 self.execution_mode = mode;
276 self
277 }
278
279 pub fn with_version(mut self, version: impl Into<String>) -> Self {
281 self.schematic.schema_version = version.into();
282 self
283 }
284
285 pub fn with_persistence_store<S>(mut self, store: S) -> Self
287 where
288 S: crate::persistence::PersistenceStore + 'static,
289 {
290 self.persistence_store = Some(Arc::new(store));
291 self
292 }
293
294 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
296 where
297 S: AuditSink + 'static,
298 {
299 self.audit_sink = Some(Arc::new(sink));
300 self
301 }
302
303 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
305 where
306 S: DlqSink + 'static,
307 {
308 self.dlq_sink = Some(Arc::new(sink));
309 self
310 }
311
312 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
314 self.dlq_policy = policy;
315 self
316 }
317
318 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
320 self.saga_policy = policy;
321 self
322 }
323
324 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
327 self.dynamic_dlq_policy = Some(dynamic);
328 self
329 }
330
331 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
334 self.dynamic_saga_policy = Some(dynamic);
335 self
336 }
337
338 pub fn with_iam(
346 mut self,
347 policy: ranvier_core::iam::IamPolicy,
348 verifier: impl ranvier_core::iam::IamVerifier + 'static,
349 ) -> Self {
350 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
351 policy,
352 Arc::new(verifier),
353 ));
354 self
355 }
356
357 #[cfg(feature = "schema")]
368 pub fn with_input_schema<T>(mut self) -> Self
369 where
370 T: schemars::JsonSchema,
371 {
372 if let Some(last_node) = self.schematic.nodes.last_mut() {
373 let schema = schemars::schema_for!(T);
374 last_node.input_schema =
375 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
376 }
377 self
378 }
379
380 #[cfg(feature = "schema")]
384 pub fn with_output_schema<T>(mut self) -> Self
385 where
386 T: schemars::JsonSchema,
387 {
388 if let Some(last_node) = self.schematic.nodes.last_mut() {
389 let schema = schemars::schema_for!(T);
390 last_node.output_schema =
391 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
392 }
393 self
394 }
395
396 pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
400 if let Some(last_node) = self.schematic.nodes.last_mut() {
401 last_node.input_schema = Some(schema);
402 }
403 self
404 }
405
406 pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
408 if let Some(last_node) = self.schematic.nodes.last_mut() {
409 last_node.output_schema = Some(schema);
410 }
411 self
412 }
413}
414
415#[async_trait]
416impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
417where
418 In: Send + Sync + Serialize + DeserializeOwned + 'static,
419 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
420 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
421 Res: ranvier_core::transition::ResourceRequirement,
422{
423 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
424 let store = self.persistence_store.as_ref()?;
425 let trace = store.load(trace_id).await.ok().flatten()?;
426 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
427 }
428
429 async fn force_resume(
430 &self,
431 trace_id: &str,
432 target_node: &str,
433 payload_override: Option<Value>,
434 ) -> Result<(), String> {
435 let store = self
436 .persistence_store
437 .as_ref()
438 .ok_or("No persistence store attached to Axon")?;
439
440 let intervention = crate::persistence::Intervention {
441 target_node: target_node.to_string(),
442 payload_override,
443 timestamp_ms: now_ms(),
444 };
445
446 store
447 .save_intervention(trace_id, intervention)
448 .await
449 .map_err(|e| format!("Failed to save intervention: {}", e))?;
450
451 if let Some(sink) = self.audit_sink.as_ref() {
452 let event = AuditEvent::new(
453 uuid::Uuid::new_v4().to_string(),
454 "Inspector".to_string(),
455 "ForceResume".to_string(),
456 trace_id.to_string(),
457 )
458 .with_metadata("target_node", target_node);
459
460 let _ = sink.append(&event).await;
461 }
462
463 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
464 Ok(())
465 }
466}
467
468impl<In, Out, E, Res> Axon<In, Out, E, Res>
469where
470 In: Send + Sync + Serialize + DeserializeOwned + 'static,
471 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
472 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
473 Res: ranvier_core::transition::ResourceRequirement,
474{
475 #[track_caller]
479 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
480 where
481 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
482 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
483 {
484 let caller = Location::caller();
485 let Axon {
487 mut schematic,
488 executor: prev_executor,
489 execution_mode,
490 persistence_store,
491 audit_sink,
492 dlq_sink,
493 dlq_policy,
494 dynamic_dlq_policy,
495 saga_policy,
496 dynamic_saga_policy,
497 saga_compensation_registry,
498 iam_handle,
499 } = self;
500
501 let next_node_id = uuid::Uuid::new_v4().to_string();
503 let next_node = Node {
504 id: next_node_id.clone(),
505 kind: NodeKind::Atom,
506 label: transition.label(),
507 description: transition.description(),
508 input_type: type_name_of::<Out>(),
509 output_type: type_name_of::<Next>(),
510 resource_type: type_name_of::<Res>(),
511 metadata: Default::default(),
512 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
513 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
514 position: transition
515 .position()
516 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
517 compensation_node_id: None,
518 input_schema: transition.input_schema(),
519 output_schema: None,
520 };
521
522 let last_node_id = schematic
523 .nodes
524 .last()
525 .map(|n| n.id.clone())
526 .unwrap_or_default();
527
528 schematic.nodes.push(next_node);
529 schematic.edges.push(Edge {
530 from: last_node_id,
531 to: next_node_id.clone(),
532 kind: EdgeType::Linear,
533 label: Some("Next".to_string()),
534 });
535
536 let node_id_for_exec = next_node_id.clone();
538 let node_label_for_exec = transition.label();
539 let bus_policy_for_exec = transition.bus_access_policy();
540 let bus_policy_clone = bus_policy_for_exec.clone();
541 let current_step_idx = schematic.nodes.len() as u64 - 1;
542 let next_executor: Executor<In, Next, E, Res> = Arc::new(
543 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
544 let prev = prev_executor.clone();
545 let trans = transition.clone();
546 let timeline_node_id = node_id_for_exec.clone();
547 let timeline_node_label = node_label_for_exec.clone();
548 let transition_bus_policy = bus_policy_clone.clone();
549 let step_idx = current_step_idx;
550
551 Box::pin(async move {
552 if let Some(jump) = bus.read::<ManualJump>()
554 && (jump.target_node == timeline_node_id
555 || jump.target_node == timeline_node_label)
556 {
557 tracing::info!(
558 node_id = %timeline_node_id,
559 node_label = %timeline_node_label,
560 "Manual jump target reached; skipping previous steps"
561 );
562
563 let state = if let Some(ow) = jump.payload_override.clone() {
564 match serde_json::from_value::<Out>(ow) {
565 Ok(s) => s,
566 Err(e) => {
567 tracing::error!(
568 "Payload override deserialization failed: {}",
569 e
570 );
571 return Outcome::emit(
572 "execution.jump.payload_error",
573 Some(serde_json::json!({"error": e.to_string()})),
574 )
575 .map(|_: ()| unreachable!());
576 }
577 }
578 } else {
579 return Outcome::emit(
583 "execution.jump.missing_payload",
584 Some(serde_json::json!({"node_id": timeline_node_id})),
585 );
586 };
587
588 return run_this_step::<Out, Next, E, Res>(
590 &trans,
591 state,
592 res,
593 bus,
594 &timeline_node_id,
595 &timeline_node_label,
596 &transition_bus_policy,
597 step_idx,
598 )
599 .await;
600 }
601
602 if let Some(start) = bus.read::<StartStep>()
604 && step_idx == start.0
605 && bus.read::<ResumptionState>().is_some()
606 {
607 let fresh_state = serde_json::to_value(&input)
611 .ok()
612 .and_then(|v| serde_json::from_value::<Out>(v).ok());
613 let persisted_state = bus
614 .read::<ResumptionState>()
615 .and_then(|r| r.payload.clone())
616 .and_then(|p| serde_json::from_value::<Out>(p).ok());
617
618 if let Some(s) = fresh_state.or(persisted_state) {
619 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
620 return run_this_step::<Out, Next, E, Res>(
621 &trans,
622 s,
623 res,
624 bus,
625 &timeline_node_id,
626 &timeline_node_label,
627 &transition_bus_policy,
628 step_idx,
629 )
630 .await;
631 }
632
633 return Outcome::emit(
634 "execution.resumption.payload_error",
635 Some(serde_json::json!({"error": "no compatible resumption state"})),
636 )
637 .map(|_: ()| unreachable!());
638 }
639
640 let prev_result = prev(input, res, bus).await;
642
643 let state = match prev_result {
645 Outcome::Next(t) => t,
646 other => return other.map(|_| unreachable!()),
647 };
648
649 run_this_step::<Out, Next, E, Res>(
650 &trans,
651 state,
652 res,
653 bus,
654 &timeline_node_id,
655 &timeline_node_label,
656 &transition_bus_policy,
657 step_idx,
658 )
659 .await
660 })
661 },
662 );
663 Axon {
664 schematic,
665 executor: next_executor,
666 execution_mode,
667 persistence_store,
668 audit_sink,
669 dlq_sink,
670 dlq_policy,
671 dynamic_dlq_policy,
672 saga_policy,
673 dynamic_saga_policy,
674 saga_compensation_registry,
675 iam_handle,
676 }
677 }
678
679 #[track_caller]
695 pub fn then_with_retry<Next, Trans>(
696 self,
697 transition: Trans,
698 policy: crate::retry::RetryPolicy,
699 ) -> Axon<In, Next, E, Res>
700 where
701 Out: Clone,
702 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
703 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
704 {
705 let caller = Location::caller();
706 let Axon {
707 mut schematic,
708 executor: prev_executor,
709 execution_mode,
710 persistence_store,
711 audit_sink,
712 dlq_sink,
713 dlq_policy,
714 dynamic_dlq_policy,
715 saga_policy,
716 dynamic_saga_policy,
717 saga_compensation_registry,
718 iam_handle,
719 } = self;
720
721 let next_node_id = uuid::Uuid::new_v4().to_string();
722 let next_node = Node {
723 id: next_node_id.clone(),
724 kind: NodeKind::Atom,
725 label: transition.label(),
726 description: transition.description(),
727 input_type: type_name_of::<Out>(),
728 output_type: type_name_of::<Next>(),
729 resource_type: type_name_of::<Res>(),
730 metadata: Default::default(),
731 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
732 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
733 position: transition
734 .position()
735 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
736 compensation_node_id: None,
737 input_schema: transition.input_schema(),
738 output_schema: None,
739 };
740
741 let last_node_id = schematic
742 .nodes
743 .last()
744 .map(|n| n.id.clone())
745 .unwrap_or_default();
746
747 schematic.nodes.push(next_node);
748 schematic.edges.push(Edge {
749 from: last_node_id,
750 to: next_node_id.clone(),
751 kind: EdgeType::Linear,
752 label: Some("Next (retryable)".to_string()),
753 });
754
755 let node_id_for_exec = next_node_id.clone();
756 let node_label_for_exec = transition.label();
757 let bus_policy_for_exec = transition.bus_access_policy();
758 let bus_policy_clone = bus_policy_for_exec.clone();
759 let current_step_idx = schematic.nodes.len() as u64 - 1;
760 let next_executor: Executor<In, Next, E, Res> = Arc::new(
761 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
762 let prev = prev_executor.clone();
763 let trans = transition.clone();
764 let timeline_node_id = node_id_for_exec.clone();
765 let timeline_node_label = node_label_for_exec.clone();
766 let transition_bus_policy = bus_policy_clone.clone();
767 let step_idx = current_step_idx;
768 let retry_policy = policy.clone();
769
770 Box::pin(async move {
771 let prev_result = prev(input, res, bus).await;
773 let state = match prev_result {
774 Outcome::Next(t) => t,
775 other => return other.map(|_| unreachable!()),
776 };
777
778 let mut last_result = None;
780 for attempt in 0..=retry_policy.max_retries {
781 let attempt_state = state.clone();
782
783 let result = run_this_step::<Out, Next, E, Res>(
784 &trans,
785 attempt_state,
786 res,
787 bus,
788 &timeline_node_id,
789 &timeline_node_label,
790 &transition_bus_policy,
791 step_idx,
792 )
793 .await;
794
795 match &result {
796 Outcome::Next(_) => return result,
797 Outcome::Fault(_) if attempt < retry_policy.max_retries => {
798 let delay = retry_policy.delay_for_attempt(attempt);
799 tracing::warn!(
800 node_id = %timeline_node_id,
801 attempt = attempt + 1,
802 max = retry_policy.max_retries,
803 delay_ms = delay.as_millis() as u64,
804 "Transition failed, retrying"
805 );
806 if let Some(timeline) = bus.read_mut::<Timeline>() {
807 timeline.push(TimelineEvent::NodeRetry {
808 node_id: timeline_node_id.clone(),
809 attempt: attempt + 1,
810 max_attempts: retry_policy.max_retries,
811 backoff_ms: delay.as_millis() as u64,
812 timestamp: now_ms(),
813 });
814 }
815 tokio::time::sleep(delay).await;
816 }
817 _ => {
818 last_result = Some(result);
819 break;
820 }
821 }
822 }
823
824 last_result.unwrap_or_else(|| {
825 Outcome::emit("execution.retry.exhausted", None)
826 })
827 })
828 },
829 );
830 Axon {
831 schematic,
832 executor: next_executor,
833 execution_mode,
834 persistence_store,
835 audit_sink,
836 dlq_sink,
837 dlq_policy,
838 dynamic_dlq_policy,
839 saga_policy,
840 dynamic_saga_policy,
841 saga_compensation_registry,
842 iam_handle,
843 }
844 }
845
846 #[track_caller]
851 pub fn then_compensated<Next, Trans, Comp>(
852 self,
853 transition: Trans,
854 compensation: Comp,
855 ) -> Axon<In, Next, E, Res>
856 where
857 Out: Clone,
858 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
859 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
860 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
861 {
862 let caller = Location::caller();
863 let Axon {
864 mut schematic,
865 executor: prev_executor,
866 execution_mode,
867 persistence_store,
868 audit_sink,
869 dlq_sink,
870 dlq_policy,
871 dynamic_dlq_policy,
872 saga_policy,
873 dynamic_saga_policy,
874 saga_compensation_registry,
875 iam_handle,
876 } = self;
877
878 let next_node_id = uuid::Uuid::new_v4().to_string();
880 let comp_node_id = uuid::Uuid::new_v4().to_string();
881
882 let next_node = Node {
883 id: next_node_id.clone(),
884 kind: NodeKind::Atom,
885 label: transition.label(),
886 description: transition.description(),
887 input_type: type_name_of::<Out>(),
888 output_type: type_name_of::<Next>(),
889 resource_type: type_name_of::<Res>(),
890 metadata: Default::default(),
891 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
892 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
893 position: transition
894 .position()
895 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
896 compensation_node_id: Some(comp_node_id.clone()),
897 input_schema: None,
898 output_schema: None,
899 };
900
901 let comp_node = Node {
903 id: comp_node_id.clone(),
904 kind: NodeKind::Atom,
905 label: format!("Compensate: {}", compensation.label()),
906 description: compensation.description(),
907 input_type: type_name_of::<Out>(),
908 output_type: "void".to_string(),
909 resource_type: type_name_of::<Res>(),
910 metadata: Default::default(),
911 bus_capability: None,
912 source_location: None,
913 position: compensation
914 .position()
915 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
916 compensation_node_id: None,
917 input_schema: None,
918 output_schema: None,
919 };
920
921 let last_node_id = schematic
922 .nodes
923 .last()
924 .map(|n| n.id.clone())
925 .unwrap_or_default();
926
927 schematic.nodes.push(next_node);
928 schematic.nodes.push(comp_node);
929 schematic.edges.push(Edge {
930 from: last_node_id,
931 to: next_node_id.clone(),
932 kind: EdgeType::Linear,
933 label: Some("Next".to_string()),
934 });
935
936 let node_id_for_exec = next_node_id.clone();
938 let comp_id_for_exec = comp_node_id.clone();
939 let node_label_for_exec = transition.label();
940 let bus_policy_for_exec = transition.bus_access_policy();
941 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
942 let comp_for_exec = compensation.clone();
943 let bus_policy_for_executor = bus_policy_for_exec.clone();
944 let bus_policy_for_registry = bus_policy_for_exec.clone();
945 let next_executor: Executor<In, Next, E, Res> = Arc::new(
946 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
947 let prev = prev_executor.clone();
948 let trans = transition.clone();
949 let comp = comp_for_exec.clone();
950 let timeline_node_id = node_id_for_exec.clone();
951 let timeline_comp_id = comp_id_for_exec.clone();
952 let timeline_node_label = node_label_for_exec.clone();
953 let transition_bus_policy = bus_policy_for_executor.clone();
954 let step_idx = step_idx_for_exec;
955
956 Box::pin(async move {
957 if let Some(jump) = bus.read::<ManualJump>()
959 && (jump.target_node == timeline_node_id
960 || jump.target_node == timeline_node_label)
961 {
962 tracing::info!(
963 node_id = %timeline_node_id,
964 node_label = %timeline_node_label,
965 "Manual jump target reached (compensated); skipping previous steps"
966 );
967
968 let state = if let Some(ow) = jump.payload_override.clone() {
969 match serde_json::from_value::<Out>(ow) {
970 Ok(s) => s,
971 Err(e) => {
972 tracing::error!(
973 "Payload override deserialization failed: {}",
974 e
975 );
976 return Outcome::emit(
977 "execution.jump.payload_error",
978 Some(serde_json::json!({"error": e.to_string()})),
979 )
980 .map(|_: ()| unreachable!());
981 }
982 }
983 } else {
984 return Outcome::emit(
985 "execution.jump.missing_payload",
986 Some(serde_json::json!({"node_id": timeline_node_id})),
987 )
988 .map(|_: ()| unreachable!());
989 };
990
991 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
993 &trans,
994 &comp,
995 state,
996 res,
997 bus,
998 &timeline_node_id,
999 &timeline_comp_id,
1000 &timeline_node_label,
1001 &transition_bus_policy,
1002 step_idx,
1003 )
1004 .await;
1005 }
1006
1007 if let Some(start) = bus.read::<StartStep>()
1009 && step_idx == start.0
1010 && bus.read::<ResumptionState>().is_some()
1011 {
1012 let fresh_state = serde_json::to_value(&input)
1013 .ok()
1014 .and_then(|v| serde_json::from_value::<Out>(v).ok());
1015 let persisted_state = bus
1016 .read::<ResumptionState>()
1017 .and_then(|r| r.payload.clone())
1018 .and_then(|p| serde_json::from_value::<Out>(p).ok());
1019
1020 if let Some(s) = fresh_state.or(persisted_state) {
1021 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
1022 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1023 &trans,
1024 &comp,
1025 s,
1026 res,
1027 bus,
1028 &timeline_node_id,
1029 &timeline_comp_id,
1030 &timeline_node_label,
1031 &transition_bus_policy,
1032 step_idx,
1033 )
1034 .await;
1035 }
1036
1037 return Outcome::emit(
1038 "execution.resumption.payload_error",
1039 Some(serde_json::json!({"error": "no compatible resumption state"})),
1040 )
1041 .map(|_: ()| unreachable!());
1042 }
1043
1044 let prev_result = prev(input, res, bus).await;
1046
1047 let state = match prev_result {
1049 Outcome::Next(t) => t,
1050 other => return other.map(|_| unreachable!()),
1051 };
1052
1053 run_this_compensated_step::<Out, Next, E, Res, Comp>(
1054 &trans,
1055 &comp,
1056 state,
1057 res,
1058 bus,
1059 &timeline_node_id,
1060 &timeline_comp_id,
1061 &timeline_node_label,
1062 &transition_bus_policy,
1063 step_idx,
1064 )
1065 .await
1066 })
1067 },
1068 );
1069 {
1071 let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1072 let comp_fn = compensation.clone();
1073 let transition_bus_policy = bus_policy_for_registry.clone();
1074
1075 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1076 Arc::new(move |input_data, res, bus| {
1077 let comp = comp_fn.clone();
1078 let bus_policy = transition_bus_policy.clone();
1079 Box::pin(async move {
1080 let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1081 bus.set_access_policy(comp.label(), bus_policy);
1082 let res = comp.run(input, res, bus).await;
1083 bus.clear_access_policy();
1084 res
1085 })
1086 });
1087 registry.register(next_node_id.clone(), handler);
1088 }
1089
1090 Axon {
1091 schematic,
1092 executor: next_executor,
1093 execution_mode,
1094 persistence_store,
1095 audit_sink,
1096 dlq_sink,
1097 dlq_policy,
1098 dynamic_dlq_policy,
1099 saga_policy,
1100 dynamic_saga_policy,
1101 saga_compensation_registry,
1102 iam_handle,
1103 }
1104 }
1105
1106 #[track_caller]
1109 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1110 where
1111 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1112 {
1113 let caller = Location::caller();
1116 let comp_node_id = uuid::Uuid::new_v4().to_string();
1117
1118 let comp_node = Node {
1119 id: comp_node_id.clone(),
1120 kind: NodeKind::Atom,
1121 label: transition.label(),
1122 description: transition.description(),
1123 input_type: type_name_of::<Out>(),
1124 output_type: "void".to_string(),
1125 resource_type: type_name_of::<Res>(),
1126 metadata: Default::default(),
1127 bus_capability: None,
1128 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1129 position: transition
1130 .position()
1131 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1132 compensation_node_id: None,
1133 input_schema: None,
1134 output_schema: None,
1135 };
1136
1137 if let Some(last_node) = self.schematic.nodes.last_mut() {
1138 last_node.compensation_node_id = Some(comp_node_id.clone());
1139 }
1140
1141 self.schematic.nodes.push(comp_node);
1142 self
1143 }
1144
1145 #[track_caller]
1147 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1148 let caller = Location::caller();
1149 let branch_id_str = branch_id.into();
1150 let last_node_id = self
1151 .schematic
1152 .nodes
1153 .last()
1154 .map(|n| n.id.clone())
1155 .unwrap_or_default();
1156
1157 let branch_node = Node {
1158 id: uuid::Uuid::new_v4().to_string(),
1159 kind: NodeKind::Synapse,
1160 label: label.to_string(),
1161 description: None,
1162 input_type: type_name_of::<Out>(),
1163 output_type: type_name_of::<Out>(),
1164 resource_type: type_name_of::<Res>(),
1165 metadata: Default::default(),
1166 bus_capability: None,
1167 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1168 position: None,
1169 compensation_node_id: None,
1170 input_schema: None,
1171 output_schema: None,
1172 };
1173
1174 self.schematic.nodes.push(branch_node);
1175 self.schematic.edges.push(Edge {
1176 from: last_node_id,
1177 to: branch_id_str.clone(),
1178 kind: EdgeType::Branch(branch_id_str),
1179 label: Some("Branch".to_string()),
1180 });
1181
1182 self
1183 }
1184
1185 #[track_caller]
1222 pub fn parallel(
1223 self,
1224 transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1225 strategy: ParallelStrategy,
1226 ) -> Axon<In, Out, E, Res>
1227 where
1228 Out: Clone,
1229 {
1230 let caller = Location::caller();
1231 let Axon {
1232 mut schematic,
1233 executor: prev_executor,
1234 execution_mode,
1235 persistence_store,
1236 audit_sink,
1237 dlq_sink,
1238 dlq_policy,
1239 dynamic_dlq_policy,
1240 saga_policy,
1241 dynamic_saga_policy,
1242 saga_compensation_registry,
1243 iam_handle,
1244 } = self;
1245
1246 let fanout_id = uuid::Uuid::new_v4().to_string();
1248 let fanin_id = uuid::Uuid::new_v4().to_string();
1249
1250 let last_node_id = schematic
1251 .nodes
1252 .last()
1253 .map(|n| n.id.clone())
1254 .unwrap_or_default();
1255
1256 let fanout_node = Node {
1257 id: fanout_id.clone(),
1258 kind: NodeKind::FanOut,
1259 label: "FanOut".to_string(),
1260 description: Some(format!(
1261 "Parallel split ({} branches, {:?})",
1262 transitions.len(),
1263 strategy
1264 )),
1265 input_type: type_name_of::<Out>(),
1266 output_type: type_name_of::<Out>(),
1267 resource_type: type_name_of::<Res>(),
1268 metadata: Default::default(),
1269 bus_capability: None,
1270 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1271 position: None,
1272 compensation_node_id: None,
1273 input_schema: None,
1274 output_schema: None,
1275 };
1276
1277 schematic.nodes.push(fanout_node);
1278 schematic.edges.push(Edge {
1279 from: last_node_id,
1280 to: fanout_id.clone(),
1281 kind: EdgeType::Linear,
1282 label: Some("Next".to_string()),
1283 });
1284
1285 let mut branch_node_ids = Vec::with_capacity(transitions.len());
1287 for (i, trans) in transitions.iter().enumerate() {
1288 let branch_id = uuid::Uuid::new_v4().to_string();
1289 let branch_node = Node {
1290 id: branch_id.clone(),
1291 kind: NodeKind::Atom,
1292 label: trans.label(),
1293 description: trans.description(),
1294 input_type: type_name_of::<Out>(),
1295 output_type: type_name_of::<Out>(),
1296 resource_type: type_name_of::<Res>(),
1297 metadata: Default::default(),
1298 bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1299 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1300 position: None,
1301 compensation_node_id: None,
1302 input_schema: trans.input_schema(),
1303 output_schema: None,
1304 };
1305 schematic.nodes.push(branch_node);
1306 schematic.edges.push(Edge {
1307 from: fanout_id.clone(),
1308 to: branch_id.clone(),
1309 kind: EdgeType::Parallel,
1310 label: Some(format!("Branch {}", i)),
1311 });
1312 branch_node_ids.push(branch_id);
1313 }
1314
1315 let fanin_node = Node {
1317 id: fanin_id.clone(),
1318 kind: NodeKind::FanIn,
1319 label: "FanIn".to_string(),
1320 description: Some(format!("Parallel join ({:?})", strategy)),
1321 input_type: type_name_of::<Out>(),
1322 output_type: type_name_of::<Out>(),
1323 resource_type: type_name_of::<Res>(),
1324 metadata: Default::default(),
1325 bus_capability: None,
1326 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1327 position: None,
1328 compensation_node_id: None,
1329 input_schema: None,
1330 output_schema: None,
1331 };
1332
1333 schematic.nodes.push(fanin_node);
1334 for branch_id in &branch_node_ids {
1335 schematic.edges.push(Edge {
1336 from: branch_id.clone(),
1337 to: fanin_id.clone(),
1338 kind: EdgeType::Parallel,
1339 label: Some("Join".to_string()),
1340 });
1341 }
1342
1343 let fanout_node_id = fanout_id.clone();
1345 let fanin_node_id = fanin_id.clone();
1346 let branch_ids_for_exec = branch_node_ids.clone();
1347 let step_idx = schematic.nodes.len() as u64 - 1;
1348
1349 let next_executor: Executor<In, Out, E, Res> = Arc::new(
1350 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1351 let prev = prev_executor.clone();
1352 let branches = transitions.clone();
1353 let fanout_id = fanout_node_id.clone();
1354 let fanin_id = fanin_node_id.clone();
1355 let branch_ids = branch_ids_for_exec.clone();
1356
1357 Box::pin(async move {
1358 let prev_result = prev(input, res, bus).await;
1360 let state = match prev_result {
1361 Outcome::Next(t) => t,
1362 other => return other.map(|_| unreachable!()),
1363 };
1364
1365 let fanout_enter_ts = now_ms();
1367 if let Some(timeline) = bus.read_mut::<Timeline>() {
1368 timeline.push(TimelineEvent::NodeEnter {
1369 node_id: fanout_id.clone(),
1370 node_label: "FanOut".to_string(),
1371 timestamp: fanout_enter_ts,
1372 });
1373 }
1374
1375 let futs: Vec<_> = branches
1379 .iter()
1380 .enumerate()
1381 .map(|(i, trans)| {
1382 let branch_state = state.clone();
1383 let branch_node_id = branch_ids[i].clone();
1384 let trans = trans.clone();
1385
1386 async move {
1387 let mut branch_bus = Bus::new();
1388 let label = trans.label();
1389 let bus_policy = trans.bus_access_policy();
1390
1391 branch_bus.set_access_policy(label.clone(), bus_policy);
1392 let result = trans.run(branch_state, res, &mut branch_bus).await;
1393 branch_bus.clear_access_policy();
1394
1395 (i, branch_node_id, label, result)
1396 }
1397 })
1398 .collect();
1399
1400 let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1402 futures_util::future::join_all(futs).await;
1403
1404 for (_, branch_node_id, branch_label, outcome) in &results {
1406 if let Some(timeline) = bus.read_mut::<Timeline>() {
1407 let ts = now_ms();
1408 timeline.push(TimelineEvent::NodeEnter {
1409 node_id: branch_node_id.clone(),
1410 node_label: branch_label.clone(),
1411 timestamp: ts,
1412 });
1413 timeline.push(TimelineEvent::NodeExit {
1414 node_id: branch_node_id.clone(),
1415 outcome_type: outcome_type_name(outcome),
1416 duration_ms: 0,
1417 timestamp: ts,
1418 });
1419 }
1420 }
1421
1422 if let Some(timeline) = bus.read_mut::<Timeline>() {
1424 timeline.push(TimelineEvent::NodeExit {
1425 node_id: fanout_id.clone(),
1426 outcome_type: "Next".to_string(),
1427 duration_ms: 0,
1428 timestamp: now_ms(),
1429 });
1430 }
1431
1432 let combined = match strategy {
1434 ParallelStrategy::AllMustSucceed => {
1435 let mut first_fault = None;
1436 let mut first_success = None;
1437
1438 for (_, _, _, outcome) in results {
1439 match outcome {
1440 Outcome::Fault(e) => {
1441 if first_fault.is_none() {
1442 first_fault = Some(Outcome::Fault(e));
1443 }
1444 }
1445 Outcome::Next(val) => {
1446 if first_success.is_none() {
1447 first_success = Some(Outcome::Next(val));
1448 }
1449 }
1450 other => {
1451 if first_fault.is_none() {
1454 first_fault = Some(other);
1455 }
1456 }
1457 }
1458 }
1459
1460 if let Some(fault) = first_fault {
1461 fault
1462 } else {
1463 first_success.unwrap_or_else(|| {
1464 Outcome::emit("execution.parallel.no_results", None)
1465 })
1466 }
1467 }
1468 ParallelStrategy::AnyCanFail => {
1469 let mut first_success = None;
1470 let mut first_fault = None;
1471
1472 for (_, _, _, outcome) in results {
1473 match outcome {
1474 Outcome::Next(val) => {
1475 if first_success.is_none() {
1476 first_success = Some(Outcome::Next(val));
1477 }
1478 }
1479 Outcome::Fault(e) => {
1480 if first_fault.is_none() {
1481 first_fault = Some(Outcome::Fault(e));
1482 }
1483 }
1484 _ => {}
1485 }
1486 }
1487
1488 first_success.unwrap_or_else(|| {
1489 first_fault.unwrap_or_else(|| {
1490 Outcome::emit("execution.parallel.no_results", None)
1491 })
1492 })
1493 }
1494 };
1495
1496 let fanin_enter_ts = now_ms();
1498 if let Some(timeline) = bus.read_mut::<Timeline>() {
1499 timeline.push(TimelineEvent::NodeEnter {
1500 node_id: fanin_id.clone(),
1501 node_label: "FanIn".to_string(),
1502 timestamp: fanin_enter_ts,
1503 });
1504 timeline.push(TimelineEvent::NodeExit {
1505 node_id: fanin_id.clone(),
1506 outcome_type: outcome_type_name(&combined),
1507 duration_ms: 0,
1508 timestamp: fanin_enter_ts,
1509 });
1510 }
1511
1512 if let Some(handle) = bus.read::<PersistenceHandle>() {
1514 let trace_id = persistence_trace_id(bus);
1515 let circuit = bus
1516 .read::<ranvier_core::schematic::Schematic>()
1517 .map(|s| s.name.clone())
1518 .unwrap_or_default();
1519 let version = bus
1520 .read::<ranvier_core::schematic::Schematic>()
1521 .map(|s| s.schema_version.clone())
1522 .unwrap_or_default();
1523
1524 persist_execution_event(
1525 handle,
1526 &trace_id,
1527 &circuit,
1528 &version,
1529 step_idx,
1530 Some(fanin_id.clone()),
1531 outcome_kind_name(&combined),
1532 Some(combined.to_json_value()),
1533 )
1534 .await;
1535 }
1536
1537 combined
1538 })
1539 },
1540 );
1541
1542 Axon {
1543 schematic,
1544 executor: next_executor,
1545 execution_mode,
1546 persistence_store,
1547 audit_sink,
1548 dlq_sink,
1549 dlq_policy,
1550 dynamic_dlq_policy,
1551 saga_policy,
1552 dynamic_saga_policy,
1553 saga_compensation_registry,
1554 iam_handle,
1555 }
1556 }
1557
1558 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1560 if let ExecutionMode::Singleton {
1561 lock_key,
1562 ttl_ms,
1563 lock_provider,
1564 } = &self.execution_mode
1565 {
1566 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1567 let _enter = trace_span.enter();
1568 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1569 Ok(true) => {
1570 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1571 }
1572 Ok(false) => {
1573 tracing::debug!(
1574 "Singleton lock {} already held, aborting execution.",
1575 lock_key
1576 );
1577 return Outcome::emit(
1579 "execution.skipped.lock_held",
1580 Some(serde_json::json!({
1581 "lock_key": lock_key
1582 })),
1583 );
1584 }
1585 Err(e) => {
1586 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1587 return Outcome::emit(
1588 "execution.skipped.lock_error",
1589 Some(serde_json::json!({
1590 "error": e.to_string()
1591 })),
1592 );
1593 }
1594 }
1595 }
1596
1597 if let Some(iam) = &self.iam_handle {
1599 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1600
1601 if matches!(iam.policy, IamPolicy::None) {
1602 } else {
1604 let token = bus.read::<IamToken>().map(|t| t.0.clone());
1605
1606 match token {
1607 Some(raw_token) => {
1608 match iam.verifier.verify(&raw_token).await {
1609 Ok(identity) => {
1610 if let Err(e) = enforce_policy(&iam.policy, &identity) {
1611 tracing::warn!(
1612 policy = ?iam.policy,
1613 subject = %identity.subject,
1614 "IAM policy enforcement failed: {}",
1615 e
1616 );
1617 return Outcome::emit(
1618 "iam.policy_denied",
1619 Some(serde_json::json!({
1620 "error": e.to_string(),
1621 "subject": identity.subject,
1622 })),
1623 );
1624 }
1625 bus.insert(identity);
1627 }
1628 Err(e) => {
1629 tracing::warn!("IAM token verification failed: {}", e);
1630 return Outcome::emit(
1631 "iam.verification_failed",
1632 Some(serde_json::json!({
1633 "error": e.to_string()
1634 })),
1635 );
1636 }
1637 }
1638 }
1639 None => {
1640 tracing::warn!("IAM policy requires token but none found in Bus");
1641 return Outcome::emit("iam.missing_token", None);
1642 }
1643 }
1644 }
1645 }
1646
1647 let trace_id = persistence_trace_id(bus);
1648 let label = self.schematic.name.clone();
1649
1650 if let Some(sink) = &self.dlq_sink {
1652 bus.insert(sink.clone());
1653 }
1654 let effective_dlq_policy = self
1656 .dynamic_dlq_policy
1657 .as_ref()
1658 .map(|d| d.current())
1659 .unwrap_or_else(|| self.dlq_policy.clone());
1660 bus.insert(effective_dlq_policy);
1661 bus.insert(self.schematic.clone());
1662 let effective_saga_policy = self
1663 .dynamic_saga_policy
1664 .as_ref()
1665 .map(|d| d.current())
1666 .unwrap_or_else(|| self.saga_policy.clone());
1667 bus.insert(effective_saga_policy.clone());
1668
1669 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1671 bus.insert(SagaStack::new());
1672 }
1673
1674 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1675 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1676 let compensation_retry_policy = compensation_retry_policy(bus);
1677 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1678 let version = self.schematic.schema_version.clone();
1679 let migration_registry = bus
1680 .read::<ranvier_core::schematic::MigrationRegistry>()
1681 .cloned();
1682
1683 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1684 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1685 load_persistence_version(handle, &trace_id).await;
1686
1687 if let Some(interv) = intervention {
1688 tracing::info!(
1689 trace_id = %trace_id,
1690 target_node = %interv.target_node,
1691 "Applying manual intervention command"
1692 );
1693
1694 if let Some(target_idx) = self
1696 .schematic
1697 .nodes
1698 .iter()
1699 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1700 {
1701 tracing::info!(
1702 trace_id = %trace_id,
1703 target_node = %interv.target_node,
1704 target_step = target_idx,
1705 "Intervention: Jumping to target node"
1706 );
1707 start_step = target_idx as u64;
1708
1709 bus.insert(ManualJump {
1711 target_node: interv.target_node.clone(),
1712 payload_override: interv.payload_override.clone(),
1713 });
1714
1715 if let Some(sink) = self.audit_sink.as_ref() {
1717 let event = AuditEvent::new(
1718 uuid::Uuid::new_v4().to_string(),
1719 "System".to_string(),
1720 "ApplyIntervention".to_string(),
1721 trace_id.to_string(),
1722 )
1723 .with_metadata("target_node", interv.target_node.clone())
1724 .with_metadata("target_step", target_idx);
1725
1726 let _ = sink.append(&event).await;
1727 }
1728 } else {
1729 tracing::warn!(
1730 trace_id = %trace_id,
1731 target_node = %interv.target_node,
1732 "Intervention target node not found in schematic; ignoring jump"
1733 );
1734 }
1735 }
1736
1737 if let Some(old_version) = trace_version
1738 && old_version != version
1739 {
1740 tracing::info!(
1741 trace_id = %trace_id,
1742 old_version = %old_version,
1743 current_version = %version,
1744 "Version mismatch detected during resumption"
1745 );
1746
1747 let migration_path = migration_registry
1749 .as_ref()
1750 .and_then(|r| r.find_migration_path(&old_version, &version));
1751
1752 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1753 if path.is_empty() {
1754 (None, last_payload.clone())
1755 } else {
1756 let mut payload = last_payload.clone();
1758 for hop in &path {
1759 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1760 {
1761 match mapper.map_state(p) {
1762 Ok(mapped) => payload = Some(mapped),
1763 Err(e) => {
1764 tracing::error!(
1765 trace_id = %trace_id,
1766 from = %hop.from_version,
1767 to = %hop.to_version,
1768 error = %e,
1769 "Payload migration mapper failed"
1770 );
1771 return Outcome::emit(
1772 "execution.resumption.payload_migration_failed",
1773 Some(serde_json::json!({
1774 "trace_id": trace_id,
1775 "from": hop.from_version,
1776 "to": hop.to_version,
1777 "error": e.to_string()
1778 })),
1779 );
1780 }
1781 }
1782 }
1783 }
1784 let hops: Vec<String> = path
1785 .iter()
1786 .map(|h| format!("{}->{}", h.from_version, h.to_version))
1787 .collect();
1788 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1789 (path.last().copied(), payload)
1790 }
1791 } else {
1792 (None, last_payload.clone())
1793 };
1794
1795 let migration = final_migration.or_else(|| {
1797 migration_registry
1798 .as_ref()
1799 .and_then(|r| r.find_migration(&old_version, &version))
1800 });
1801
1802 if mapped_payload.is_some() {
1804 last_payload = mapped_payload;
1805 }
1806
1807 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1808 {
1809 m.node_mapping
1810 .get(node_id)
1811 .cloned()
1812 .unwrap_or(m.default_strategy.clone())
1813 } else {
1814 migration
1815 .map(|m| m.default_strategy.clone())
1816 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1817 };
1818
1819 match strategy {
1820 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1821 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1822 start_step = 0;
1823 }
1824 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1825 new_node_id,
1826 ..
1827 } => {
1828 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1829 if let Some(target_idx) = self
1830 .schematic
1831 .nodes
1832 .iter()
1833 .position(|n| n.id == new_node_id || n.label == new_node_id)
1834 {
1835 start_step = target_idx as u64;
1836 } else {
1837 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1838 return Outcome::emit(
1839 "execution.resumption.migration_target_not_found",
1840 Some(serde_json::json!({ "node_id": new_node_id })),
1841 );
1842 }
1843 }
1844 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1845 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1846 if let Some(target_idx) = self
1847 .schematic
1848 .nodes
1849 .iter()
1850 .position(|n| n.id == node_id || n.label == node_id)
1851 {
1852 start_step = target_idx as u64;
1853 } else {
1854 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1855 return Outcome::emit(
1856 "execution.resumption.migration_target_not_found",
1857 Some(serde_json::json!({ "node_id": node_id })),
1858 );
1859 }
1860 }
1861 ranvier_core::schematic::MigrationStrategy::Fail => {
1862 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1863 return Outcome::emit(
1864 "execution.resumption.version_mismatch_failed",
1865 Some(serde_json::json!({
1866 "trace_id": trace_id,
1867 "old_version": old_version,
1868 "current_version": version
1869 })),
1870 );
1871 }
1872 _ => {
1873 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1874 return Outcome::emit(
1875 "execution.resumption.unsupported_migration",
1876 Some(serde_json::json!({
1877 "trace_id": trace_id,
1878 "strategy": format!("{:?}", strategy)
1879 })),
1880 );
1881 }
1882 }
1883 }
1884
1885 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1886 persist_execution_event(
1887 handle,
1888 &trace_id,
1889 &label,
1890 &version,
1891 start_step,
1892 ingress_node_id,
1893 "Enter",
1894 None,
1895 )
1896 .await;
1897
1898 bus.insert(StartStep(start_step));
1899 if start_step > 0 {
1900 bus.insert(ResumptionState {
1901 payload: last_payload,
1902 });
1903 }
1904
1905 Some(start_step)
1906 } else {
1907 None
1908 };
1909
1910 let should_capture = should_attach_timeline(bus);
1911 let inserted_timeline = if should_capture {
1912 ensure_timeline(bus)
1913 } else {
1914 false
1915 };
1916 let ingress_started = std::time::Instant::now();
1917 let ingress_enter_ts = now_ms();
1918 if should_capture
1919 && let (Some(timeline), Some(ingress)) =
1920 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1921 {
1922 timeline.push(TimelineEvent::NodeEnter {
1923 node_id: ingress.id.clone(),
1924 node_label: ingress.label.clone(),
1925 timestamp: ingress_enter_ts,
1926 });
1927 }
1928
1929 let circuit_span = tracing::info_span!(
1930 "Circuit",
1931 ranvier.circuit = %label,
1932 ranvier.outcome_kind = tracing::field::Empty,
1933 ranvier.outcome_target = tracing::field::Empty
1934 );
1935 let outcome = (self.executor)(input, resources, bus)
1936 .instrument(circuit_span.clone())
1937 .await;
1938 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1939 if let Some(target) = outcome_target(&outcome) {
1940 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1941 }
1942
1943 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1945 while let Some(task) = {
1946 let mut stack = bus.read_mut::<SagaStack>();
1947 stack.as_mut().and_then(|s| s.pop())
1948 } {
1949 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1950
1951 let handler = {
1952 let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
1953 registry.get(&task.node_id)
1954 };
1955 if let Some(handler) = handler {
1956 let res = handler(task.input_snapshot, resources, bus).await;
1957 if let Outcome::Fault(e) = res {
1958 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1959 }
1960 } else {
1961 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1962 }
1963 }
1964 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1965 }
1966
1967 let ingress_exit_ts = now_ms();
1968 if should_capture
1969 && let (Some(timeline), Some(ingress)) =
1970 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1971 {
1972 timeline.push(TimelineEvent::NodeExit {
1973 node_id: ingress.id.clone(),
1974 outcome_type: outcome_type_name(&outcome),
1975 duration_ms: ingress_started.elapsed().as_millis() as u64,
1976 timestamp: ingress_exit_ts,
1977 });
1978 }
1979
1980 if let Some(handle) = persistence_handle.as_ref() {
1981 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1982 persist_execution_event(
1983 handle,
1984 &trace_id,
1985 &label,
1986 &version,
1987 fault_step,
1988 None, outcome_kind_name(&outcome),
1990 Some(outcome.to_json_value()),
1991 )
1992 .await;
1993
1994 let mut completion = completion_from_outcome(&outcome);
1995 if matches!(outcome, Outcome::Fault(_))
1996 && let Some(compensation) = compensation_handle.as_ref()
1997 && compensation_auto_trigger(bus)
1998 {
1999 let context = CompensationContext {
2000 trace_id: trace_id.clone(),
2001 circuit: label.clone(),
2002 fault_kind: outcome_kind_name(&outcome).to_string(),
2003 fault_step,
2004 timestamp_ms: now_ms(),
2005 };
2006
2007 if run_compensation(
2008 compensation,
2009 context,
2010 compensation_retry_policy,
2011 compensation_idempotency.clone(),
2012 )
2013 .await
2014 {
2015 persist_execution_event(
2016 handle,
2017 &trace_id,
2018 &label,
2019 &version,
2020 fault_step.saturating_add(1),
2021 None,
2022 "Compensated",
2023 None,
2024 )
2025 .await;
2026 completion = CompletionState::Compensated;
2027 }
2028 }
2029
2030 if persistence_auto_complete(bus) {
2031 persist_completion(handle, &trace_id, completion).await;
2032 }
2033 }
2034
2035 if should_capture {
2036 maybe_export_timeline(bus, &outcome);
2037 }
2038 if inserted_timeline {
2039 let _ = bus.remove::<Timeline>();
2040 }
2041
2042 outcome
2043 }
2044
2045 pub fn serve_inspector(self, port: u16) -> Self {
2048 if !inspector_dev_mode_from_env() {
2049 tracing::info!("Inspector disabled because RANVIER_MODE is production");
2050 return self;
2051 }
2052 if !inspector_enabled_from_env() {
2053 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2054 return self;
2055 }
2056
2057 let schematic = self.schematic.clone();
2058 let axon_inspector = Arc::new(self.clone());
2059 tokio::spawn(async move {
2060 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2061 .with_projection_files_from_env()
2062 .with_mode_from_env()
2063 .with_auth_policy_from_env()
2064 .with_state_inspector(axon_inspector)
2065 .serve()
2066 .await
2067 {
2068 tracing::error!("Inspector server failed: {}", e);
2069 }
2070 });
2071 self
2072 }
2073
2074 pub fn schematic(&self) -> &Schematic {
2076 &self.schematic
2077 }
2078
2079 pub fn into_schematic(self) -> Schematic {
2081 self.schematic
2082 }
2083
2084 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2095 schematic_export_request_from_process()
2096 }
2097
2098 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2110 self.maybe_export_and_exit_with(|_| ())
2111 }
2112
2113 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2118 where
2119 F: FnOnce(&SchematicExportRequest),
2120 {
2121 let Some(request) = self.schematic_export_request() else {
2122 return Ok(false);
2123 };
2124 on_before_exit(&request);
2125 self.export_schematic(&request)?;
2126 Ok(true)
2127 }
2128
2129 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2131 let json = serde_json::to_string_pretty(self.schematic())?;
2132 if let Some(path) = &request.output {
2133 if let Some(parent) = path.parent()
2134 && !parent.as_os_str().is_empty()
2135 {
2136 fs::create_dir_all(parent)?;
2137 }
2138 fs::write(path, json.as_bytes())?;
2139 return Ok(());
2140 }
2141 println!("{}", json);
2142 Ok(())
2143 }
2144}
2145
2146fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2147 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2148 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2149 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2150
2151 let mut i = 0;
2152 while i < args.len() {
2153 let arg = args[i].to_string_lossy();
2154
2155 if arg == "--schematic" {
2156 enabled = true;
2157 i += 1;
2158 continue;
2159 }
2160
2161 if arg == "--schematic-output" || arg == "--output" {
2162 if let Some(next) = args.get(i + 1) {
2163 output = Some(PathBuf::from(next));
2164 i += 2;
2165 continue;
2166 }
2167 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2168 output = Some(PathBuf::from(value));
2169 } else if let Some(value) = arg.strip_prefix("--output=") {
2170 output = Some(PathBuf::from(value));
2171 }
2172
2173 i += 1;
2174 }
2175
2176 if enabled {
2177 Some(SchematicExportRequest { output })
2178 } else {
2179 None
2180 }
2181}
2182
/// Reports whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `on` and `yes`, compared ASCII
/// case-insensitively. An unset or unreadable variable counts as `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key)
        .map(|raw| matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"))
        .unwrap_or(false)
}
2189
2190fn inspector_enabled_from_env() -> bool {
2191 let raw = std::env::var("RANVIER_INSPECTOR").ok();
2192 inspector_enabled_from_value(raw.as_deref())
2193}
2194
/// Decides whether the inspector is enabled for a raw setting value.
///
/// `None` (setting absent) enables the inspector. A present value enables it
/// only when it is `1`, `true`, `on` or `yes` (ASCII case-insensitive); any
/// other value disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    value.map_or(true, |raw| {
        matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes")
    })
}
2201
2202fn inspector_dev_mode_from_env() -> bool {
2203 let raw = std::env::var("RANVIER_MODE").ok();
2204 inspector_dev_mode_from_value(raw.as_deref())
2205}
2206
/// Returns `true` unless `value` names production mode.
///
/// Production is recognised as `prod` or `production`, compared ASCII
/// case-insensitively. `None` and every other value count as development mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        None => true,
        Some(raw) => {
            let is_production =
                raw.eq_ignore_ascii_case("prod") || raw.eq_ignore_ascii_case("production");
            !is_production
        }
    }
}
2213
2214fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2215 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2216 Ok(v) if !v.trim().is_empty() => v,
2217 _ => return,
2218 };
2219
2220 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2221 let policy = timeline_adaptive_policy();
2222 let forced = should_force_export(outcome, &policy);
2223 let should_export = sampled || forced;
2224 if !should_export {
2225 record_sampling_stats(false, sampled, forced, "none", &policy);
2226 return;
2227 }
2228
2229 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2230 timeline.sort();
2231
2232 let mode = std::env::var("RANVIER_TIMELINE_MODE")
2233 .unwrap_or_else(|_| "overwrite".to_string())
2234 .to_ascii_lowercase();
2235
2236 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2237 tracing::warn!(
2238 "Failed to persist timeline file {} (mode={}): {}",
2239 path,
2240 mode,
2241 err
2242 );
2243 record_sampling_stats(false, sampled, forced, &mode, &policy);
2244 } else {
2245 record_sampling_stats(true, sampled, forced, &mode, &policy);
2246 }
2247}
2248
2249fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2250 match outcome {
2251 Outcome::Next(_) => "Next".to_string(),
2252 Outcome::Branch(id, _) => format!("Branch:{}", id),
2253 Outcome::Jump(id, _) => format!("Jump:{}", id),
2254 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2255 Outcome::Fault(_) => "Fault".to_string(),
2256 }
2257}
2258
2259fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2260 match outcome {
2261 Outcome::Next(_) => "Next",
2262 Outcome::Branch(_, _) => "Branch",
2263 Outcome::Jump(_, _) => "Jump",
2264 Outcome::Emit(_, _) => "Emit",
2265 Outcome::Fault(_) => "Fault",
2266 }
2267}
2268
2269fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2270 match outcome {
2271 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2272 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2273 Outcome::Emit(event_type, _) => Some(event_type.clone()),
2274 Outcome::Next(_) | Outcome::Fault(_) => None,
2275 }
2276}
2277
2278fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2279 match outcome {
2280 Outcome::Fault(_) => CompletionState::Fault,
2281 _ => CompletionState::Success,
2282 }
2283}
2284
2285fn persistence_trace_id(bus: &Bus) -> String {
2286 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2287 explicit.0.clone()
2288 } else {
2289 format!("{}:{}", bus.id, now_ms())
2290 }
2291}
2292
2293fn persistence_auto_complete(bus: &Bus) -> bool {
2294 bus.read::<PersistenceAutoComplete>()
2295 .map(|v| v.0)
2296 .unwrap_or(true)
2297}
2298
2299fn compensation_auto_trigger(bus: &Bus) -> bool {
2300 bus.read::<CompensationAutoTrigger>()
2301 .map(|v| v.0)
2302 .unwrap_or(true)
2303}
2304
2305fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2306 bus.read::<CompensationRetryPolicy>()
2307 .copied()
2308 .unwrap_or_default()
2309}
2310
2311fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2317 payload.map(|p| {
2318 p.get("Next")
2319 .or_else(|| p.get("Branch"))
2320 .or_else(|| p.get("Jump"))
2321 .cloned()
2322 .unwrap_or_else(|| p.clone())
2323 })
2324}
2325
2326async fn load_persistence_version(
2327 handle: &PersistenceHandle,
2328 trace_id: &str,
2329) -> (
2330 u64,
2331 Option<String>,
2332 Option<crate::persistence::Intervention>,
2333 Option<String>,
2334 Option<serde_json::Value>,
2335) {
2336 let store = handle.store();
2337 match store.load(trace_id).await {
2338 Ok(Some(trace)) => {
2339 let (next_step, last_node_id, last_payload) =
2340 if let Some(resume_from_step) = trace.resumed_from_step {
2341 let anchor_event = trace
2342 .events
2343 .iter()
2344 .rev()
2345 .find(|event| {
2346 event.step <= resume_from_step
2347 && event.outcome_kind == "Next"
2348 && event.payload.is_some()
2349 })
2350 .or_else(|| {
2351 trace.events.iter().rev().find(|event| {
2352 event.step <= resume_from_step
2353 && event.outcome_kind != "Fault"
2354 && event.payload.is_some()
2355 })
2356 })
2357 .or_else(|| {
2358 trace.events.iter().rev().find(|event| {
2359 event.step <= resume_from_step && event.payload.is_some()
2360 })
2361 })
2362 .or_else(|| trace.events.last());
2363
2364 (
2365 resume_from_step.saturating_add(1),
2366 anchor_event.and_then(|event| event.node_id.clone()),
2367 anchor_event.and_then(|event| {
2368 unwrap_outcome_payload(event.payload.as_ref())
2369 }),
2370 )
2371 } else {
2372 let last_event = trace.events.last();
2373 (
2374 last_event
2375 .map(|event| event.step.saturating_add(1))
2376 .unwrap_or(0),
2377 last_event.and_then(|event| event.node_id.clone()),
2378 last_event.and_then(|event| {
2379 unwrap_outcome_payload(event.payload.as_ref())
2380 }),
2381 )
2382 };
2383
2384 (
2385 next_step,
2386 Some(trace.schematic_version),
2387 trace.interventions.last().cloned(),
2388 last_node_id,
2389 last_payload,
2390 )
2391 }
2392 Ok(None) => (0, None, None, None, None),
2393 Err(err) => {
2394 tracing::warn!(
2395 trace_id = %trace_id,
2396 error = %err,
2397 "Failed to load persistence trace; falling back to step=0"
2398 );
2399 (0, None, None, None, None)
2400 }
2401 }
2402}
2403
2404#[allow(clippy::too_many_arguments)]
2405async fn run_this_step<In, Out, E, Res>(
2406 trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
2407 state: In,
2408 res: &Res,
2409 bus: &mut Bus,
2410 node_id: &str,
2411 node_label: &str,
2412 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2413 step_idx: u64,
2414) -> Outcome<Out, E>
2415where
2416 In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2417 Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2418 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2419 Res: ranvier_core::transition::ResourceRequirement,
2420{
2421 let label = trans.label();
2422 let res_type = std::any::type_name::<Res>()
2423 .split("::")
2424 .last()
2425 .unwrap_or("unknown");
2426
2427 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2429 debug.should_pause(node_id)
2430 } else {
2431 false
2432 };
2433
2434 if should_pause {
2435 let trace_id = persistence_trace_id(bus);
2436 tracing::event!(
2437 target: "ranvier.debugger",
2438 tracing::Level::INFO,
2439 trace_id = %trace_id,
2440 node_id = %node_id,
2441 "Node paused"
2442 );
2443
2444 if let Some(timeline) = bus.read_mut::<Timeline>() {
2445 timeline.push(TimelineEvent::NodePaused {
2446 node_id: node_id.to_string(),
2447 timestamp: now_ms(),
2448 });
2449 }
2450 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2451 debug.wait().await;
2452 }
2453 }
2454
2455 let enter_ts = now_ms();
2456 if let Some(timeline) = bus.read_mut::<Timeline>() {
2457 timeline.push(TimelineEvent::NodeEnter {
2458 node_id: node_id.to_string(),
2459 node_label: node_label.to_string(),
2460 timestamp: enter_ts,
2461 });
2462 }
2463
2464 let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2466 if let DlqPolicy::RetryThenDlq {
2467 max_attempts,
2468 backoff_ms,
2469 } = p
2470 {
2471 Some((*max_attempts, *backoff_ms))
2472 } else {
2473 None
2474 }
2475 });
2476 let retry_state_snapshot = if dlq_retry_config.is_some() {
2477 serde_json::to_value(&state).ok()
2478 } else {
2479 None
2480 };
2481
2482 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2484 Some(serde_json::to_vec(&state).unwrap_or_default())
2485 } else {
2486 None
2487 };
2488
2489 let node_span = tracing::info_span!(
2490 "Node",
2491 ranvier.node = %label,
2492 ranvier.resource_type = %res_type,
2493 ranvier.outcome_kind = tracing::field::Empty,
2494 ranvier.outcome_target = tracing::field::Empty
2495 );
2496 let started = std::time::Instant::now();
2497 bus.set_access_policy(label.clone(), bus_policy.clone());
2498 let result = trans
2499 .run(state, res, bus)
2500 .instrument(node_span.clone())
2501 .await;
2502 bus.clear_access_policy();
2503
2504 let result = if let Outcome::Fault(_) = &result {
2507 if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2508 (dlq_retry_config, &retry_state_snapshot)
2509 {
2510 let mut final_result = result;
2511 for attempt in 2..=max_attempts {
2513 let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2514
2515 tracing::info!(
2516 ranvier.node = %label,
2517 attempt = attempt,
2518 max_attempts = max_attempts,
2519 backoff_ms = delay,
2520 "Retrying faulted node"
2521 );
2522
2523 if let Some(timeline) = bus.read_mut::<Timeline>() {
2524 timeline.push(TimelineEvent::NodeRetry {
2525 node_id: node_id.to_string(),
2526 attempt,
2527 max_attempts,
2528 backoff_ms: delay,
2529 timestamp: now_ms(),
2530 });
2531 }
2532
2533 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2534
2535 if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2536 bus.set_access_policy(label.clone(), bus_policy.clone());
2537 let retry_result = trans
2538 .run(retry_state, res, bus)
2539 .instrument(tracing::info_span!(
2540 "NodeRetry",
2541 ranvier.node = %label,
2542 attempt = attempt
2543 ))
2544 .await;
2545 bus.clear_access_policy();
2546
2547 match &retry_result {
2548 Outcome::Fault(_) => {
2549 final_result = retry_result;
2550 }
2551 _ => {
2552 final_result = retry_result;
2553 break;
2554 }
2555 }
2556 }
2557 }
2558 final_result
2559 } else {
2560 result
2561 }
2562 } else {
2563 result
2564 };
2565
2566 node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2567 if let Some(target) = outcome_target(&result) {
2568 node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2569 }
2570 let duration_ms = started.elapsed().as_millis() as u64;
2571 let exit_ts = now_ms();
2572
2573 if let Some(timeline) = bus.read_mut::<Timeline>() {
2574 timeline.push(TimelineEvent::NodeExit {
2575 node_id: node_id.to_string(),
2576 outcome_type: outcome_type_name(&result),
2577 duration_ms,
2578 timestamp: exit_ts,
2579 });
2580
2581 if let Outcome::Branch(branch_id, _) = &result {
2582 timeline.push(TimelineEvent::Branchtaken {
2583 branch_id: branch_id.clone(),
2584 timestamp: exit_ts,
2585 });
2586 }
2587 }
2588
2589 if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2591 && let Some(stack) = bus.read_mut::<SagaStack>()
2592 {
2593 stack.push(node_id.to_string(), label.clone(), snapshot);
2594 }
2595
2596 if let Some(handle) = bus.read::<PersistenceHandle>() {
2597 let trace_id = persistence_trace_id(bus);
2598 let circuit = bus
2599 .read::<ranvier_core::schematic::Schematic>()
2600 .map(|s| s.name.clone())
2601 .unwrap_or_default();
2602 let version = bus
2603 .read::<ranvier_core::schematic::Schematic>()
2604 .map(|s| s.schema_version.clone())
2605 .unwrap_or_default();
2606
2607 persist_execution_event(
2608 handle,
2609 &trace_id,
2610 &circuit,
2611 &version,
2612 step_idx,
2613 Some(node_id.to_string()),
2614 outcome_kind_name(&result),
2615 Some(result.to_json_value()),
2616 )
2617 .await;
2618 }
2619
2620 if let Outcome::Fault(f) = &result {
2623 let dlq_action = {
2625 let policy = bus.read::<DlqPolicy>();
2626 let sink = bus.read::<Arc<dyn DlqSink>>();
2627 match (sink, policy) {
2628 (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2629 _ => None,
2630 }
2631 };
2632
2633 if let Some(sink) = dlq_action {
2634 if let Some((max_attempts, _)) = dlq_retry_config
2635 && let Some(timeline) = bus.read_mut::<Timeline>()
2636 {
2637 timeline.push(TimelineEvent::DlqExhausted {
2638 node_id: node_id.to_string(),
2639 total_attempts: max_attempts,
2640 timestamp: now_ms(),
2641 });
2642 }
2643
2644 let trace_id = persistence_trace_id(bus);
2645 let circuit = bus
2646 .read::<ranvier_core::schematic::Schematic>()
2647 .map(|s| s.name.clone())
2648 .unwrap_or_default();
2649 let _ = sink
2650 .store_dead_letter(
2651 &trace_id,
2652 &circuit,
2653 node_id,
2654 &format!("{:?}", f),
2655 &serde_json::to_vec(&f).unwrap_or_default(),
2656 )
2657 .await;
2658 }
2659 }
2660
2661 result
2662}
2663
2664#[allow(clippy::too_many_arguments)]
2665async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2666 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2667 comp: &Comp,
2668 state: Out,
2669 res: &Res,
2670 bus: &mut Bus,
2671 node_id: &str,
2672 comp_node_id: &str,
2673 node_label: &str,
2674 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2675 step_idx: u64,
2676) -> Outcome<Next, E>
2677where
2678 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2679 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2680 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2681 Res: ranvier_core::transition::ResourceRequirement,
2682 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2683{
2684 let label = trans.label();
2685
2686 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2688 debug.should_pause(node_id)
2689 } else {
2690 false
2691 };
2692
2693 if should_pause {
2694 let trace_id = persistence_trace_id(bus);
2695 tracing::event!(
2696 target: "ranvier.debugger",
2697 tracing::Level::INFO,
2698 trace_id = %trace_id,
2699 node_id = %node_id,
2700 "Node paused (compensated)"
2701 );
2702
2703 if let Some(timeline) = bus.read_mut::<Timeline>() {
2704 timeline.push(TimelineEvent::NodePaused {
2705 node_id: node_id.to_string(),
2706 timestamp: now_ms(),
2707 });
2708 }
2709 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2710 debug.wait().await;
2711 }
2712 }
2713
2714 let enter_ts = now_ms();
2715 if let Some(timeline) = bus.read_mut::<Timeline>() {
2716 timeline.push(TimelineEvent::NodeEnter {
2717 node_id: node_id.to_string(),
2718 node_label: node_label.to_string(),
2719 timestamp: enter_ts,
2720 });
2721 }
2722
2723 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2725 Some(serde_json::to_vec(&state).unwrap_or_default())
2726 } else {
2727 None
2728 };
2729
2730 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2731 bus.set_access_policy(label.clone(), bus_policy.clone());
2732 let result = trans
2733 .run(state.clone(), res, bus)
2734 .instrument(node_span)
2735 .await;
2736 bus.clear_access_policy();
2737
2738 let duration_ms = 0; let exit_ts = now_ms();
2740
2741 if let Some(timeline) = bus.read_mut::<Timeline>() {
2742 timeline.push(TimelineEvent::NodeExit {
2743 node_id: node_id.to_string(),
2744 outcome_type: outcome_type_name(&result),
2745 duration_ms,
2746 timestamp: exit_ts,
2747 });
2748 }
2749
2750 if let Outcome::Fault(ref err) = result {
2752 if compensation_auto_trigger(bus) {
2753 tracing::info!(
2754 ranvier.node = %label,
2755 ranvier.compensation.trigger = "saga",
2756 error = ?err,
2757 "Saga compensation triggered"
2758 );
2759
2760 if let Some(timeline) = bus.read_mut::<Timeline>() {
2761 timeline.push(TimelineEvent::NodeEnter {
2762 node_id: comp_node_id.to_string(),
2763 node_label: format!("Compensate: {}", comp.label()),
2764 timestamp: exit_ts,
2765 });
2766 }
2767
2768 let _ = comp.run(state, res, bus).await;
2770
2771 if let Some(timeline) = bus.read_mut::<Timeline>() {
2772 timeline.push(TimelineEvent::NodeExit {
2773 node_id: comp_node_id.to_string(),
2774 outcome_type: "Compensated".to_string(),
2775 duration_ms: 0,
2776 timestamp: now_ms(),
2777 });
2778 }
2779
2780 if let Some(handle) = bus.read::<PersistenceHandle>() {
2781 let trace_id = persistence_trace_id(bus);
2782 let circuit = bus
2783 .read::<ranvier_core::schematic::Schematic>()
2784 .map(|s| s.name.clone())
2785 .unwrap_or_default();
2786 let version = bus
2787 .read::<ranvier_core::schematic::Schematic>()
2788 .map(|s| s.schema_version.clone())
2789 .unwrap_or_default();
2790
2791 persist_execution_event(
2792 handle,
2793 &trace_id,
2794 &circuit,
2795 &version,
2796 step_idx + 1, Some(comp_node_id.to_string()),
2798 "Compensated",
2799 None,
2800 )
2801 .await;
2802 }
2803 }
2804 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2805 if let Some(stack) = bus.read_mut::<SagaStack>() {
2807 stack.push(node_id.to_string(), label.clone(), snapshot);
2808 }
2809
2810 if let Some(handle) = bus.read::<PersistenceHandle>() {
2811 let trace_id = persistence_trace_id(bus);
2812 let circuit = bus
2813 .read::<ranvier_core::schematic::Schematic>()
2814 .map(|s| s.name.clone())
2815 .unwrap_or_default();
2816 let version = bus
2817 .read::<ranvier_core::schematic::Schematic>()
2818 .map(|s| s.schema_version.clone())
2819 .unwrap_or_default();
2820
2821 persist_execution_event(
2822 handle,
2823 &trace_id,
2824 &circuit,
2825 &version,
2826 step_idx,
2827 Some(node_id.to_string()),
2828 outcome_kind_name(&result),
2829 Some(result.to_json_value()),
2830 )
2831 .await;
2832 }
2833 }
2834
2835 if let Outcome::Fault(f) = &result
2837 && let (Some(sink), Some(policy)) =
2838 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2839 {
2840 let should_dlq = !matches!(policy, DlqPolicy::Drop);
2841 if should_dlq {
2842 let trace_id = persistence_trace_id(bus);
2843 let circuit = bus
2844 .read::<ranvier_core::schematic::Schematic>()
2845 .map(|s| s.name.clone())
2846 .unwrap_or_default();
2847 let _ = sink
2848 .store_dead_letter(
2849 &trace_id,
2850 &circuit,
2851 node_id,
2852 &format!("{:?}", f),
2853 &serde_json::to_vec(&f).unwrap_or_default(),
2854 )
2855 .await;
2856 }
2857 }
2858
2859 result
2860}
2861
2862#[allow(clippy::too_many_arguments)]
2863pub async fn persist_execution_event(
2864 handle: &PersistenceHandle,
2865 trace_id: &str,
2866 circuit: &str,
2867 schematic_version: &str,
2868 step: u64,
2869 node_id: Option<String>,
2870 outcome_kind: &str,
2871 payload: Option<serde_json::Value>,
2872) {
2873 let store = handle.store();
2874 let envelope = PersistenceEnvelope {
2875 trace_id: trace_id.to_string(),
2876 circuit: circuit.to_string(),
2877 schematic_version: schematic_version.to_string(),
2878 step,
2879 node_id,
2880 outcome_kind: outcome_kind.to_string(),
2881 timestamp_ms: now_ms(),
2882 payload_hash: None,
2883 payload,
2884 };
2885
2886 if let Err(err) = store.append(envelope).await {
2887 tracing::warn!(
2888 trace_id = %trace_id,
2889 circuit = %circuit,
2890 step,
2891 outcome_kind = %outcome_kind,
2892 error = %err,
2893 "Failed to append persistence envelope"
2894 );
2895 }
2896}
2897
2898async fn persist_completion(
2899 handle: &PersistenceHandle,
2900 trace_id: &str,
2901 completion: CompletionState,
2902) {
2903 let store = handle.store();
2904 if let Err(err) = store.complete(trace_id, completion).await {
2905 tracing::warn!(
2906 trace_id = %trace_id,
2907 error = %err,
2908 "Failed to complete persistence trace"
2909 );
2910 }
2911}
2912
2913fn compensation_idempotency_key(context: &CompensationContext) -> String {
2914 format!(
2915 "{}:{}:{}",
2916 context.trace_id, context.circuit, context.fault_kind
2917 )
2918}
2919
2920async fn run_compensation(
2921 handle: &CompensationHandle,
2922 context: CompensationContext,
2923 retry_policy: CompensationRetryPolicy,
2924 idempotency: Option<CompensationIdempotencyHandle>,
2925) -> bool {
2926 let hook = handle.hook();
2927 let key = compensation_idempotency_key(&context);
2928
2929 if let Some(store_handle) = idempotency.as_ref() {
2930 let store = store_handle.store();
2931 match store.was_compensated(&key).await {
2932 Ok(true) => {
2933 tracing::info!(
2934 trace_id = %context.trace_id,
2935 circuit = %context.circuit,
2936 key = %key,
2937 "Compensation already recorded; skipping duplicate hook execution"
2938 );
2939 return true;
2940 }
2941 Ok(false) => {}
2942 Err(err) => {
2943 tracing::warn!(
2944 trace_id = %context.trace_id,
2945 key = %key,
2946 error = %err,
2947 "Failed to check compensation idempotency state"
2948 );
2949 }
2950 }
2951 }
2952
2953 let max_attempts = retry_policy.max_attempts.max(1);
2954 for attempt in 1..=max_attempts {
2955 match hook.compensate(context.clone()).await {
2956 Ok(()) => {
2957 if let Some(store_handle) = idempotency.as_ref() {
2958 let store = store_handle.store();
2959 if let Err(err) = store.mark_compensated(&key).await {
2960 tracing::warn!(
2961 trace_id = %context.trace_id,
2962 key = %key,
2963 error = %err,
2964 "Failed to mark compensation idempotency state"
2965 );
2966 }
2967 }
2968 return true;
2969 }
2970 Err(err) => {
2971 let is_last = attempt == max_attempts;
2972 tracing::warn!(
2973 trace_id = %context.trace_id,
2974 circuit = %context.circuit,
2975 fault_kind = %context.fault_kind,
2976 fault_step = context.fault_step,
2977 attempt,
2978 max_attempts,
2979 error = %err,
2980 "Compensation hook attempt failed"
2981 );
2982 if !is_last && retry_policy.backoff_ms > 0 {
2983 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2984 .await;
2985 }
2986 }
2987 }
2988 }
2989 false
2990}
2991
2992fn ensure_timeline(bus: &mut Bus) -> bool {
2993 if bus.has::<Timeline>() {
2994 false
2995 } else {
2996 bus.insert(Timeline::new());
2997 true
2998 }
2999}
3000
3001fn should_attach_timeline(bus: &Bus) -> bool {
3002 if bus.has::<Timeline>() {
3004 return true;
3005 }
3006
3007 has_timeline_output_path()
3009}
3010
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a value that is not
/// empty after trimming whitespace.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
3017
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64`, and clamped to `[0.0, 1.0]`.
/// When the variable is unset, unparsable, or NaN, the default `1.0`
/// (sample everything) is returned.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and survives `clamp`; a NaN rate would make
        // every sampling comparison false, so treat it as an invalid setting.
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
3025
3026fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3027 if rate <= 0.0 {
3028 return false;
3029 }
3030 if rate >= 1.0 {
3031 return true;
3032 }
3033 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3034 bucket < rate
3035}
3036
/// Returns the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`,
/// lower-cased (ASCII). Falls back to `"fault_branch"` when the variable
/// cannot be read.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(configured) => configured.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
3042
3043fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3044 match policy {
3045 "off" => false,
3046 "fault_only" => matches!(outcome, Outcome::Fault(_)),
3047 "fault_branch_emit" => {
3048 matches!(
3049 outcome,
3050 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3051 )
3052 }
3053 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3054 }
3055}
3056
/// Running counters describing timeline export decisions.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not result in an export.
    skipped: u64,
    /// Exports that were flagged as sampled.
    sampled_exports: u64,
    /// Exports that were flagged as forced.
    forced_exports: u64,
    /// Mode string passed with the most recent decision.
    last_mode: String,
    /// Policy string passed with the most recent decision.
    last_policy: String,
    /// Timestamp (milliseconds) of the most recent decision.
    last_updated_ms: u64,
}
3068
3069static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3070
3071fn stats_cell() -> &'static Mutex<SamplingStats> {
3072 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3073}
3074
3075fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3076 let snapshot = {
3077 let mut stats = match stats_cell().lock() {
3078 Ok(guard) => guard,
3079 Err(_) => return,
3080 };
3081
3082 stats.total_decisions += 1;
3083 if exported {
3084 stats.exported += 1;
3085 } else {
3086 stats.skipped += 1;
3087 }
3088 if sampled && exported {
3089 stats.sampled_exports += 1;
3090 }
3091 if forced && exported {
3092 stats.forced_exports += 1;
3093 }
3094 stats.last_mode = mode.to_string();
3095 stats.last_policy = policy.to_string();
3096 stats.last_updated_ms = now_ms();
3097 stats.clone()
3098 };
3099
3100 tracing::debug!(
3101 ranvier.timeline.total_decisions = snapshot.total_decisions,
3102 ranvier.timeline.exported = snapshot.exported,
3103 ranvier.timeline.skipped = snapshot.skipped,
3104 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3105 ranvier.timeline.forced_exports = snapshot.forced_exports,
3106 ranvier.timeline.mode = %snapshot.last_mode,
3107 ranvier.timeline.policy = %snapshot.last_policy,
3108 "Timeline sampling stats updated"
3109 );
3110
3111 if let Some(path) = timeline_stats_output_path() {
3112 let payload = serde_json::json!({
3113 "total_decisions": snapshot.total_decisions,
3114 "exported": snapshot.exported,
3115 "skipped": snapshot.skipped,
3116 "sampled_exports": snapshot.sampled_exports,
3117 "forced_exports": snapshot.forced_exports,
3118 "last_mode": snapshot.last_mode,
3119 "last_policy": snapshot.last_policy,
3120 "last_updated_ms": snapshot.last_updated_ms
3121 });
3122 if let Some(parent) = Path::new(&path).parent() {
3123 let _ = fs::create_dir_all(parent);
3124 }
3125 if let Err(err) = fs::write(&path, payload.to_string()) {
3126 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3127 }
3128 }
3129}
3130
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set and not
/// blank. The value is returned as-is (untrimmed).
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    }
}
3136
3137fn write_timeline_with_policy(
3138 path: &str,
3139 mode: &str,
3140 mut timeline: Timeline,
3141) -> Result<(), String> {
3142 match mode {
3143 "append" => {
3144 if Path::new(path).exists() {
3145 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3146 match serde_json::from_str::<Timeline>(&content) {
3147 Ok(mut existing) => {
3148 existing.events.append(&mut timeline.events);
3149 existing.sort();
3150 if let Some(max_events) = max_events_limit() {
3151 truncate_timeline_events(&mut existing, max_events);
3152 }
3153 write_timeline_json(path, &existing)
3154 }
3155 Err(_) => {
3156 if let Some(max_events) = max_events_limit() {
3158 truncate_timeline_events(&mut timeline, max_events);
3159 }
3160 write_timeline_json(path, &timeline)
3161 }
3162 }
3163 } else {
3164 if let Some(max_events) = max_events_limit() {
3165 truncate_timeline_events(&mut timeline, max_events);
3166 }
3167 write_timeline_json(path, &timeline)
3168 }
3169 }
3170 "rotate" => {
3171 let rotated_path = rotated_path(path, now_ms());
3172 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3173 if let Some(keep) = rotate_keep_limit() {
3174 cleanup_rotated_files(path, keep)?;
3175 }
3176 Ok(())
3177 }
3178 _ => write_timeline_json(path, &timeline),
3179 }
3180}
3181
3182fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3183 if let Some(parent) = Path::new(path).parent()
3184 && !parent.as_os_str().is_empty()
3185 {
3186 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3187 }
3188 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3189 fs::write(path, json).map_err(|e| e.to_string())
3190}
3191
/// Builds the rotation target for `path`: `<parent>/<stem>_<suffix>.<ext>`.
///
/// A missing or non-UTF-8 stem falls back to `"timeline"`, a missing or
/// non-UTF-8 extension to `"json"`, and a missing parent to the empty path.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(name) => name,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(name) => name,
        None => "json",
    };
    let file_name = format!("{stem}_{suffix}.{ext}");

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
3199
/// Reads the event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` when the variable is unset, not a valid `usize`, or zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(limit) => Some(limit),
    }
}
3206
/// Reads the number of rotated files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` when the variable is unset, not a valid `usize`, or zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(keep) => Some(keep),
    }
}
3213
3214fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3215 let len = timeline.events.len();
3216 if len > max_events {
3217 let keep_from = len - max_events;
3218 timeline.events = timeline.events.split_off(keep_from);
3219 }
3220}
3221
/// Deletes old rotated timeline files next to `base_path`, keeping the `keep`
/// most recent ones.
///
/// A rotated file is one named `<stem>_<digits>.<ext>` in the directory of
/// `base_path`, where `<stem>`/`<ext>` come from `base_path` (defaulting to
/// `"timeline"`/`"json"`). Files whose middle part is not purely numeric
/// (for example `timeline_stats.json`) are never touched. Candidates are
/// ordered newest-first by modification time, with the numeric rotation stamp
/// as a tie-breaker so the result is deterministic on coarse-grained
/// timestamps.
///
/// A bare file name (empty parent) is resolved against the current directory.
/// Individual removal failures are ignored (best effort).
///
/// # Errors
/// Returns the stringified I/O error when the directory cannot be listed.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    /// Extracts the numeric rotation stamp from `name`, if it has the rotated shape.
    fn rotation_stamp(name: &str, prefix: &str, suffix: &str) -> Option<u64> {
        let stamp = name.strip_prefix(prefix)?.strip_suffix(suffix)?;
        if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        stamp.parse::<u64>().ok()
    }

    let base = Path::new(base_path);
    // `parent()` yields `Some("")` for a bare file name; listing "" would fail,
    // so treat an empty parent like a missing one.
    let dir = base
        .parent()
        .filter(|parent| !parent.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let stamp = rotation_stamp(file_name.to_str()?, &prefix, &suffix)?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((modified, stamp, entry.path()))
        })
        .collect::<Vec<_>>();

    // Newest first; equal modification times fall back to the rotation stamp.
    rotated.sort_by(|a, b| (b.0, b.1).cmp(&(a.0, a.1)));

    for (_, _, stale) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(stale);
    }
    Ok(())
}
3254
3255fn bus_capability_schema_from_policy(
3256 policy: Option<ranvier_core::bus::BusAccessPolicy>,
3257) -> Option<BusCapabilitySchema> {
3258 let policy = policy?;
3259
3260 let mut allow = policy
3261 .allow
3262 .unwrap_or_default()
3263 .into_iter()
3264 .map(|entry| entry.type_name.to_string())
3265 .collect::<Vec<_>>();
3266 let mut deny = policy
3267 .deny
3268 .into_iter()
3269 .map(|entry| entry.type_name.to_string())
3270 .collect::<Vec<_>>();
3271 allow.sort();
3272 deny.sort();
3273
3274 if allow.is_empty() && deny.is_empty() {
3275 return None;
3276 }
3277
3278 Some(BusCapabilitySchema { allow, deny })
3279}
3280
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3287
3288#[cfg(test)]
3289mod tests {
3290 use super::{
3291 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3292 should_force_export,
3293 };
3294 use crate::persistence::{
3295 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3296 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3297 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3298 PersistenceHandle, PersistenceStore, PersistenceTraceId,
3299 };
3300 use anyhow::Result;
3301 use async_trait::async_trait;
3302 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3303 use ranvier_core::event::{DlqPolicy, DlqSink};
3304 use ranvier_core::saga::SagaStack;
3305 use ranvier_core::timeline::{Timeline, TimelineEvent};
3306 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3307 use serde::{Deserialize, Serialize};
3308 use std::sync::Arc;
3309 use tokio::sync::Mutex;
3310 use uuid::Uuid;
3311
3312 struct MockAuditSink {
3313 events: Arc<Mutex<Vec<AuditEvent>>>,
3314 }
3315
3316 #[async_trait]
3317 impl AuditSink for MockAuditSink {
3318 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3319 self.events.lock().await.push(event.clone());
3320 Ok(())
3321 }
3322 }
3323
3324 #[tokio::test]
3325 async fn execute_logs_audit_events_for_intervention() {
3326 use ranvier_inspector::StateInspector;
3327
3328 let trace_id = "test-audit-trace";
3329 let store_impl = InMemoryPersistenceStore::new();
3330 let events = Arc::new(Mutex::new(Vec::new()));
3331 let sink = MockAuditSink {
3332 events: events.clone(),
3333 };
3334
3335 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3336 .then(AddOne)
3337 .with_persistence_store(store_impl.clone())
3338 .with_audit_sink(sink);
3339
3340 let mut bus = Bus::new();
3341 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3342 bus.insert(PersistenceTraceId::new(trace_id));
3343 let target_node_id = axon.schematic.nodes[0].id.clone();
3344
3345 store_impl
3347 .append(crate::persistence::PersistenceEnvelope {
3348 trace_id: trace_id.to_string(),
3349 circuit: "AuditTest".to_string(),
3350 schematic_version: "v1.0".to_string(),
3351 step: 0,
3352 node_id: None,
3353 outcome_kind: "Next".to_string(),
3354 timestamp_ms: 0,
3355 payload_hash: None,
3356 payload: None,
3357 })
3358 .await
3359 .unwrap();
3360
3361 axon.force_resume(trace_id, &target_node_id, None)
3363 .await
3364 .unwrap();
3365
3366 axon.execute(10, &(), &mut bus).await;
3368
3369 let recorded = events.lock().await;
3370 assert_eq!(
3371 recorded.len(),
3372 2,
3373 "Should have 2 audit events: ForceResume and ApplyIntervention"
3374 );
3375 assert_eq!(recorded[0].action, "ForceResume");
3376 assert_eq!(recorded[0].target, trace_id);
3377 assert_eq!(recorded[1].action, "ApplyIntervention");
3378 assert_eq!(recorded[1].target, trace_id);
3379 }
3380
3381 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3382 pub enum TestInfallible {}
3383
3384 #[test]
3385 fn inspector_enabled_flag_matrix() {
3386 assert!(inspector_enabled_from_value(None));
3387 assert!(inspector_enabled_from_value(Some("1")));
3388 assert!(inspector_enabled_from_value(Some("true")));
3389 assert!(inspector_enabled_from_value(Some("on")));
3390 assert!(!inspector_enabled_from_value(Some("0")));
3391 assert!(!inspector_enabled_from_value(Some("false")));
3392 }
3393
3394 #[test]
3395 fn inspector_dev_mode_matrix() {
3396 assert!(inspector_dev_mode_from_value(None));
3397 assert!(inspector_dev_mode_from_value(Some("dev")));
3398 assert!(inspector_dev_mode_from_value(Some("staging")));
3399 assert!(!inspector_dev_mode_from_value(Some("prod")));
3400 assert!(!inspector_dev_mode_from_value(Some("production")));
3401 }
3402
3403 #[test]
3404 fn adaptive_policy_force_export_matrix() {
3405 let next = Outcome::<(), &'static str>::Next(());
3406 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3407 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3408 let fault = Outcome::<(), &'static str>::Fault("boom");
3409
3410 assert!(!should_force_export(&next, "off"));
3411 assert!(!should_force_export(&fault, "off"));
3412
3413 assert!(!should_force_export(&branch, "fault_only"));
3414 assert!(should_force_export(&fault, "fault_only"));
3415
3416 assert!(should_force_export(&branch, "fault_branch"));
3417 assert!(!should_force_export(&emit, "fault_branch"));
3418 assert!(should_force_export(&fault, "fault_branch"));
3419
3420 assert!(should_force_export(&branch, "fault_branch_emit"));
3421 assert!(should_force_export(&emit, "fault_branch_emit"));
3422 assert!(should_force_export(&fault, "fault_branch_emit"));
3423 }
3424
3425 #[test]
3426 fn sampling_and_adaptive_combination_decisions() {
3427 let bus_id = Uuid::nil();
3428 let next = Outcome::<(), &'static str>::Next(());
3429 let fault = Outcome::<(), &'static str>::Fault("boom");
3430
3431 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3432 assert!(!sampled_never);
3433 assert!(!(sampled_never || should_force_export(&next, "off")));
3434 assert!(sampled_never || should_force_export(&fault, "fault_only"));
3435
3436 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3437 assert!(sampled_always);
3438 assert!(sampled_always || should_force_export(&next, "off"));
3439 assert!(sampled_always || should_force_export(&fault, "off"));
3440 }
3441
3442 #[derive(Clone)]
3443 struct AddOne;
3444
3445 #[async_trait]
3446 impl Transition<i32, i32> for AddOne {
3447 type Error = TestInfallible;
3448 type Resources = ();
3449
3450 async fn run(
3451 &self,
3452 state: i32,
3453 _resources: &Self::Resources,
3454 _bus: &mut Bus,
3455 ) -> Outcome<i32, Self::Error> {
3456 Outcome::Next(state + 1)
3457 }
3458 }
3459
3460 #[derive(Clone)]
3461 struct AlwaysFault;
3462
3463 #[async_trait]
3464 impl Transition<i32, i32> for AlwaysFault {
3465 type Error = String;
3466 type Resources = ();
3467
3468 async fn run(
3469 &self,
3470 _state: i32,
3471 _resources: &Self::Resources,
3472 _bus: &mut Bus,
3473 ) -> Outcome<i32, Self::Error> {
3474 Outcome::Fault("boom".to_string())
3475 }
3476 }
3477
3478 #[derive(Clone)]
3479 struct CapabilityGuarded;
3480
3481 #[async_trait]
3482 impl Transition<(), ()> for CapabilityGuarded {
3483 type Error = String;
3484 type Resources = ();
3485
3486 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3487 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3488 }
3489
3490 async fn run(
3491 &self,
3492 _state: (),
3493 _resources: &Self::Resources,
3494 bus: &mut Bus,
3495 ) -> Outcome<(), Self::Error> {
3496 match bus.get::<String>() {
3497 Ok(_) => Outcome::Next(()),
3498 Err(err) => Outcome::Fault(err.to_string()),
3499 }
3500 }
3501 }
3502
3503 #[derive(Clone)]
3504 struct RecordingCompensationHook {
3505 calls: Arc<Mutex<Vec<CompensationContext>>>,
3506 should_fail: bool,
3507 }
3508
3509 #[async_trait]
3510 impl CompensationHook for RecordingCompensationHook {
3511 async fn compensate(&self, context: CompensationContext) -> Result<()> {
3512 self.calls.lock().await.push(context);
3513 if self.should_fail {
3514 return Err(anyhow::anyhow!("compensation failed"));
3515 }
3516 Ok(())
3517 }
3518 }
3519
3520 #[derive(Clone)]
3521 struct FlakyCompensationHook {
3522 calls: Arc<Mutex<u32>>,
3523 failures_remaining: Arc<Mutex<u32>>,
3524 }
3525
3526 #[async_trait]
3527 impl CompensationHook for FlakyCompensationHook {
3528 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3529 {
3530 let mut calls = self.calls.lock().await;
3531 *calls += 1;
3532 }
3533 let mut failures_remaining = self.failures_remaining.lock().await;
3534 if *failures_remaining > 0 {
3535 *failures_remaining -= 1;
3536 return Err(anyhow::anyhow!("transient compensation failure"));
3537 }
3538 Ok(())
3539 }
3540 }
3541
3542 #[derive(Clone)]
3543 struct FailingCompensationIdempotencyStore {
3544 read_calls: Arc<Mutex<u32>>,
3545 write_calls: Arc<Mutex<u32>>,
3546 }
3547
3548 #[async_trait]
3549 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3550 async fn was_compensated(&self, _key: &str) -> Result<bool> {
3551 let mut read_calls = self.read_calls.lock().await;
3552 *read_calls += 1;
3553 Err(anyhow::anyhow!("forced idempotency read failure"))
3554 }
3555
3556 async fn mark_compensated(&self, _key: &str) -> Result<()> {
3557 let mut write_calls = self.write_calls.lock().await;
3558 *write_calls += 1;
3559 Err(anyhow::anyhow!("forced idempotency write failure"))
3560 }
3561 }
3562
3563 #[tokio::test]
3564 async fn execute_persists_success_trace_when_handle_exists() {
3565 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3566 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3567
3568 let mut bus = Bus::new();
3569 bus.insert(PersistenceHandle::from_arc(store_dyn));
3570 bus.insert(PersistenceTraceId::new("trace-success"));
3571
3572 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3573 let outcome = axon.execute(41, &(), &mut bus).await;
3574 assert!(matches!(outcome, Outcome::Next(42)));
3575
3576 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3577 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3579 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
3582 }
3583
3584 #[tokio::test]
3585 async fn execute_persists_fault_completion_state() {
3586 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3587 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3588
3589 let mut bus = Bus::new();
3590 bus.insert(PersistenceHandle::from_arc(store_dyn));
3591 bus.insert(PersistenceTraceId::new("trace-fault"));
3592
3593 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3594 let outcome = axon.execute(41, &(), &mut bus).await;
3595 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3596
3597 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3598 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3602 }
3603
3604 #[tokio::test]
3605 async fn execute_respects_persistence_auto_complete_off() {
3606 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3607 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3608
3609 let mut bus = Bus::new();
3610 bus.insert(PersistenceHandle::from_arc(store_dyn));
3611 bus.insert(PersistenceTraceId::new("trace-no-complete"));
3612 bus.insert(PersistenceAutoComplete(false));
3613
3614 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3615 let outcome = axon.execute(1, &(), &mut bus).await;
3616 assert!(matches!(outcome, Outcome::Next(2)));
3617
3618 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3619 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
3621 }
3622
3623 #[tokio::test]
3624 async fn fault_triggers_compensation_and_marks_compensated() {
3625 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3626 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3627 let calls = Arc::new(Mutex::new(Vec::new()));
3628 let compensation = RecordingCompensationHook {
3629 calls: calls.clone(),
3630 should_fail: false,
3631 };
3632
3633 let mut bus = Bus::new();
3634 bus.insert(PersistenceHandle::from_arc(store_dyn));
3635 bus.insert(PersistenceTraceId::new("trace-compensated"));
3636 bus.insert(CompensationHandle::from_hook(compensation));
3637
3638 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3639 let outcome = axon.execute(7, &(), &mut bus).await;
3640 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3641
3642 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3643 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3645 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3648 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3649
3650 let recorded = calls.lock().await;
3651 assert_eq!(recorded.len(), 1);
3652 assert_eq!(recorded[0].trace_id, "trace-compensated");
3653 assert_eq!(recorded[0].fault_kind, "Fault");
3654 }
3655
3656 #[tokio::test]
3657 async fn failed_compensation_keeps_fault_completion() {
3658 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3659 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3660 let calls = Arc::new(Mutex::new(Vec::new()));
3661 let compensation = RecordingCompensationHook {
3662 calls: calls.clone(),
3663 should_fail: true,
3664 };
3665
3666 let mut bus = Bus::new();
3667 bus.insert(PersistenceHandle::from_arc(store_dyn));
3668 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3669 bus.insert(CompensationHandle::from_hook(compensation));
3670
3671 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3672 let outcome = axon.execute(7, &(), &mut bus).await;
3673 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3674
3675 let persisted = store_impl
3676 .load("trace-compensation-failed")
3677 .await
3678 .unwrap()
3679 .unwrap();
3680 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3683
3684 let recorded = calls.lock().await;
3685 assert_eq!(recorded.len(), 1);
3686 }
3687
3688 #[tokio::test]
3689 async fn compensation_retry_policy_succeeds_after_retries() {
3690 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3691 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3692 let calls = Arc::new(Mutex::new(0u32));
3693 let failures_remaining = Arc::new(Mutex::new(2u32));
3694 let compensation = FlakyCompensationHook {
3695 calls: calls.clone(),
3696 failures_remaining,
3697 };
3698
3699 let mut bus = Bus::new();
3700 bus.insert(PersistenceHandle::from_arc(store_dyn));
3701 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3702 bus.insert(CompensationHandle::from_hook(compensation));
3703 bus.insert(CompensationRetryPolicy {
3704 max_attempts: 3,
3705 backoff_ms: 0,
3706 });
3707
3708 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3709 let outcome = axon.execute(7, &(), &mut bus).await;
3710 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3711
3712 let persisted = store_impl
3713 .load("trace-retry-success")
3714 .await
3715 .unwrap()
3716 .unwrap();
3717 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3718 assert_eq!(
3719 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3720 Some("Compensated")
3721 );
3722
3723 let attempt_count = calls.lock().await;
3724 assert_eq!(*attempt_count, 3);
3725 }
3726
3727 #[tokio::test]
3728 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3729 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3730 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3731 let calls = Arc::new(Mutex::new(Vec::new()));
3732 let compensation = RecordingCompensationHook {
3733 calls: calls.clone(),
3734 should_fail: false,
3735 };
3736 let idempotency = InMemoryCompensationIdempotencyStore::new();
3737
3738 let mut bus = Bus::new();
3739 bus.insert(PersistenceHandle::from_arc(store_dyn));
3740 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3741 bus.insert(PersistenceAutoComplete(false));
3742 bus.insert(CompensationHandle::from_hook(compensation));
3743 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3744
3745 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3746
3747 let outcome1 = axon.execute(7, &(), &mut bus).await;
3748 let outcome2 = axon.execute(8, &(), &mut bus).await;
3749 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3750 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3751
3752 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3753 assert_eq!(persisted.completion, None);
3754 let compensated_count = persisted
3756 .events
3757 .iter()
3758 .filter(|e| e.outcome_kind == "Compensated")
3759 .count();
3760 assert_eq!(
3761 compensated_count, 2,
3762 "Should have 2 Compensated events (one per execution)"
3763 );
3764
3765 let recorded = calls.lock().await;
3766 assert_eq!(recorded.len(), 1);
3767 }
3768
3769 #[tokio::test]
3770 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3771 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3772 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3773 let calls = Arc::new(Mutex::new(Vec::new()));
3774 let read_calls = Arc::new(Mutex::new(0u32));
3775 let write_calls = Arc::new(Mutex::new(0u32));
3776 let compensation = RecordingCompensationHook {
3777 calls: calls.clone(),
3778 should_fail: false,
3779 };
3780 let idempotency = FailingCompensationIdempotencyStore {
3781 read_calls: read_calls.clone(),
3782 write_calls: write_calls.clone(),
3783 };
3784
3785 let mut bus = Bus::new();
3786 bus.insert(PersistenceHandle::from_arc(store_dyn));
3787 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3788 bus.insert(CompensationHandle::from_hook(compensation));
3789 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3790
3791 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3792 let outcome = axon.execute(9, &(), &mut bus).await;
3793 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3794
3795 let persisted = store_impl
3796 .load("trace-idempotency-store-failure")
3797 .await
3798 .unwrap()
3799 .unwrap();
3800 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3801 assert_eq!(
3802 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3803 Some("Compensated")
3804 );
3805
3806 let recorded = calls.lock().await;
3807 assert_eq!(recorded.len(), 1);
3808 assert_eq!(*read_calls.lock().await, 1);
3809 assert_eq!(*write_calls.lock().await, 1);
3810 }
3811
3812 #[tokio::test]
3813 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3814 let mut bus = Bus::new();
3815 bus.insert(1_i32);
3816 bus.insert("secret".to_string());
3817
3818 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3819 let outcome = axon.execute((), &(), &mut bus).await;
3820
3821 match outcome {
3822 Outcome::Fault(msg) => {
3823 assert!(msg.contains("Bus access denied"), "{msg}");
3824 assert!(msg.contains("CapabilityGuarded"), "{msg}");
3825 assert!(msg.contains("alloc::string::String"), "{msg}");
3826 }
3827 other => panic!("expected fault, got {other:?}"),
3828 }
3829 }
3830
3831 #[tokio::test]
3832 async fn execute_fails_on_version_mismatch_without_migration() {
3833 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3834 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3835
3836 let trace_id = "v-mismatch";
3837 let old_envelope = crate::persistence::PersistenceEnvelope {
3839 trace_id: trace_id.to_string(),
3840 circuit: "TestCircuit".to_string(),
3841 schematic_version: "0.9".to_string(),
3842 step: 0,
3843 node_id: None,
3844 outcome_kind: "Enter".to_string(),
3845 timestamp_ms: 0,
3846 payload_hash: None,
3847 payload: None,
3848 };
3849 store_impl.append(old_envelope).await.unwrap();
3850
3851 let mut bus = Bus::new();
3852 bus.insert(PersistenceHandle::from_arc(store_dyn));
3853 bus.insert(PersistenceTraceId::new(trace_id));
3854
3855 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3857 let outcome = axon.execute(10, &(), &mut bus).await;
3858
3859 if let Outcome::Emit(kind, _) = outcome {
3860 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3861 } else {
3862 panic!("Expected version mismatch emission, got {:?}", outcome);
3863 }
3864 }
3865
3866 #[tokio::test]
3867 async fn execute_resumes_from_start_on_migration_strategy() {
3868 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3869 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3870
3871 let trace_id = "v-migration";
3872 let old_envelope = crate::persistence::PersistenceEnvelope {
3874 trace_id: trace_id.to_string(),
3875 circuit: "TestCircuit".to_string(),
3876 schematic_version: "0.9".to_string(),
3877 step: 5,
3878 node_id: None,
3879 outcome_kind: "Next".to_string(),
3880 timestamp_ms: 0,
3881 payload_hash: None,
3882 payload: None,
3883 };
3884 store_impl.append(old_envelope).await.unwrap();
3885
3886 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3887 registry.register(ranvier_core::schematic::SnapshotMigration {
3888 name: Some("v0.9 to v1.0".to_string()),
3889 from_version: "0.9".to_string(),
3890 to_version: "1.0".to_string(),
3891 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3892 node_mapping: std::collections::HashMap::new(),
3893 payload_mapper: None,
3894 });
3895
3896 let mut bus = Bus::new();
3897 bus.insert(PersistenceHandle::from_arc(store_dyn));
3898 bus.insert(PersistenceTraceId::new(trace_id));
3899 bus.insert(registry);
3900
3901 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3902 let outcome = axon.execute(10, &(), &mut bus).await;
3903
3904 assert!(matches!(outcome, Outcome::Next(11)));
3906
3907 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3909 assert_eq!(persisted.schematic_version, "1.0");
3910 }
3911
3912 #[tokio::test]
3913 async fn execute_applies_manual_intervention_jump_and_payload() {
3914 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3915 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3916
3917 let trace_id = "intervention-test";
3918 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3920 .then(AddOne)
3921 .then(AddOne);
3922
3923 let mut bus = Bus::new();
3924 bus.insert(PersistenceHandle::from_arc(store_dyn));
3925 bus.insert(PersistenceTraceId::new(trace_id));
3926
3927 let _target_node_label = "AddOne";
3932 let target_node_id = axon.schematic.nodes[2].id.clone();
3934
3935 store_impl
3937 .append(crate::persistence::PersistenceEnvelope {
3938 trace_id: trace_id.to_string(),
3939 circuit: "TestCircuit".to_string(),
3940 schematic_version: "1.0".to_string(),
3941 step: 0,
3942 node_id: None,
3943 outcome_kind: "Enter".to_string(),
3944 timestamp_ms: 0,
3945 payload_hash: None,
3946 payload: None,
3947 })
3948 .await
3949 .unwrap();
3950
3951 store_impl
3952 .save_intervention(
3953 trace_id,
3954 crate::persistence::Intervention {
3955 target_node: target_node_id.clone(),
3956 payload_override: Some(serde_json::json!(100)),
3957 timestamp_ms: 0,
3958 },
3959 )
3960 .await
3961 .unwrap();
3962
3963 let outcome = axon.execute(10, &(), &mut bus).await;
3966
3967 match outcome {
3968 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3969 other => panic!("Expected Outcome::Next(101), got {:?}", other),
3970 }
3971
3972 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3974 assert_eq!(persisted.interventions.len(), 1);
3976 assert_eq!(persisted.interventions[0].target_node, target_node_id);
3977 }
3978
3979 #[derive(Clone)]
3983 struct FailNThenSucceed {
3984 remaining: Arc<tokio::sync::Mutex<u32>>,
3985 }
3986
3987 #[async_trait]
3988 impl Transition<i32, i32> for FailNThenSucceed {
3989 type Error = String;
3990 type Resources = ();
3991
3992 async fn run(
3993 &self,
3994 state: i32,
3995 _resources: &Self::Resources,
3996 _bus: &mut Bus,
3997 ) -> Outcome<i32, Self::Error> {
3998 let mut rem = self.remaining.lock().await;
3999 if *rem > 0 {
4000 *rem -= 1;
4001 Outcome::Fault("transient failure".to_string())
4002 } else {
4003 Outcome::Next(state + 1)
4004 }
4005 }
4006 }
4007
4008 #[derive(Clone)]
4010 struct MockDlqSink {
4011 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
4012 }
4013
4014 #[async_trait]
4015 impl DlqSink for MockDlqSink {
4016 async fn store_dead_letter(
4017 &self,
4018 workflow_id: &str,
4019 _circuit_label: &str,
4020 node_id: &str,
4021 error_msg: &str,
4022 _payload: &[u8],
4023 ) -> Result<(), String> {
4024 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4025 self.letters.lock().await.push(entry);
4026 Ok(())
4027 }
4028 }
4029
4030 #[tokio::test]
4031 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4032 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4034 let trans = FailNThenSucceed { remaining };
4035
4036 let dlq_sink = MockDlqSink {
4037 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4038 };
4039
4040 let mut bus = Bus::new();
4041 bus.insert(Timeline::new());
4042
4043 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4044 .then(trans)
4045 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4046 max_attempts: 5,
4047 backoff_ms: 1,
4048 })
4049 .with_dlq_sink(dlq_sink.clone());
4050 let outcome = axon.execute(10, &(), &mut bus).await;
4051
4052 assert!(
4054 matches!(outcome, Outcome::Next(11)),
4055 "Expected Next(11), got {:?}",
4056 outcome
4057 );
4058
4059 let letters = dlq_sink.letters.lock().await;
4061 assert!(
4062 letters.is_empty(),
4063 "Should have 0 dead letters, got {}",
4064 letters.len()
4065 );
4066
4067 let timeline = bus.read::<Timeline>().unwrap();
4069 let retry_count = timeline
4070 .events
4071 .iter()
4072 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4073 .count();
4074 assert_eq!(retry_count, 2, "Should have 2 retry events");
4075 }
4076
4077 #[tokio::test]
4078 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4079 let mut bus = Bus::new();
4081 bus.insert(Timeline::new());
4082
4083 let dlq_sink = MockDlqSink {
4084 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4085 };
4086
4087 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4088 .then(AlwaysFault)
4089 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4090 max_attempts: 3,
4091 backoff_ms: 1,
4092 })
4093 .with_dlq_sink(dlq_sink.clone());
4094 let outcome = axon.execute(42, &(), &mut bus).await;
4095
4096 assert!(
4097 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4098 "Expected Fault(boom), got {:?}",
4099 outcome
4100 );
4101
4102 let letters = dlq_sink.letters.lock().await;
4104 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4105
4106 let timeline = bus.read::<Timeline>().unwrap();
4108 let retry_count = timeline
4109 .events
4110 .iter()
4111 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4112 .count();
4113 let dlq_count = timeline
4114 .events
4115 .iter()
4116 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4117 .count();
4118 assert_eq!(
4119 retry_count, 2,
4120 "Should have 2 retry events (attempts 2 and 3)"
4121 );
4122 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4123 }
4124
4125 #[tokio::test]
4126 async fn send_to_dlq_policy_sends_immediately_without_retry() {
4127 let mut bus = Bus::new();
4128 bus.insert(Timeline::new());
4129
4130 let dlq_sink = MockDlqSink {
4131 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4132 };
4133
4134 let axon = Axon::<i32, i32, String>::start("SendDlq")
4135 .then(AlwaysFault)
4136 .with_dlq_policy(DlqPolicy::SendToDlq)
4137 .with_dlq_sink(dlq_sink.clone());
4138 let outcome = axon.execute(1, &(), &mut bus).await;
4139
4140 assert!(matches!(outcome, Outcome::Fault(_)));
4141
4142 let letters = dlq_sink.letters.lock().await;
4144 assert_eq!(letters.len(), 1);
4145
4146 let timeline = bus.read::<Timeline>().unwrap();
4148 let retry_count = timeline
4149 .events
4150 .iter()
4151 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4152 .count();
4153 assert_eq!(retry_count, 0);
4154 }
4155
4156 #[tokio::test]
4157 async fn drop_policy_does_not_send_to_dlq() {
4158 let mut bus = Bus::new();
4159
4160 let dlq_sink = MockDlqSink {
4161 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4162 };
4163
4164 let axon = Axon::<i32, i32, String>::start("DropDlq")
4165 .then(AlwaysFault)
4166 .with_dlq_policy(DlqPolicy::Drop)
4167 .with_dlq_sink(dlq_sink.clone());
4168 let outcome = axon.execute(1, &(), &mut bus).await;
4169
4170 assert!(matches!(outcome, Outcome::Fault(_)));
4171
4172 let letters = dlq_sink.letters.lock().await;
4174 assert!(letters.is_empty());
4175 }
4176
4177 #[tokio::test]
4178 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4179 use ranvier_core::policy::DynamicPolicy;
4180
4181 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4183 let dlq_sink = MockDlqSink {
4184 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4185 };
4186
4187 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4188 .then(AlwaysFault)
4189 .with_dynamic_dlq_policy(dynamic)
4190 .with_dlq_sink(dlq_sink.clone());
4191
4192 let mut bus = Bus::new();
4194 let outcome = axon.execute(1, &(), &mut bus).await;
4195 assert!(matches!(outcome, Outcome::Fault(_)));
4196 assert!(
4197 dlq_sink.letters.lock().await.is_empty(),
4198 "Drop policy should produce no DLQ entries"
4199 );
4200
4201 tx.send(DlqPolicy::SendToDlq).unwrap();
4203
4204 let mut bus2 = Bus::new();
4206 let outcome2 = axon.execute(2, &(), &mut bus2).await;
4207 assert!(matches!(outcome2, Outcome::Fault(_)));
4208 assert_eq!(
4209 dlq_sink.letters.lock().await.len(),
4210 1,
4211 "SendToDlq policy should produce 1 DLQ entry"
4212 );
4213 }
4214
4215 #[tokio::test]
4216 async fn dynamic_saga_policy_hot_reload() {
4217 use ranvier_core::policy::DynamicPolicy;
4218 use ranvier_core::saga::SagaPolicy;
4219
4220 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4222
4223 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4224 .then(AddOne)
4225 .with_dynamic_saga_policy(dynamic);
4226
4227 let mut bus = Bus::new();
4229 let _outcome = axon.execute(1, &(), &mut bus).await;
4230 assert!(
4231 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4232 "SagaStack should be absent or empty when disabled"
4233 );
4234
4235 tx.send(SagaPolicy::Enabled).unwrap();
4237
4238 let mut bus2 = Bus::new();
4240 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4241 assert!(
4242 bus2.read::<SagaStack>().is_some(),
4243 "SagaStack should exist when saga is enabled"
4244 );
4245 }
4246
4247 mod iam_tests {
4250 use super::*;
4251 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4252
4253 #[derive(Clone)]
4255 struct MockVerifier {
4256 identity: IamIdentity,
4257 should_fail: bool,
4258 }
4259
4260 #[async_trait]
4261 impl IamVerifier for MockVerifier {
4262 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4263 if self.should_fail {
4264 Err(IamError::InvalidToken("mock verification failure".into()))
4265 } else {
4266 Ok(self.identity.clone())
4267 }
4268 }
4269 }
4270
4271 #[tokio::test]
4272 async fn iam_require_identity_passes_with_valid_token() {
4273 let verifier = MockVerifier {
4274 identity: IamIdentity::new("alice").with_role("user"),
4275 should_fail: false,
4276 };
4277
4278 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4279 .with_iam(IamPolicy::RequireIdentity, verifier)
4280 .then(AddOne);
4281
4282 let mut bus = Bus::new();
4283 bus.insert(IamToken("valid-token".to_string()));
4284 let outcome = axon.execute(10, &(), &mut bus).await;
4285
4286 assert!(matches!(outcome, Outcome::Next(11)));
4287 let identity = bus
4289 .read::<IamIdentity>()
4290 .expect("IamIdentity should be in Bus");
4291 assert_eq!(identity.subject, "alice");
4292 }
4293
4294 #[tokio::test]
4295 async fn iam_require_identity_rejects_missing_token() {
4296 let verifier = MockVerifier {
4297 identity: IamIdentity::new("ignored"),
4298 should_fail: false,
4299 };
4300
4301 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4302 .with_iam(IamPolicy::RequireIdentity, verifier)
4303 .then(AddOne);
4304
4305 let mut bus = Bus::new();
4306 let outcome = axon.execute(10, &(), &mut bus).await;
4308
4309 match &outcome {
4311 Outcome::Emit(label, _) => {
4312 assert_eq!(label, "iam.missing_token");
4313 }
4314 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4315 }
4316 }
4317
4318 #[tokio::test]
4319 async fn iam_rejects_failed_verification() {
4320 let verifier = MockVerifier {
4321 identity: IamIdentity::new("ignored"),
4322 should_fail: true,
4323 };
4324
4325 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4326 .with_iam(IamPolicy::RequireIdentity, verifier)
4327 .then(AddOne);
4328
4329 let mut bus = Bus::new();
4330 bus.insert(IamToken("bad-token".to_string()));
4331 let outcome = axon.execute(10, &(), &mut bus).await;
4332
4333 match &outcome {
4334 Outcome::Emit(label, _) => {
4335 assert_eq!(label, "iam.verification_failed");
4336 }
4337 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4338 }
4339 }
4340
4341 #[tokio::test]
4342 async fn iam_require_role_passes_with_matching_role() {
4343 let verifier = MockVerifier {
4344 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4345 should_fail: false,
4346 };
4347
4348 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4349 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4350 .then(AddOne);
4351
4352 let mut bus = Bus::new();
4353 bus.insert(IamToken("token".to_string()));
4354 let outcome = axon.execute(5, &(), &mut bus).await;
4355
4356 assert!(matches!(outcome, Outcome::Next(6)));
4357 }
4358
4359 #[tokio::test]
4360 async fn iam_require_role_denies_without_role() {
4361 let verifier = MockVerifier {
4362 identity: IamIdentity::new("carol").with_role("user"),
4363 should_fail: false,
4364 };
4365
4366 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4367 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4368 .then(AddOne);
4369
4370 let mut bus = Bus::new();
4371 bus.insert(IamToken("token".to_string()));
4372 let outcome = axon.execute(5, &(), &mut bus).await;
4373
4374 match &outcome {
4375 Outcome::Emit(label, _) => {
4376 assert_eq!(label, "iam.policy_denied");
4377 }
4378 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4379 }
4380 }
4381
4382 #[tokio::test]
4383 async fn iam_policy_none_skips_verification() {
4384 let verifier = MockVerifier {
4385 identity: IamIdentity::new("ignored"),
4386 should_fail: true, };
4388
4389 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4390 .with_iam(IamPolicy::None, verifier)
4391 .then(AddOne);
4392
4393 let mut bus = Bus::new();
4394 let outcome = axon.execute(10, &(), &mut bus).await;
4396
4397 assert!(matches!(outcome, Outcome::Next(11)));
4398 }
4399 }
4400
4401 #[derive(Clone)]
4404 struct SchemaTransition;
4405
4406 #[async_trait]
4407 impl Transition<String, String> for SchemaTransition {
4408 type Error = String;
4409 type Resources = ();
4410
4411 fn input_schema(&self) -> Option<serde_json::Value> {
4412 Some(serde_json::json!({
4413 "type": "object",
4414 "required": ["name"],
4415 "properties": {
4416 "name": { "type": "string" }
4417 }
4418 }))
4419 }
4420
4421 async fn run(
4422 &self,
4423 state: String,
4424 _resources: &Self::Resources,
4425 _bus: &mut Bus,
4426 ) -> Outcome<String, Self::Error> {
4427 Outcome::Next(state)
4428 }
4429 }
4430
4431 #[test]
4432 fn then_auto_populates_input_schema_from_transition() {
4433 let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4434
4435 let last_node = axon.schematic.nodes.last().unwrap();
4437 assert!(last_node.input_schema.is_some());
4438 let schema = last_node.input_schema.as_ref().unwrap();
4439 assert_eq!(schema["type"], "object");
4440 assert_eq!(schema["required"][0], "name");
4441 }
4442
4443 #[test]
4444 fn then_leaves_input_schema_none_when_not_provided() {
4445 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4446
4447 let last_node = axon.schematic.nodes.last().unwrap();
4448 assert!(last_node.input_schema.is_none());
4449 }
4450
4451 #[test]
4452 fn with_input_schema_value_sets_on_last_node() {
4453 let schema = serde_json::json!({"type": "integer"});
4454 let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4455 .then(AddOne)
4456 .with_input_schema_value(schema.clone());
4457
4458 let last_node = axon.schematic.nodes.last().unwrap();
4459 assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4460 }
4461
4462 #[test]
4463 fn with_output_schema_value_sets_on_last_node() {
4464 let schema = serde_json::json!({"type": "integer"});
4465 let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4466 .then(AddOne)
4467 .with_output_schema_value(schema.clone());
4468
4469 let last_node = axon.schematic.nodes.last().unwrap();
4470 assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4471 }
4472
4473 #[test]
4474 fn schematic_export_includes_schema_fields() {
4475 let axon = Axon::<String, String, String>::new("ExportTest")
4476 .then(SchemaTransition)
4477 .with_output_schema_value(serde_json::json!({"type": "string"}));
4478
4479 let json = serde_json::to_value(&axon.schematic).unwrap();
4480 let nodes = json["nodes"].as_array().unwrap();
4481 let last = nodes.last().unwrap();
4483 assert!(last.get("input_schema").is_some());
4484 assert_eq!(last["input_schema"]["type"], "object");
4485 assert_eq!(last["output_schema"]["type"], "string");
4486 }
4487
4488 #[test]
4489 fn schematic_export_omits_schema_fields_when_none() {
4490 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4491
4492 let json = serde_json::to_value(&axon.schematic).unwrap();
4493 let nodes = json["nodes"].as_array().unwrap();
4494 let last = nodes.last().unwrap();
4495 let obj = last.as_object().unwrap();
4496 assert!(!obj.contains_key("input_schema"));
4497 assert!(!obj.contains_key("output_schema"));
4498 }
4499
4500 #[test]
4501 fn schematic_json_roundtrip_preserves_schemas() {
4502 let axon = Axon::<String, String, String>::new("Roundtrip")
4503 .then(SchemaTransition)
4504 .with_output_schema_value(serde_json::json!({"type": "string"}));
4505
4506 let json_str = serde_json::to_string(&axon.schematic).unwrap();
4507 let deserialized: ranvier_core::schematic::Schematic =
4508 serde_json::from_str(&json_str).unwrap();
4509
4510 let last = deserialized.nodes.last().unwrap();
4511 assert!(last.input_schema.is_some());
4512 assert!(last.output_schema.is_some());
4513 assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4514 assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4515 }
4516
4517 #[derive(Clone)]
4519 struct MultiplyByTwo;
4520
4521 #[async_trait]
4522 impl Transition<i32, i32> for MultiplyByTwo {
4523 type Error = TestInfallible;
4524 type Resources = ();
4525
4526 async fn run(
4527 &self,
4528 state: i32,
4529 _resources: &Self::Resources,
4530 _bus: &mut Bus,
4531 ) -> Outcome<i32, Self::Error> {
4532 Outcome::Next(state * 2)
4533 }
4534 }
4535
4536 #[derive(Clone)]
4537 struct AddTen;
4538
4539 #[async_trait]
4540 impl Transition<i32, i32> for AddTen {
4541 type Error = TestInfallible;
4542 type Resources = ();
4543
4544 async fn run(
4545 &self,
4546 state: i32,
4547 _resources: &Self::Resources,
4548 _bus: &mut Bus,
4549 ) -> Outcome<i32, Self::Error> {
4550 Outcome::Next(state + 10)
4551 }
4552 }
4553
4554 #[derive(Clone)]
4555 struct AddOneString;
4556
4557 #[async_trait]
4558 impl Transition<i32, i32> for AddOneString {
4559 type Error = String;
4560 type Resources = ();
4561
4562 async fn run(
4563 &self,
4564 state: i32,
4565 _resources: &Self::Resources,
4566 _bus: &mut Bus,
4567 ) -> Outcome<i32, Self::Error> {
4568 Outcome::Next(state + 1)
4569 }
4570 }
4571
4572 #[derive(Clone)]
4573 struct AddTenString;
4574
4575 #[async_trait]
4576 impl Transition<i32, i32> for AddTenString {
4577 type Error = String;
4578 type Resources = ();
4579
4580 async fn run(
4581 &self,
4582 state: i32,
4583 _resources: &Self::Resources,
4584 _bus: &mut Bus,
4585 ) -> Outcome<i32, Self::Error> {
4586 Outcome::Next(state + 10)
4587 }
4588 }
4589
4590 #[tokio::test]
4591 async fn axon_single_step_chain_executes_and_returns_next() {
4592 let mut bus = Bus::new();
4593 let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4594
4595 let outcome = axon.execute(5, &(), &mut bus).await;
4596 assert!(matches!(outcome, Outcome::Next(6)));
4597 }
4598
4599 #[tokio::test]
4600 async fn axon_three_step_chain_executes_in_order() {
4601 let mut bus = Bus::new();
4602 let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4603 .then(AddOne)
4604 .then(MultiplyByTwo)
4605 .then(AddTen);
4606
4607 let outcome = axon.execute(5, &(), &mut bus).await;
4609 assert!(matches!(outcome, Outcome::Next(22)));
4610 }
4611
4612 #[tokio::test]
4613 async fn axon_with_fault_in_middle_step_propagates_error() {
4614 let mut bus = Bus::new();
4615
4616 let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4621 .then(AddOneString)
4622 .then(AlwaysFault)
4623 .then(AddTenString);
4624
4625 let outcome = axon.execute(5, &(), &mut bus).await;
4626 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4627 }
4628
/// Builds a three-step chain and checks the schematic's node count and name.
///
/// NOTE(review): the expected 4 nodes are presumably the start node plus one
/// node per `.then(..)` step; confirm against `Axon::start` / `Axon::then`.
#[test]
fn axon_schematic_has_correct_node_count_after_chaining() {
    let pipeline = Axon::<i32, i32, TestInfallible>::start("NodeCount")
        .then(AddOne)
        .then(MultiplyByTwo)
        .then(AddTen);

    let schematic = &pipeline.schematic;
    assert_eq!(schematic.nodes.len(), 4);
    assert_eq!(schematic.name, "NodeCount");
}
4640
4641 #[tokio::test]
4642 async fn axon_execution_records_timeline_events() {
4643 let mut bus = Bus::new();
4644 bus.insert(Timeline::new());
4645
4646 let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4647 .then(AddOne)
4648 .then(MultiplyByTwo);
4649
4650 let outcome = axon.execute(3, &(), &mut bus).await;
4651 assert!(matches!(outcome, Outcome::Next(8))); let timeline = bus.read::<Timeline>().unwrap();
4654
4655 let enter_count = timeline
4657 .events
4658 .iter()
4659 .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4660 .count();
4661 let exit_count = timeline
4662 .events
4663 .iter()
4664 .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4665 .count();
4666
4667 assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4669 assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4670 }
4671
4672 #[tokio::test]
4675 async fn parallel_all_succeed_returns_first_next() {
4676 use super::ParallelStrategy;
4677
4678 let mut bus = Bus::new();
4679 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4680 .parallel(
4681 vec![
4682 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4683 Arc::new(MultiplyByTwo),
4684 ],
4685 ParallelStrategy::AllMustSucceed,
4686 );
4687
4688 let outcome = axon.execute(5, &(), &mut bus).await;
4691 assert!(matches!(outcome, Outcome::Next(6)));
4692 }
4693
4694 #[tokio::test]
4695 async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4696 use super::ParallelStrategy;
4697
4698 let mut bus = Bus::new();
4699 let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4700 .parallel(
4701 vec![
4702 Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4703 Arc::new(AlwaysFault),
4704 ],
4705 ParallelStrategy::AllMustSucceed,
4706 );
4707
4708 let outcome = axon.execute(5, &(), &mut bus).await;
4709 assert!(
4710 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4711 "Expected Fault(boom), got {:?}",
4712 outcome
4713 );
4714 }
4715
4716 #[tokio::test]
4717 async fn parallel_any_can_fail_returns_success_despite_fault() {
4718 use super::ParallelStrategy;
4719
4720 let mut bus = Bus::new();
4721 let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4722 .parallel(
4723 vec![
4724 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4725 Arc::new(AddOneString),
4726 ],
4727 ParallelStrategy::AnyCanFail,
4728 );
4729
4730 let outcome = axon.execute(5, &(), &mut bus).await;
4732 assert!(
4733 matches!(outcome, Outcome::Next(6)),
4734 "Expected Next(6), got {:?}",
4735 outcome
4736 );
4737 }
4738
4739 #[tokio::test]
4740 async fn parallel_any_can_fail_all_fault_returns_first_fault() {
4741 use super::ParallelStrategy;
4742
4743 #[derive(Clone)]
4744 struct AlwaysFault2;
4745 #[async_trait]
4746 impl Transition<i32, i32> for AlwaysFault2 {
4747 type Error = String;
4748 type Resources = ();
4749 async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
4750 Outcome::Fault("boom2".to_string())
4751 }
4752 }
4753
4754 let mut bus = Bus::new();
4755 let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
4756 .parallel(
4757 vec![
4758 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4759 Arc::new(AlwaysFault2),
4760 ],
4761 ParallelStrategy::AnyCanFail,
4762 );
4763
4764 let outcome = axon.execute(5, &(), &mut bus).await;
4765 assert!(
4767 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4768 "Expected Fault(boom), got {:?}",
4769 outcome
4770 );
4771 }
4772
/// Checks the schematic produced by a three-branch `.parallel(..)` step:
/// six nodes in total, with a `FanOut` at index 1, `Atom` nodes at indices
/// 2..=4 and a `FanIn` at index 5, a FanOut description mentioning
/// "3 branches", and six `Parallel` edges.
///
/// NOTE(review): the six parallel edges are presumably three fan-out plus
/// three fan-in edges; confirm against the `parallel` builder.
#[test]
fn parallel_schematic_has_fanout_fanin_nodes() {
    use ranvier_core::schematic::{EdgeType, NodeKind};

    type Branch =
        Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>;

    let branches: Vec<Branch> =
        vec![Arc::new(AddOne), Arc::new(MultiplyByTwo), Arc::new(AddTen)];
    let pipeline = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
        .parallel(branches, super::ParallelStrategy::AllMustSucceed);

    let schematic = &pipeline.schematic;
    let nodes = &schematic.nodes;

    assert_eq!(nodes.len(), 6);
    assert!(matches!(nodes[1].kind, NodeKind::FanOut));
    for branch_idx in 2..=4 {
        assert!(matches!(nodes[branch_idx].kind, NodeKind::Atom));
    }
    assert!(matches!(nodes[5].kind, NodeKind::FanIn));

    let fanout_description = nodes[1].description.as_ref().unwrap();
    assert!(fanout_description.contains("3 branches"));

    let parallel_edge_count = schematic
        .edges
        .iter()
        .filter(|edge| matches!(edge.kind, EdgeType::Parallel))
        .count();
    assert_eq!(parallel_edge_count, 6);
}
4813
4814 #[tokio::test]
4815 async fn parallel_then_chain_composes_correctly() {
4816 use super::ParallelStrategy;
4817
4818 let mut bus = Bus::new();
4819 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
4820 .then(AddOne)
4821 .parallel(
4822 vec![
4823 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4824 Arc::new(MultiplyByTwo),
4825 ],
4826 ParallelStrategy::AllMustSucceed,
4827 )
4828 .then(AddTen);
4829
4830 let outcome = axon.execute(5, &(), &mut bus).await;
4832 assert!(
4833 matches!(outcome, Outcome::Next(17)),
4834 "Expected Next(17), got {:?}",
4835 outcome
4836 );
4837 }
4838
4839 #[tokio::test]
4840 async fn parallel_records_timeline_events() {
4841 use super::ParallelStrategy;
4842 use ranvier_core::timeline::TimelineEvent;
4843
4844 let mut bus = Bus::new();
4845 bus.insert(Timeline::new());
4846
4847 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
4848 .parallel(
4849 vec![
4850 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4851 Arc::new(MultiplyByTwo),
4852 ],
4853 ParallelStrategy::AllMustSucceed,
4854 );
4855
4856 let outcome = axon.execute(3, &(), &mut bus).await;
4857 assert!(matches!(outcome, Outcome::Next(4)));
4858
4859 let timeline = bus.read::<Timeline>().unwrap();
4860
4861 let fanout_enters = timeline
4863 .events
4864 .iter()
4865 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
4866 .count();
4867 let fanin_enters = timeline
4868 .events
4869 .iter()
4870 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
4871 .count();
4872
4873 assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
4874 assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
4875 }
4876
4877 #[derive(Clone)]
4880 struct Greet;
4881
4882 #[async_trait]
4883 impl Transition<(), String> for Greet {
4884 type Error = String;
4885 type Resources = ();
4886
4887 async fn run(
4888 &self,
4889 _state: (),
4890 _resources: &Self::Resources,
4891 _bus: &mut Bus,
4892 ) -> Outcome<String, Self::Error> {
4893 Outcome::Next("Hello from simple!".to_string())
4894 }
4895 }
4896
4897 #[tokio::test]
4898 async fn axon_simple_creates_pipeline() {
4899 let axon = Axon::simple::<String>("SimpleTest").then(Greet);
4900
4901 let mut bus = Bus::new();
4902 let result = axon.execute((), &(), &mut bus).await;
4903
4904 match result {
4905 Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
4906 other => panic!("Expected Outcome::Next, got {:?}", other),
4907 }
4908 }
4909
4910 #[tokio::test]
4911 async fn axon_simple_equivalent_to_explicit() {
4912 let simple = Axon::simple::<String>("Equiv").then(Greet);
4914 let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
4915
4916 let mut bus1 = Bus::new();
4917 let mut bus2 = Bus::new();
4918
4919 let r1 = simple.execute((), &(), &mut bus1).await;
4920 let r2 = explicit.execute((), &(), &mut bus2).await;
4921
4922 match (r1, r2) {
4923 (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
4924 _ => panic!("Both should produce Outcome::Next"),
4925 }
4926 }
4927}