1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
/// How an [`Axon`] run is scheduled.
#[derive(Clone)]
pub enum ExecutionMode {
    /// Run in the current process without any distributed coordination.
    Local,
    /// Run only when the distributed lock `lock_key` can be acquired.
    ///
    /// `execute` calls `lock_provider.try_acquire(lock_key, ttl_ms)` first and skips the run
    /// (emitting `execution.skipped.lock_held` or `execution.skipped.lock_error`) when the lock
    /// is already held or cannot be checked.
    Singleton {
        /// Key of the distributed lock guarding the run.
        lock_key: String,
        /// Lock time-to-live passed to `try_acquire` (milliseconds, per the name — confirm
        /// against the `DistributedLock` implementation).
        ttl_ms: u64,
        /// Lock backend used to acquire `lock_key`.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
57
/// A pinned, heap-allocated, `Send` future that may borrow data for `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
60
/// Type-erased step function of an [`Axon`].
///
/// Given the step input, the shared resources and the mutable [`Bus`], it returns a boxed
/// future resolving to an [`Outcome`]. It is wrapped in an `Arc` so that each chained step
/// can clone the previous executor into its own closure.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
66
/// Bus marker requesting that execution jump directly to a node.
///
/// `execute` inserts it when a persisted intervention targets a node of the schematic. Step
/// executors compare `target_node` against their node id or label. The matching step runs with
/// `payload_override` deserialized as its input, and all previous steps are skipped.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Node id or node label to jump to.
    pub target_node: String,
    /// Input for the target step; a jump without a payload is rejected by the step executors.
    pub payload_override: Option<serde_json::Value>,
}
73
/// Bus marker holding the schematic node index at which a resumed run should restart.
///
/// Step executors compare it with their own index into `schematic.nodes`. NOTE(review): it is
/// presumably inserted by `execute` during resumption, but that part is not visible here.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
77
/// Bus marker carrying the checkpoint payload used when resuming a run.
///
/// The step at [`StartStep`] tries to deserialize `payload` as its input type.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// Serialized step input recorded at the checkpoint, if any.
    payload: Option<serde_json::Value>,
}
83
/// Returns a short, human-readable name for `T` with module paths removed.
///
/// [`std::any::type_name`] yields fully-qualified paths such as
/// `alloc::vec::Vec<alloc::string::String>`. Every path inside the name is reduced to its last
/// segment, including generic arguments, tuple members, references and trait objects. The
/// example above therefore becomes `Vec<String>`.
///
/// The format of `type_name` is not guaranteed by the standard library, so the result is only
/// meant for display (schematic node labels).
fn type_name_of<T: ?Sized>() -> String {
    /// Keeps only the final `::`-separated segment of a single path token.
    fn last_segment(path: &str) -> &str {
        path.rsplit("::").next().unwrap_or(path)
    }

    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    let mut token_start = 0;
    for (idx, ch) in full.char_indices() {
        // Characters that separate path tokens inside a rendered type name.
        if matches!(
            ch,
            '<' | '>' | ',' | '(' | ')' | '[' | ']' | ';' | '&' | '*' | ' '
        ) {
            short.push_str(last_segment(&full[token_start..idx]));
            short.push(ch);
            token_start = idx + ch.len_utf8();
        }
    }
    short.push_str(last_segment(&full[token_start..]));
    short
}
89
/// A typed, composable pipeline from `In` to `Out`.
///
/// `E` is the transition error type and `Res` the resources shared by every step. An `Axon`
/// pairs a static [`Schematic`] (nodes and edges describing the pipeline) with the composed
/// [`Executor`] that actually runs it, plus optional runtime integrations.
pub struct Axon<In, Out, E, Res = ()> {
    /// Static description of the pipeline's nodes and edges.
    pub schematic: Schematic,
    /// Composed step function; each `then*` call wraps the previous executor.
    executor: Executor<In, Out, E, Res>,
    /// Local or singleton (distributed-lock guarded) execution.
    pub execution_mode: ExecutionMode,
    /// Store used by the inspector to load traces and save interventions.
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Optional sink for audit events (e.g. inspector force-resume, applied interventions).
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Optional dead-letter sink; inserted into the bus at execution time.
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Static DLQ policy, used when `dynamic_dlq_policy` is `None`.
    pub dlq_policy: DlqPolicy,
    /// Dynamic DLQ policy whose current value overrides `dlq_policy` when set.
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Static saga policy, used when `dynamic_saga_policy` is `None`.
    pub saga_policy: SagaPolicy,
    /// Dynamic saga policy whose current value overrides `saga_policy` when set.
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Compensation handlers keyed by step node id (filled by `then_compensated`).
    /// Shared between clones of the same `Axon`.
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// IAM policy and verifier enforced at the start of `execute`, if set.
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
136
/// Request to export an [`Axon`]'s schematic.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Destination path for the export. NOTE(review): `None` presumably selects a default
    /// destination such as stdout — confirm against the exporter.
    pub output: Option<PathBuf>,
}
143
144impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
145 fn clone(&self) -> Self {
146 Self {
147 schematic: self.schematic.clone(),
148 executor: self.executor.clone(),
149 execution_mode: self.execution_mode.clone(),
150 persistence_store: self.persistence_store.clone(),
151 audit_sink: self.audit_sink.clone(),
152 dlq_sink: self.dlq_sink.clone(),
153 dlq_policy: self.dlq_policy.clone(),
154 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
155 saga_policy: self.saga_policy.clone(),
156 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
157 saga_compensation_registry: self.saga_compensation_registry.clone(),
158 iam_handle: self.iam_handle.clone(),
159 }
160 }
161}
162
163impl<In, E, Res> Axon<In, In, E, Res>
164where
165 In: Send + Sync + Serialize + DeserializeOwned + 'static,
166 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
167 Res: ranvier_core::transition::ResourceRequirement,
168{
169 #[track_caller]
172 pub fn new(label: &str) -> Self {
173 let caller = Location::caller();
174 Self::start_with_source(label, caller)
175 }
176
177 #[track_caller]
180 pub fn start(label: &str) -> Self {
181 let caller = Location::caller();
182 Self::start_with_source(label, caller)
183 }
184
185 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
186 let node_id = uuid::Uuid::new_v4().to_string();
187 let node = Node {
188 id: node_id,
189 kind: NodeKind::Ingress,
190 label: label.to_string(),
191 description: None,
192 input_type: "void".to_string(),
193 output_type: type_name_of::<In>(),
194 resource_type: type_name_of::<Res>(),
195 metadata: Default::default(),
196 bus_capability: None,
197 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
198 position: None,
199 compensation_node_id: None,
200 input_schema: None,
201 output_schema: None,
202 };
203
204 let mut schematic = Schematic::new(label);
205 schematic.nodes.push(node);
206
207 let executor: Executor<In, In, E, Res> =
208 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
209
210 Self {
211 schematic,
212 executor,
213 execution_mode: ExecutionMode::Local,
214 persistence_store: None,
215 audit_sink: None,
216 dlq_sink: None,
217 dlq_policy: DlqPolicy::default(),
218 dynamic_dlq_policy: None,
219 saga_policy: SagaPolicy::default(),
220 dynamic_saga_policy: None,
221 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
222 ranvier_core::saga::SagaCompensationRegistry::new(),
223 )),
224 iam_handle: None,
225 }
226 }
227}
228
229impl<In, Out, E, Res> Axon<In, Out, E, Res>
230where
231 In: Send + Sync + Serialize + DeserializeOwned + 'static,
232 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
233 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
234 Res: ranvier_core::transition::ResourceRequirement,
235{
236 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
238 self.execution_mode = mode;
239 self
240 }
241
242 pub fn with_version(mut self, version: impl Into<String>) -> Self {
244 self.schematic.schema_version = version.into();
245 self
246 }
247
248 pub fn with_persistence_store<S>(mut self, store: S) -> Self
250 where
251 S: crate::persistence::PersistenceStore + 'static,
252 {
253 self.persistence_store = Some(Arc::new(store));
254 self
255 }
256
257 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
259 where
260 S: AuditSink + 'static,
261 {
262 self.audit_sink = Some(Arc::new(sink));
263 self
264 }
265
266 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
268 where
269 S: DlqSink + 'static,
270 {
271 self.dlq_sink = Some(Arc::new(sink));
272 self
273 }
274
275 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
277 self.dlq_policy = policy;
278 self
279 }
280
281 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
283 self.saga_policy = policy;
284 self
285 }
286
287 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
290 self.dynamic_dlq_policy = Some(dynamic);
291 self
292 }
293
294 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
297 self.dynamic_saga_policy = Some(dynamic);
298 self
299 }
300
301 pub fn with_iam(
309 mut self,
310 policy: ranvier_core::iam::IamPolicy,
311 verifier: impl ranvier_core::iam::IamVerifier + 'static,
312 ) -> Self {
313 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
314 policy,
315 Arc::new(verifier),
316 ));
317 self
318 }
319
320 #[cfg(feature = "schema")]
331 pub fn with_input_schema<T>(mut self) -> Self
332 where
333 T: schemars::JsonSchema,
334 {
335 if let Some(last_node) = self.schematic.nodes.last_mut() {
336 let schema = schemars::schema_for!(T);
337 last_node.input_schema =
338 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
339 }
340 self
341 }
342
343 #[cfg(feature = "schema")]
347 pub fn with_output_schema<T>(mut self) -> Self
348 where
349 T: schemars::JsonSchema,
350 {
351 if let Some(last_node) = self.schematic.nodes.last_mut() {
352 let schema = schemars::schema_for!(T);
353 last_node.output_schema =
354 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
355 }
356 self
357 }
358
359 pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
363 if let Some(last_node) = self.schematic.nodes.last_mut() {
364 last_node.input_schema = Some(schema);
365 }
366 self
367 }
368
369 pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
371 if let Some(last_node) = self.schematic.nodes.last_mut() {
372 last_node.output_schema = Some(schema);
373 }
374 self
375 }
376}
377
378#[async_trait]
379impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
380where
381 In: Send + Sync + Serialize + DeserializeOwned + 'static,
382 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
383 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
384 Res: ranvier_core::transition::ResourceRequirement,
385{
386 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
387 let store = self.persistence_store.as_ref()?;
388 let trace = store.load(trace_id).await.ok().flatten()?;
389 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
390 }
391
392 async fn force_resume(
393 &self,
394 trace_id: &str,
395 target_node: &str,
396 payload_override: Option<Value>,
397 ) -> Result<(), String> {
398 let store = self
399 .persistence_store
400 .as_ref()
401 .ok_or("No persistence store attached to Axon")?;
402
403 let intervention = crate::persistence::Intervention {
404 target_node: target_node.to_string(),
405 payload_override,
406 timestamp_ms: now_ms(),
407 };
408
409 store
410 .save_intervention(trace_id, intervention)
411 .await
412 .map_err(|e| format!("Failed to save intervention: {}", e))?;
413
414 if let Some(sink) = self.audit_sink.as_ref() {
415 let event = AuditEvent::new(
416 uuid::Uuid::new_v4().to_string(),
417 "Inspector".to_string(),
418 "ForceResume".to_string(),
419 trace_id.to_string(),
420 )
421 .with_metadata("target_node", target_node);
422
423 let _ = sink.append(&event).await;
424 }
425
426 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
427 Ok(())
428 }
429}
430
431impl<In, Out, E, Res> Axon<In, Out, E, Res>
432where
433 In: Send + Sync + Serialize + DeserializeOwned + 'static,
434 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
435 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
436 Res: ranvier_core::transition::ResourceRequirement,
437{
438 #[track_caller]
442 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
443 where
444 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
445 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
446 {
447 let caller = Location::caller();
448 let Axon {
450 mut schematic,
451 executor: prev_executor,
452 execution_mode,
453 persistence_store,
454 audit_sink,
455 dlq_sink,
456 dlq_policy,
457 dynamic_dlq_policy,
458 saga_policy,
459 dynamic_saga_policy,
460 saga_compensation_registry,
461 iam_handle,
462 } = self;
463
464 let next_node_id = uuid::Uuid::new_v4().to_string();
466 let next_node = Node {
467 id: next_node_id.clone(),
468 kind: NodeKind::Atom,
469 label: transition.label(),
470 description: transition.description(),
471 input_type: type_name_of::<Out>(),
472 output_type: type_name_of::<Next>(),
473 resource_type: type_name_of::<Res>(),
474 metadata: Default::default(),
475 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
476 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
477 position: transition
478 .position()
479 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
480 compensation_node_id: None,
481 input_schema: transition.input_schema(),
482 output_schema: None,
483 };
484
485 let last_node_id = schematic
486 .nodes
487 .last()
488 .map(|n| n.id.clone())
489 .unwrap_or_default();
490
491 schematic.nodes.push(next_node);
492 schematic.edges.push(Edge {
493 from: last_node_id,
494 to: next_node_id.clone(),
495 kind: EdgeType::Linear,
496 label: Some("Next".to_string()),
497 });
498
499 let node_id_for_exec = next_node_id.clone();
501 let node_label_for_exec = transition.label();
502 let bus_policy_for_exec = transition.bus_access_policy();
503 let bus_policy_clone = bus_policy_for_exec.clone();
504 let current_step_idx = schematic.nodes.len() as u64 - 1;
505 let next_executor: Executor<In, Next, E, Res> = Arc::new(
506 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
507 let prev = prev_executor.clone();
508 let trans = transition.clone();
509 let timeline_node_id = node_id_for_exec.clone();
510 let timeline_node_label = node_label_for_exec.clone();
511 let transition_bus_policy = bus_policy_clone.clone();
512 let step_idx = current_step_idx;
513
514 Box::pin(async move {
515 if let Some(jump) = bus.read::<ManualJump>()
517 && (jump.target_node == timeline_node_id
518 || jump.target_node == timeline_node_label)
519 {
520 tracing::info!(
521 node_id = %timeline_node_id,
522 node_label = %timeline_node_label,
523 "Manual jump target reached; skipping previous steps"
524 );
525
526 let state = if let Some(ow) = jump.payload_override.clone() {
527 match serde_json::from_value::<Out>(ow) {
528 Ok(s) => s,
529 Err(e) => {
530 tracing::error!(
531 "Payload override deserialization failed: {}",
532 e
533 );
534 return Outcome::emit(
535 "execution.jump.payload_error",
536 Some(serde_json::json!({"error": e.to_string()})),
537 )
538 .map(|_: ()| unreachable!());
539 }
540 }
541 } else {
542 return Outcome::emit(
546 "execution.jump.missing_payload",
547 Some(serde_json::json!({"node_id": timeline_node_id})),
548 );
549 };
550
551 return run_this_step::<Out, Next, E, Res>(
553 &trans,
554 state,
555 res,
556 bus,
557 &timeline_node_id,
558 &timeline_node_label,
559 &transition_bus_policy,
560 step_idx,
561 )
562 .await;
563 }
564
565 if let Some(start) = bus.read::<StartStep>()
567 && step_idx == start.0
568 && bus.read::<ResumptionState>().is_some()
569 {
570 let fresh_state = serde_json::to_value(&input)
574 .ok()
575 .and_then(|v| serde_json::from_value::<Out>(v).ok());
576 let persisted_state = bus
577 .read::<ResumptionState>()
578 .and_then(|r| r.payload.clone())
579 .and_then(|p| serde_json::from_value::<Out>(p).ok());
580
581 if let Some(s) = fresh_state.or(persisted_state) {
582 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
583 return run_this_step::<Out, Next, E, Res>(
584 &trans,
585 s,
586 res,
587 bus,
588 &timeline_node_id,
589 &timeline_node_label,
590 &transition_bus_policy,
591 step_idx,
592 )
593 .await;
594 }
595
596 return Outcome::emit(
597 "execution.resumption.payload_error",
598 Some(serde_json::json!({"error": "no compatible resumption state"})),
599 )
600 .map(|_: ()| unreachable!());
601 }
602
603 let prev_result = prev(input, res, bus).await;
605
606 let state = match prev_result {
608 Outcome::Next(t) => t,
609 other => return other.map(|_| unreachable!()),
610 };
611
612 run_this_step::<Out, Next, E, Res>(
613 &trans,
614 state,
615 res,
616 bus,
617 &timeline_node_id,
618 &timeline_node_label,
619 &transition_bus_policy,
620 step_idx,
621 )
622 .await
623 })
624 },
625 );
626 Axon {
627 schematic,
628 executor: next_executor,
629 execution_mode,
630 persistence_store,
631 audit_sink,
632 dlq_sink,
633 dlq_policy,
634 dynamic_dlq_policy,
635 saga_policy,
636 dynamic_saga_policy,
637 saga_compensation_registry,
638 iam_handle,
639 }
640 }
641
642 #[track_caller]
658 pub fn then_with_retry<Next, Trans>(
659 self,
660 transition: Trans,
661 policy: crate::retry::RetryPolicy,
662 ) -> Axon<In, Next, E, Res>
663 where
664 Out: Clone,
665 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
666 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
667 {
668 let caller = Location::caller();
669 let Axon {
670 mut schematic,
671 executor: prev_executor,
672 execution_mode,
673 persistence_store,
674 audit_sink,
675 dlq_sink,
676 dlq_policy,
677 dynamic_dlq_policy,
678 saga_policy,
679 dynamic_saga_policy,
680 saga_compensation_registry,
681 iam_handle,
682 } = self;
683
684 let next_node_id = uuid::Uuid::new_v4().to_string();
685 let next_node = Node {
686 id: next_node_id.clone(),
687 kind: NodeKind::Atom,
688 label: transition.label(),
689 description: transition.description(),
690 input_type: type_name_of::<Out>(),
691 output_type: type_name_of::<Next>(),
692 resource_type: type_name_of::<Res>(),
693 metadata: Default::default(),
694 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
695 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
696 position: transition
697 .position()
698 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
699 compensation_node_id: None,
700 input_schema: transition.input_schema(),
701 output_schema: None,
702 };
703
704 let last_node_id = schematic
705 .nodes
706 .last()
707 .map(|n| n.id.clone())
708 .unwrap_or_default();
709
710 schematic.nodes.push(next_node);
711 schematic.edges.push(Edge {
712 from: last_node_id,
713 to: next_node_id.clone(),
714 kind: EdgeType::Linear,
715 label: Some("Next (retryable)".to_string()),
716 });
717
718 let node_id_for_exec = next_node_id.clone();
719 let node_label_for_exec = transition.label();
720 let bus_policy_for_exec = transition.bus_access_policy();
721 let bus_policy_clone = bus_policy_for_exec.clone();
722 let current_step_idx = schematic.nodes.len() as u64 - 1;
723 let next_executor: Executor<In, Next, E, Res> = Arc::new(
724 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
725 let prev = prev_executor.clone();
726 let trans = transition.clone();
727 let timeline_node_id = node_id_for_exec.clone();
728 let timeline_node_label = node_label_for_exec.clone();
729 let transition_bus_policy = bus_policy_clone.clone();
730 let step_idx = current_step_idx;
731 let retry_policy = policy.clone();
732
733 Box::pin(async move {
734 let prev_result = prev(input, res, bus).await;
736 let state = match prev_result {
737 Outcome::Next(t) => t,
738 other => return other.map(|_| unreachable!()),
739 };
740
741 let mut last_result = None;
743 for attempt in 0..=retry_policy.max_retries {
744 let attempt_state = state.clone();
745
746 let result = run_this_step::<Out, Next, E, Res>(
747 &trans,
748 attempt_state,
749 res,
750 bus,
751 &timeline_node_id,
752 &timeline_node_label,
753 &transition_bus_policy,
754 step_idx,
755 )
756 .await;
757
758 match &result {
759 Outcome::Next(_) => return result,
760 Outcome::Fault(_) if attempt < retry_policy.max_retries => {
761 let delay = retry_policy.delay_for_attempt(attempt);
762 tracing::warn!(
763 node_id = %timeline_node_id,
764 attempt = attempt + 1,
765 max = retry_policy.max_retries,
766 delay_ms = delay.as_millis() as u64,
767 "Transition failed, retrying"
768 );
769 if let Some(timeline) = bus.read_mut::<Timeline>() {
770 timeline.push(TimelineEvent::NodeRetry {
771 node_id: timeline_node_id.clone(),
772 attempt: attempt + 1,
773 max_attempts: retry_policy.max_retries,
774 backoff_ms: delay.as_millis() as u64,
775 timestamp: now_ms(),
776 });
777 }
778 tokio::time::sleep(delay).await;
779 }
780 _ => {
781 last_result = Some(result);
782 break;
783 }
784 }
785 }
786
787 last_result.unwrap_or_else(|| {
788 Outcome::emit("execution.retry.exhausted", None)
789 })
790 })
791 },
792 );
793 Axon {
794 schematic,
795 executor: next_executor,
796 execution_mode,
797 persistence_store,
798 audit_sink,
799 dlq_sink,
800 dlq_policy,
801 dynamic_dlq_policy,
802 saga_policy,
803 dynamic_saga_policy,
804 saga_compensation_registry,
805 iam_handle,
806 }
807 }
808
809 #[track_caller]
814 pub fn then_compensated<Next, Trans, Comp>(
815 self,
816 transition: Trans,
817 compensation: Comp,
818 ) -> Axon<In, Next, E, Res>
819 where
820 Out: Clone,
821 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
822 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
823 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
824 {
825 let caller = Location::caller();
826 let Axon {
827 mut schematic,
828 executor: prev_executor,
829 execution_mode,
830 persistence_store,
831 audit_sink,
832 dlq_sink,
833 dlq_policy,
834 dynamic_dlq_policy,
835 saga_policy,
836 dynamic_saga_policy,
837 saga_compensation_registry,
838 iam_handle,
839 } = self;
840
841 let next_node_id = uuid::Uuid::new_v4().to_string();
843 let comp_node_id = uuid::Uuid::new_v4().to_string();
844
845 let next_node = Node {
846 id: next_node_id.clone(),
847 kind: NodeKind::Atom,
848 label: transition.label(),
849 description: transition.description(),
850 input_type: type_name_of::<Out>(),
851 output_type: type_name_of::<Next>(),
852 resource_type: type_name_of::<Res>(),
853 metadata: Default::default(),
854 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
855 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
856 position: transition
857 .position()
858 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
859 compensation_node_id: Some(comp_node_id.clone()),
860 input_schema: None,
861 output_schema: None,
862 };
863
864 let comp_node = Node {
866 id: comp_node_id.clone(),
867 kind: NodeKind::Atom,
868 label: format!("Compensate: {}", compensation.label()),
869 description: compensation.description(),
870 input_type: type_name_of::<Out>(),
871 output_type: "void".to_string(),
872 resource_type: type_name_of::<Res>(),
873 metadata: Default::default(),
874 bus_capability: None,
875 source_location: None,
876 position: compensation
877 .position()
878 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
879 compensation_node_id: None,
880 input_schema: None,
881 output_schema: None,
882 };
883
884 let last_node_id = schematic
885 .nodes
886 .last()
887 .map(|n| n.id.clone())
888 .unwrap_or_default();
889
890 schematic.nodes.push(next_node);
891 schematic.nodes.push(comp_node);
892 schematic.edges.push(Edge {
893 from: last_node_id,
894 to: next_node_id.clone(),
895 kind: EdgeType::Linear,
896 label: Some("Next".to_string()),
897 });
898
899 let node_id_for_exec = next_node_id.clone();
901 let comp_id_for_exec = comp_node_id.clone();
902 let node_label_for_exec = transition.label();
903 let bus_policy_for_exec = transition.bus_access_policy();
904 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
905 let comp_for_exec = compensation.clone();
906 let bus_policy_for_executor = bus_policy_for_exec.clone();
907 let bus_policy_for_registry = bus_policy_for_exec.clone();
908 let next_executor: Executor<In, Next, E, Res> = Arc::new(
909 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
910 let prev = prev_executor.clone();
911 let trans = transition.clone();
912 let comp = comp_for_exec.clone();
913 let timeline_node_id = node_id_for_exec.clone();
914 let timeline_comp_id = comp_id_for_exec.clone();
915 let timeline_node_label = node_label_for_exec.clone();
916 let transition_bus_policy = bus_policy_for_executor.clone();
917 let step_idx = step_idx_for_exec;
918
919 Box::pin(async move {
920 if let Some(jump) = bus.read::<ManualJump>()
922 && (jump.target_node == timeline_node_id
923 || jump.target_node == timeline_node_label)
924 {
925 tracing::info!(
926 node_id = %timeline_node_id,
927 node_label = %timeline_node_label,
928 "Manual jump target reached (compensated); skipping previous steps"
929 );
930
931 let state = if let Some(ow) = jump.payload_override.clone() {
932 match serde_json::from_value::<Out>(ow) {
933 Ok(s) => s,
934 Err(e) => {
935 tracing::error!(
936 "Payload override deserialization failed: {}",
937 e
938 );
939 return Outcome::emit(
940 "execution.jump.payload_error",
941 Some(serde_json::json!({"error": e.to_string()})),
942 )
943 .map(|_: ()| unreachable!());
944 }
945 }
946 } else {
947 return Outcome::emit(
948 "execution.jump.missing_payload",
949 Some(serde_json::json!({"node_id": timeline_node_id})),
950 )
951 .map(|_: ()| unreachable!());
952 };
953
954 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
956 &trans,
957 &comp,
958 state,
959 res,
960 bus,
961 &timeline_node_id,
962 &timeline_comp_id,
963 &timeline_node_label,
964 &transition_bus_policy,
965 step_idx,
966 )
967 .await;
968 }
969
970 if let Some(start) = bus.read::<StartStep>()
972 && step_idx == start.0
973 && bus.read::<ResumptionState>().is_some()
974 {
975 let fresh_state = serde_json::to_value(&input)
976 .ok()
977 .and_then(|v| serde_json::from_value::<Out>(v).ok());
978 let persisted_state = bus
979 .read::<ResumptionState>()
980 .and_then(|r| r.payload.clone())
981 .and_then(|p| serde_json::from_value::<Out>(p).ok());
982
983 if let Some(s) = fresh_state.or(persisted_state) {
984 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
985 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
986 &trans,
987 &comp,
988 s,
989 res,
990 bus,
991 &timeline_node_id,
992 &timeline_comp_id,
993 &timeline_node_label,
994 &transition_bus_policy,
995 step_idx,
996 )
997 .await;
998 }
999
1000 return Outcome::emit(
1001 "execution.resumption.payload_error",
1002 Some(serde_json::json!({"error": "no compatible resumption state"})),
1003 )
1004 .map(|_: ()| unreachable!());
1005 }
1006
1007 let prev_result = prev(input, res, bus).await;
1009
1010 let state = match prev_result {
1012 Outcome::Next(t) => t,
1013 other => return other.map(|_| unreachable!()),
1014 };
1015
1016 run_this_compensated_step::<Out, Next, E, Res, Comp>(
1017 &trans,
1018 &comp,
1019 state,
1020 res,
1021 bus,
1022 &timeline_node_id,
1023 &timeline_comp_id,
1024 &timeline_node_label,
1025 &transition_bus_policy,
1026 step_idx,
1027 )
1028 .await
1029 })
1030 },
1031 );
1032 {
1034 let mut registry = saga_compensation_registry.write().unwrap();
1035 let comp_fn = compensation.clone();
1036 let transition_bus_policy = bus_policy_for_registry.clone();
1037
1038 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1039 Arc::new(move |input_data, res, bus| {
1040 let comp = comp_fn.clone();
1041 let bus_policy = transition_bus_policy.clone();
1042 Box::pin(async move {
1043 let input: Out = serde_json::from_slice(&input_data).unwrap();
1044 bus.set_access_policy(comp.label(), bus_policy);
1045 let res = comp.run(input, res, bus).await;
1046 bus.clear_access_policy();
1047 res
1048 })
1049 });
1050 registry.register(next_node_id.clone(), handler);
1051 }
1052
1053 Axon {
1054 schematic,
1055 executor: next_executor,
1056 execution_mode,
1057 persistence_store,
1058 audit_sink,
1059 dlq_sink,
1060 dlq_policy,
1061 dynamic_dlq_policy,
1062 saga_policy,
1063 dynamic_saga_policy,
1064 saga_compensation_registry,
1065 iam_handle,
1066 }
1067 }
1068
1069 #[track_caller]
1072 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1073 where
1074 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1075 {
1076 let caller = Location::caller();
1079 let comp_node_id = uuid::Uuid::new_v4().to_string();
1080
1081 let comp_node = Node {
1082 id: comp_node_id.clone(),
1083 kind: NodeKind::Atom,
1084 label: transition.label(),
1085 description: transition.description(),
1086 input_type: type_name_of::<Out>(),
1087 output_type: "void".to_string(),
1088 resource_type: type_name_of::<Res>(),
1089 metadata: Default::default(),
1090 bus_capability: None,
1091 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1092 position: transition
1093 .position()
1094 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1095 compensation_node_id: None,
1096 input_schema: None,
1097 output_schema: None,
1098 };
1099
1100 if let Some(last_node) = self.schematic.nodes.last_mut() {
1101 last_node.compensation_node_id = Some(comp_node_id.clone());
1102 }
1103
1104 self.schematic.nodes.push(comp_node);
1105 self
1106 }
1107
1108 #[track_caller]
1110 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1111 let caller = Location::caller();
1112 let branch_id_str = branch_id.into();
1113 let last_node_id = self
1114 .schematic
1115 .nodes
1116 .last()
1117 .map(|n| n.id.clone())
1118 .unwrap_or_default();
1119
1120 let branch_node = Node {
1121 id: uuid::Uuid::new_v4().to_string(),
1122 kind: NodeKind::Synapse,
1123 label: label.to_string(),
1124 description: None,
1125 input_type: type_name_of::<Out>(),
1126 output_type: type_name_of::<Out>(),
1127 resource_type: type_name_of::<Res>(),
1128 metadata: Default::default(),
1129 bus_capability: None,
1130 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1131 position: None,
1132 compensation_node_id: None,
1133 input_schema: None,
1134 output_schema: None,
1135 };
1136
1137 self.schematic.nodes.push(branch_node);
1138 self.schematic.edges.push(Edge {
1139 from: last_node_id,
1140 to: branch_id_str.clone(),
1141 kind: EdgeType::Branch(branch_id_str),
1142 label: Some("Branch".to_string()),
1143 });
1144
1145 self
1146 }
1147
1148 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1150 if let ExecutionMode::Singleton {
1151 lock_key,
1152 ttl_ms,
1153 lock_provider,
1154 } = &self.execution_mode
1155 {
1156 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1157 let _enter = trace_span.enter();
1158 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1159 Ok(true) => {
1160 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1161 }
1162 Ok(false) => {
1163 tracing::debug!(
1164 "Singleton lock {} already held, aborting execution.",
1165 lock_key
1166 );
1167 return Outcome::emit(
1169 "execution.skipped.lock_held",
1170 Some(serde_json::json!({
1171 "lock_key": lock_key
1172 })),
1173 );
1174 }
1175 Err(e) => {
1176 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1177 return Outcome::emit(
1178 "execution.skipped.lock_error",
1179 Some(serde_json::json!({
1180 "error": e.to_string()
1181 })),
1182 );
1183 }
1184 }
1185 }
1186
1187 if let Some(iam) = &self.iam_handle {
1189 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1190
1191 if matches!(iam.policy, IamPolicy::None) {
1192 } else {
1194 let token = bus.read::<IamToken>().map(|t| t.0.clone());
1195
1196 match token {
1197 Some(raw_token) => {
1198 match iam.verifier.verify(&raw_token).await {
1199 Ok(identity) => {
1200 if let Err(e) = enforce_policy(&iam.policy, &identity) {
1201 tracing::warn!(
1202 policy = ?iam.policy,
1203 subject = %identity.subject,
1204 "IAM policy enforcement failed: {}",
1205 e
1206 );
1207 return Outcome::emit(
1208 "iam.policy_denied",
1209 Some(serde_json::json!({
1210 "error": e.to_string(),
1211 "subject": identity.subject,
1212 })),
1213 );
1214 }
1215 bus.insert(identity);
1217 }
1218 Err(e) => {
1219 tracing::warn!("IAM token verification failed: {}", e);
1220 return Outcome::emit(
1221 "iam.verification_failed",
1222 Some(serde_json::json!({
1223 "error": e.to_string()
1224 })),
1225 );
1226 }
1227 }
1228 }
1229 None => {
1230 tracing::warn!("IAM policy requires token but none found in Bus");
1231 return Outcome::emit("iam.missing_token", None);
1232 }
1233 }
1234 }
1235 }
1236
1237 let trace_id = persistence_trace_id(bus);
1238 let label = self.schematic.name.clone();
1239
1240 if let Some(sink) = &self.dlq_sink {
1242 bus.insert(sink.clone());
1243 }
1244 let effective_dlq_policy = self
1246 .dynamic_dlq_policy
1247 .as_ref()
1248 .map(|d| d.current())
1249 .unwrap_or_else(|| self.dlq_policy.clone());
1250 bus.insert(effective_dlq_policy);
1251 bus.insert(self.schematic.clone());
1252 let effective_saga_policy = self
1253 .dynamic_saga_policy
1254 .as_ref()
1255 .map(|d| d.current())
1256 .unwrap_or_else(|| self.saga_policy.clone());
1257 bus.insert(effective_saga_policy.clone());
1258
1259 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1261 bus.insert(SagaStack::new());
1262 }
1263
1264 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1265 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1266 let compensation_retry_policy = compensation_retry_policy(bus);
1267 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1268 let version = self.schematic.schema_version.clone();
1269 let migration_registry = bus
1270 .read::<ranvier_core::schematic::MigrationRegistry>()
1271 .cloned();
1272
1273 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1274 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1275 load_persistence_version(handle, &trace_id).await;
1276
1277 if let Some(interv) = intervention {
1278 tracing::info!(
1279 trace_id = %trace_id,
1280 target_node = %interv.target_node,
1281 "Applying manual intervention command"
1282 );
1283
1284 if let Some(target_idx) = self
1286 .schematic
1287 .nodes
1288 .iter()
1289 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1290 {
1291 tracing::info!(
1292 trace_id = %trace_id,
1293 target_node = %interv.target_node,
1294 target_step = target_idx,
1295 "Intervention: Jumping to target node"
1296 );
1297 start_step = target_idx as u64;
1298
1299 bus.insert(ManualJump {
1301 target_node: interv.target_node.clone(),
1302 payload_override: interv.payload_override.clone(),
1303 });
1304
1305 if let Some(sink) = self.audit_sink.as_ref() {
1307 let event = AuditEvent::new(
1308 uuid::Uuid::new_v4().to_string(),
1309 "System".to_string(),
1310 "ApplyIntervention".to_string(),
1311 trace_id.to_string(),
1312 )
1313 .with_metadata("target_node", interv.target_node.clone())
1314 .with_metadata("target_step", target_idx);
1315
1316 let _ = sink.append(&event).await;
1317 }
1318 } else {
1319 tracing::warn!(
1320 trace_id = %trace_id,
1321 target_node = %interv.target_node,
1322 "Intervention target node not found in schematic; ignoring jump"
1323 );
1324 }
1325 }
1326
1327 if let Some(old_version) = trace_version
1328 && old_version != version
1329 {
1330 tracing::info!(
1331 trace_id = %trace_id,
1332 old_version = %old_version,
1333 current_version = %version,
1334 "Version mismatch detected during resumption"
1335 );
1336
1337 let migration_path = migration_registry
1339 .as_ref()
1340 .and_then(|r| r.find_migration_path(&old_version, &version));
1341
1342 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1343 if path.is_empty() {
1344 (None, last_payload.clone())
1345 } else {
1346 let mut payload = last_payload.clone();
1348 for hop in &path {
1349 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1350 {
1351 match mapper.map_state(p) {
1352 Ok(mapped) => payload = Some(mapped),
1353 Err(e) => {
1354 tracing::error!(
1355 trace_id = %trace_id,
1356 from = %hop.from_version,
1357 to = %hop.to_version,
1358 error = %e,
1359 "Payload migration mapper failed"
1360 );
1361 return Outcome::emit(
1362 "execution.resumption.payload_migration_failed",
1363 Some(serde_json::json!({
1364 "trace_id": trace_id,
1365 "from": hop.from_version,
1366 "to": hop.to_version,
1367 "error": e.to_string()
1368 })),
1369 );
1370 }
1371 }
1372 }
1373 }
1374 let hops: Vec<String> = path
1375 .iter()
1376 .map(|h| format!("{}->{}", h.from_version, h.to_version))
1377 .collect();
1378 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1379 (path.last().copied(), payload)
1380 }
1381 } else {
1382 (None, last_payload.clone())
1383 };
1384
1385 let migration = final_migration.or_else(|| {
1387 migration_registry
1388 .as_ref()
1389 .and_then(|r| r.find_migration(&old_version, &version))
1390 });
1391
1392 if mapped_payload.is_some() {
1394 last_payload = mapped_payload;
1395 }
1396
1397 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1398 {
1399 m.node_mapping
1400 .get(node_id)
1401 .cloned()
1402 .unwrap_or(m.default_strategy.clone())
1403 } else {
1404 migration
1405 .map(|m| m.default_strategy.clone())
1406 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1407 };
1408
1409 match strategy {
1410 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1411 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1412 start_step = 0;
1413 }
1414 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1415 new_node_id,
1416 ..
1417 } => {
1418 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1419 if let Some(target_idx) = self
1420 .schematic
1421 .nodes
1422 .iter()
1423 .position(|n| n.id == new_node_id || n.label == new_node_id)
1424 {
1425 start_step = target_idx as u64;
1426 } else {
1427 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1428 return Outcome::emit(
1429 "execution.resumption.migration_target_not_found",
1430 Some(serde_json::json!({ "node_id": new_node_id })),
1431 );
1432 }
1433 }
1434 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1435 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1436 if let Some(target_idx) = self
1437 .schematic
1438 .nodes
1439 .iter()
1440 .position(|n| n.id == node_id || n.label == node_id)
1441 {
1442 start_step = target_idx as u64;
1443 } else {
1444 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1445 return Outcome::emit(
1446 "execution.resumption.migration_target_not_found",
1447 Some(serde_json::json!({ "node_id": node_id })),
1448 );
1449 }
1450 }
1451 ranvier_core::schematic::MigrationStrategy::Fail => {
1452 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1453 return Outcome::emit(
1454 "execution.resumption.version_mismatch_failed",
1455 Some(serde_json::json!({
1456 "trace_id": trace_id,
1457 "old_version": old_version,
1458 "current_version": version
1459 })),
1460 );
1461 }
1462 _ => {
1463 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1464 return Outcome::emit(
1465 "execution.resumption.unsupported_migration",
1466 Some(serde_json::json!({
1467 "trace_id": trace_id,
1468 "strategy": format!("{:?}", strategy)
1469 })),
1470 );
1471 }
1472 }
1473 }
1474
1475 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1476 persist_execution_event(
1477 handle,
1478 &trace_id,
1479 &label,
1480 &version,
1481 start_step,
1482 ingress_node_id,
1483 "Enter",
1484 None,
1485 )
1486 .await;
1487
1488 bus.insert(StartStep(start_step));
1489 if start_step > 0 {
1490 bus.insert(ResumptionState {
1491 payload: last_payload,
1492 });
1493 }
1494
1495 Some(start_step)
1496 } else {
1497 None
1498 };
1499
1500 let should_capture = should_attach_timeline(bus);
1501 let inserted_timeline = if should_capture {
1502 ensure_timeline(bus)
1503 } else {
1504 false
1505 };
1506 let ingress_started = std::time::Instant::now();
1507 let ingress_enter_ts = now_ms();
1508 if should_capture
1509 && let (Some(timeline), Some(ingress)) =
1510 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1511 {
1512 timeline.push(TimelineEvent::NodeEnter {
1513 node_id: ingress.id.clone(),
1514 node_label: ingress.label.clone(),
1515 timestamp: ingress_enter_ts,
1516 });
1517 }
1518
1519 let circuit_span = tracing::info_span!(
1520 "Circuit",
1521 ranvier.circuit = %label,
1522 ranvier.outcome_kind = tracing::field::Empty,
1523 ranvier.outcome_target = tracing::field::Empty
1524 );
1525 let outcome = (self.executor)(input, resources, bus)
1526 .instrument(circuit_span.clone())
1527 .await;
1528 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1529 if let Some(target) = outcome_target(&outcome) {
1530 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1531 }
1532
1533 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1535 while let Some(task) = {
1536 let mut stack = bus.read_mut::<SagaStack>();
1537 stack.as_mut().and_then(|s| s.pop())
1538 } {
1539 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1540
1541 let handler = {
1542 let registry = self.saga_compensation_registry.read().unwrap();
1543 registry.get(&task.node_id)
1544 };
1545 if let Some(handler) = handler {
1546 let res = handler(task.input_snapshot, resources, bus).await;
1547 if let Outcome::Fault(e) = res {
1548 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1549 }
1550 } else {
1551 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1552 }
1553 }
1554 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1555 }
1556
1557 let ingress_exit_ts = now_ms();
1558 if should_capture
1559 && let (Some(timeline), Some(ingress)) =
1560 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1561 {
1562 timeline.push(TimelineEvent::NodeExit {
1563 node_id: ingress.id.clone(),
1564 outcome_type: outcome_type_name(&outcome),
1565 duration_ms: ingress_started.elapsed().as_millis() as u64,
1566 timestamp: ingress_exit_ts,
1567 });
1568 }
1569
1570 if let Some(handle) = persistence_handle.as_ref() {
1571 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1572 persist_execution_event(
1573 handle,
1574 &trace_id,
1575 &label,
1576 &version,
1577 fault_step,
1578 None, outcome_kind_name(&outcome),
1580 Some(outcome.to_json_value()),
1581 )
1582 .await;
1583
1584 let mut completion = completion_from_outcome(&outcome);
1585 if matches!(outcome, Outcome::Fault(_))
1586 && let Some(compensation) = compensation_handle.as_ref()
1587 && compensation_auto_trigger(bus)
1588 {
1589 let context = CompensationContext {
1590 trace_id: trace_id.clone(),
1591 circuit: label.clone(),
1592 fault_kind: outcome_kind_name(&outcome).to_string(),
1593 fault_step,
1594 timestamp_ms: now_ms(),
1595 };
1596
1597 if run_compensation(
1598 compensation,
1599 context,
1600 compensation_retry_policy,
1601 compensation_idempotency.clone(),
1602 )
1603 .await
1604 {
1605 persist_execution_event(
1606 handle,
1607 &trace_id,
1608 &label,
1609 &version,
1610 fault_step.saturating_add(1),
1611 None,
1612 "Compensated",
1613 None,
1614 )
1615 .await;
1616 completion = CompletionState::Compensated;
1617 }
1618 }
1619
1620 if persistence_auto_complete(bus) {
1621 persist_completion(handle, &trace_id, completion).await;
1622 }
1623 }
1624
1625 if should_capture {
1626 maybe_export_timeline(bus, &outcome);
1627 }
1628 if inserted_timeline {
1629 let _ = bus.remove::<Timeline>();
1630 }
1631
1632 outcome
1633 }
1634
1635 pub fn serve_inspector(self, port: u16) -> Self {
1638 if !inspector_dev_mode_from_env() {
1639 tracing::info!("Inspector disabled because RANVIER_MODE is production");
1640 return self;
1641 }
1642 if !inspector_enabled_from_env() {
1643 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
1644 return self;
1645 }
1646
1647 let schematic = self.schematic.clone();
1648 let axon_inspector = Arc::new(self.clone());
1649 tokio::spawn(async move {
1650 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
1651 .with_projection_files_from_env()
1652 .with_mode_from_env()
1653 .with_auth_policy_from_env()
1654 .with_state_inspector(axon_inspector)
1655 .serve()
1656 .await
1657 {
1658 tracing::error!("Inspector server failed: {}", e);
1659 }
1660 });
1661 self
1662 }
1663
1664 pub fn schematic(&self) -> &Schematic {
1666 &self.schematic
1667 }
1668
1669 pub fn into_schematic(self) -> Schematic {
1671 self.schematic
1672 }
1673
/// Returns the schematic export request described by the current process, if any.
///
/// Delegates to `schematic_export_request_from_process`, which inspects the
/// `RANVIER_SCHEMATIC` / `RANVIER_SCHEMATIC_OUTPUT` environment variables and the
/// `--schematic`, `--schematic-output` and `--output` command-line flags.
/// `self` is not consulted.
pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
    schematic_export_request_from_process()
}
1687
1688 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
1700 self.maybe_export_and_exit_with(|_| ())
1701 }
1702
1703 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
1708 where
1709 F: FnOnce(&SchematicExportRequest),
1710 {
1711 let Some(request) = self.schematic_export_request() else {
1712 return Ok(false);
1713 };
1714 on_before_exit(&request);
1715 self.export_schematic(&request)?;
1716 Ok(true)
1717 }
1718
1719 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
1721 let json = serde_json::to_string_pretty(self.schematic())?;
1722 if let Some(path) = &request.output {
1723 if let Some(parent) = path.parent()
1724 && !parent.as_os_str().is_empty()
1725 {
1726 fs::create_dir_all(parent)?;
1727 }
1728 fs::write(path, json.as_bytes())?;
1729 return Ok(());
1730 }
1731 println!("{}", json);
1732 Ok(())
1733 }
1734}
1735
1736fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
1737 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
1738 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
1739 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
1740
1741 let mut i = 0;
1742 while i < args.len() {
1743 let arg = args[i].to_string_lossy();
1744
1745 if arg == "--schematic" {
1746 enabled = true;
1747 i += 1;
1748 continue;
1749 }
1750
1751 if arg == "--schematic-output" || arg == "--output" {
1752 if let Some(next) = args.get(i + 1) {
1753 output = Some(PathBuf::from(next));
1754 i += 2;
1755 continue;
1756 }
1757 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
1758 output = Some(PathBuf::from(value));
1759 } else if let Some(value) = arg.strip_prefix("--output=") {
1760 output = Some(PathBuf::from(value));
1761 }
1762
1763 i += 1;
1764 }
1765
1766 if enabled {
1767 Some(SchematicExportRequest { output })
1768 } else {
1769 None
1770 }
1771}
1772
/// Reports whether the environment variable `key` holds a truthy flag.
///
/// Truthy spellings are `1`, `true`, `on` and `yes`, compared case-insensitively with
/// no trimming. A variable that is unset or not valid Unicode counts as `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key).is_ok_and(|raw| {
        let lowered = raw.to_ascii_lowercase();
        ["1", "true", "on", "yes"].contains(&lowered.as_str())
    })
}
1779
1780fn inspector_enabled_from_env() -> bool {
1781 let raw = std::env::var("RANVIER_INSPECTOR").ok();
1782 inspector_enabled_from_value(raw.as_deref())
1783}
1784
/// Interprets a raw `RANVIER_INSPECTOR` value.
///
/// An absent value leaves the inspector enabled. A present value enables it only when
/// it is `1`, `true`, `on` or `yes` (case-insensitive, untrimmed).
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    let lowered = raw.to_ascii_lowercase();
    ["1", "true", "on", "yes"].contains(&lowered.as_str())
}
1791
1792fn inspector_dev_mode_from_env() -> bool {
1793 let raw = std::env::var("RANVIER_MODE").ok();
1794 inspector_dev_mode_from_value(raw.as_deref())
1795}
1796
/// Returns `false` only when the mode is `prod` or `production` (case-insensitive,
/// untrimmed); every other value, including an absent one, counts as dev mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        None => true,
        Some(raw) => {
            let mode = raw.to_ascii_lowercase();
            mode != "prod" && mode != "production"
        }
    }
}
1803
1804fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
1805 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
1806 Ok(v) if !v.trim().is_empty() => v,
1807 _ => return,
1808 };
1809
1810 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
1811 let policy = timeline_adaptive_policy();
1812 let forced = should_force_export(outcome, &policy);
1813 let should_export = sampled || forced;
1814 if !should_export {
1815 record_sampling_stats(false, sampled, forced, "none", &policy);
1816 return;
1817 }
1818
1819 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
1820 timeline.sort();
1821
1822 let mode = std::env::var("RANVIER_TIMELINE_MODE")
1823 .unwrap_or_else(|_| "overwrite".to_string())
1824 .to_ascii_lowercase();
1825
1826 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
1827 tracing::warn!(
1828 "Failed to persist timeline file {} (mode={}): {}",
1829 path,
1830 mode,
1831 err
1832 );
1833 record_sampling_stats(false, sampled, forced, &mode, &policy);
1834 } else {
1835 record_sampling_stats(true, sampled, forced, &mode, &policy);
1836 }
1837}
1838
1839fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
1840 match outcome {
1841 Outcome::Next(_) => "Next".to_string(),
1842 Outcome::Branch(id, _) => format!("Branch:{}", id),
1843 Outcome::Jump(id, _) => format!("Jump:{}", id),
1844 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
1845 Outcome::Fault(_) => "Fault".to_string(),
1846 }
1847}
1848
1849fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
1850 match outcome {
1851 Outcome::Next(_) => "Next",
1852 Outcome::Branch(_, _) => "Branch",
1853 Outcome::Jump(_, _) => "Jump",
1854 Outcome::Emit(_, _) => "Emit",
1855 Outcome::Fault(_) => "Fault",
1856 }
1857}
1858
1859fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
1860 match outcome {
1861 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
1862 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
1863 Outcome::Emit(event_type, _) => Some(event_type.clone()),
1864 Outcome::Next(_) | Outcome::Fault(_) => None,
1865 }
1866}
1867
1868fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
1869 match outcome {
1870 Outcome::Fault(_) => CompletionState::Fault,
1871 _ => CompletionState::Success,
1872 }
1873}
1874
1875fn persistence_trace_id(bus: &Bus) -> String {
1876 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
1877 explicit.0.clone()
1878 } else {
1879 format!("{}:{}", bus.id, now_ms())
1880 }
1881}
1882
1883fn persistence_auto_complete(bus: &Bus) -> bool {
1884 bus.read::<PersistenceAutoComplete>()
1885 .map(|v| v.0)
1886 .unwrap_or(true)
1887}
1888
1889fn compensation_auto_trigger(bus: &Bus) -> bool {
1890 bus.read::<CompensationAutoTrigger>()
1891 .map(|v| v.0)
1892 .unwrap_or(true)
1893}
1894
1895fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
1896 bus.read::<CompensationRetryPolicy>()
1897 .copied()
1898 .unwrap_or_default()
1899}
1900
1901fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
1907 payload.map(|p| {
1908 p.get("Next")
1909 .or_else(|| p.get("Branch"))
1910 .or_else(|| p.get("Jump"))
1911 .cloned()
1912 .unwrap_or_else(|| p.clone())
1913 })
1914}
1915
/// Loads the persisted trace for `trace_id` and derives where a resumed run should start.
///
/// Returns a tuple of:
/// 0. the step index to start at,
/// 1. the schematic version recorded on the trace (`None` when no trace was loaded),
/// 2. the last entry of the trace's `interventions`, if any,
/// 3. the node id of the event chosen as the resumption anchor,
/// 4. that event's payload with any `Next` / `Branch` / `Jump` wrapper removed
///    (via `unwrap_outcome_payload`).
///
/// When the store has no trace for `trace_id`, or `load` fails (logged as a warning),
/// the step is `0` and every other element is `None`.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // An explicit resume point is recorded: start right after it. Choose the
                    // anchor by scanning `trace.events` from the end for an event at or before
                    // that step, preferring (1) a `Next` event with a payload, then (2) any
                    // non-`Fault` event with a payload, then (3) any event with a payload.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        // NOTE(review): this last fallback ignores `resume_from_step`, so the
                        // anchor may be an event recorded after the resume point — confirm
                        // that is intended.
                        .or_else(|| trace.events.last());

                    (
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // No explicit resume point: continue after the last recorded event and
                    // anchor on it. A trace without events starts at step 0.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            // Best-effort: a load failure is treated like a fresh trace.
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
1993
1994#[allow(clippy::too_many_arguments)]
1995async fn run_this_step<In, Out, E, Res>(
1996 trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
1997 state: In,
1998 res: &Res,
1999 bus: &mut Bus,
2000 node_id: &str,
2001 node_label: &str,
2002 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2003 step_idx: u64,
2004) -> Outcome<Out, E>
2005where
2006 In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2007 Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2008 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2009 Res: ranvier_core::transition::ResourceRequirement,
2010{
2011 let label = trans.label();
2012 let res_type = std::any::type_name::<Res>()
2013 .split("::")
2014 .last()
2015 .unwrap_or("unknown");
2016
2017 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2019 debug.should_pause(node_id)
2020 } else {
2021 false
2022 };
2023
2024 if should_pause {
2025 let trace_id = persistence_trace_id(bus);
2026 tracing::event!(
2027 target: "ranvier.debugger",
2028 tracing::Level::INFO,
2029 trace_id = %trace_id,
2030 node_id = %node_id,
2031 "Node paused"
2032 );
2033
2034 if let Some(timeline) = bus.read_mut::<Timeline>() {
2035 timeline.push(TimelineEvent::NodePaused {
2036 node_id: node_id.to_string(),
2037 timestamp: now_ms(),
2038 });
2039 }
2040 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2041 debug.wait().await;
2042 }
2043 }
2044
2045 let enter_ts = now_ms();
2046 if let Some(timeline) = bus.read_mut::<Timeline>() {
2047 timeline.push(TimelineEvent::NodeEnter {
2048 node_id: node_id.to_string(),
2049 node_label: node_label.to_string(),
2050 timestamp: enter_ts,
2051 });
2052 }
2053
2054 let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2056 if let DlqPolicy::RetryThenDlq {
2057 max_attempts,
2058 backoff_ms,
2059 } = p
2060 {
2061 Some((*max_attempts, *backoff_ms))
2062 } else {
2063 None
2064 }
2065 });
2066 let retry_state_snapshot = if dlq_retry_config.is_some() {
2067 serde_json::to_value(&state).ok()
2068 } else {
2069 None
2070 };
2071
2072 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2074 Some(serde_json::to_vec(&state).unwrap_or_default())
2075 } else {
2076 None
2077 };
2078
2079 let node_span = tracing::info_span!(
2080 "Node",
2081 ranvier.node = %label,
2082 ranvier.resource_type = %res_type,
2083 ranvier.outcome_kind = tracing::field::Empty,
2084 ranvier.outcome_target = tracing::field::Empty
2085 );
2086 let started = std::time::Instant::now();
2087 bus.set_access_policy(label.clone(), bus_policy.clone());
2088 let result = trans
2089 .run(state, res, bus)
2090 .instrument(node_span.clone())
2091 .await;
2092 bus.clear_access_policy();
2093
2094 let result = if let Outcome::Fault(_) = &result {
2097 if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2098 (dlq_retry_config, &retry_state_snapshot)
2099 {
2100 let mut final_result = result;
2101 for attempt in 2..=max_attempts {
2103 let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2104
2105 tracing::info!(
2106 ranvier.node = %label,
2107 attempt = attempt,
2108 max_attempts = max_attempts,
2109 backoff_ms = delay,
2110 "Retrying faulted node"
2111 );
2112
2113 if let Some(timeline) = bus.read_mut::<Timeline>() {
2114 timeline.push(TimelineEvent::NodeRetry {
2115 node_id: node_id.to_string(),
2116 attempt,
2117 max_attempts,
2118 backoff_ms: delay,
2119 timestamp: now_ms(),
2120 });
2121 }
2122
2123 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2124
2125 if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2126 bus.set_access_policy(label.clone(), bus_policy.clone());
2127 let retry_result = trans
2128 .run(retry_state, res, bus)
2129 .instrument(tracing::info_span!(
2130 "NodeRetry",
2131 ranvier.node = %label,
2132 attempt = attempt
2133 ))
2134 .await;
2135 bus.clear_access_policy();
2136
2137 match &retry_result {
2138 Outcome::Fault(_) => {
2139 final_result = retry_result;
2140 }
2141 _ => {
2142 final_result = retry_result;
2143 break;
2144 }
2145 }
2146 }
2147 }
2148 final_result
2149 } else {
2150 result
2151 }
2152 } else {
2153 result
2154 };
2155
2156 node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2157 if let Some(target) = outcome_target(&result) {
2158 node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2159 }
2160 let duration_ms = started.elapsed().as_millis() as u64;
2161 let exit_ts = now_ms();
2162
2163 if let Some(timeline) = bus.read_mut::<Timeline>() {
2164 timeline.push(TimelineEvent::NodeExit {
2165 node_id: node_id.to_string(),
2166 outcome_type: outcome_type_name(&result),
2167 duration_ms,
2168 timestamp: exit_ts,
2169 });
2170
2171 if let Outcome::Branch(branch_id, _) = &result {
2172 timeline.push(TimelineEvent::Branchtaken {
2173 branch_id: branch_id.clone(),
2174 timestamp: exit_ts,
2175 });
2176 }
2177 }
2178
2179 if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2181 && let Some(stack) = bus.read_mut::<SagaStack>()
2182 {
2183 stack.push(node_id.to_string(), label.clone(), snapshot);
2184 }
2185
2186 if let Some(handle) = bus.read::<PersistenceHandle>() {
2187 let trace_id = persistence_trace_id(bus);
2188 let circuit = bus
2189 .read::<ranvier_core::schematic::Schematic>()
2190 .map(|s| s.name.clone())
2191 .unwrap_or_default();
2192 let version = bus
2193 .read::<ranvier_core::schematic::Schematic>()
2194 .map(|s| s.schema_version.clone())
2195 .unwrap_or_default();
2196
2197 persist_execution_event(
2198 handle,
2199 &trace_id,
2200 &circuit,
2201 &version,
2202 step_idx,
2203 Some(node_id.to_string()),
2204 outcome_kind_name(&result),
2205 Some(result.to_json_value()),
2206 )
2207 .await;
2208 }
2209
2210 if let Outcome::Fault(f) = &result {
2213 let dlq_action = {
2215 let policy = bus.read::<DlqPolicy>();
2216 let sink = bus.read::<Arc<dyn DlqSink>>();
2217 match (sink, policy) {
2218 (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2219 _ => None,
2220 }
2221 };
2222
2223 if let Some(sink) = dlq_action {
2224 if let Some((max_attempts, _)) = dlq_retry_config
2225 && let Some(timeline) = bus.read_mut::<Timeline>()
2226 {
2227 timeline.push(TimelineEvent::DlqExhausted {
2228 node_id: node_id.to_string(),
2229 total_attempts: max_attempts,
2230 timestamp: now_ms(),
2231 });
2232 }
2233
2234 let trace_id = persistence_trace_id(bus);
2235 let circuit = bus
2236 .read::<ranvier_core::schematic::Schematic>()
2237 .map(|s| s.name.clone())
2238 .unwrap_or_default();
2239 let _ = sink
2240 .store_dead_letter(
2241 &trace_id,
2242 &circuit,
2243 node_id,
2244 &format!("{:?}", f),
2245 &serde_json::to_vec(&f).unwrap_or_default(),
2246 )
2247 .await;
2248 }
2249 }
2250
2251 result
2252}
2253
2254#[allow(clippy::too_many_arguments)]
2255async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2256 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2257 comp: &Comp,
2258 state: Out,
2259 res: &Res,
2260 bus: &mut Bus,
2261 node_id: &str,
2262 comp_node_id: &str,
2263 node_label: &str,
2264 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2265 step_idx: u64,
2266) -> Outcome<Next, E>
2267where
2268 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2269 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2270 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2271 Res: ranvier_core::transition::ResourceRequirement,
2272 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2273{
2274 let label = trans.label();
2275
2276 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2278 debug.should_pause(node_id)
2279 } else {
2280 false
2281 };
2282
2283 if should_pause {
2284 let trace_id = persistence_trace_id(bus);
2285 tracing::event!(
2286 target: "ranvier.debugger",
2287 tracing::Level::INFO,
2288 trace_id = %trace_id,
2289 node_id = %node_id,
2290 "Node paused (compensated)"
2291 );
2292
2293 if let Some(timeline) = bus.read_mut::<Timeline>() {
2294 timeline.push(TimelineEvent::NodePaused {
2295 node_id: node_id.to_string(),
2296 timestamp: now_ms(),
2297 });
2298 }
2299 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2300 debug.wait().await;
2301 }
2302 }
2303
2304 let enter_ts = now_ms();
2305 if let Some(timeline) = bus.read_mut::<Timeline>() {
2306 timeline.push(TimelineEvent::NodeEnter {
2307 node_id: node_id.to_string(),
2308 node_label: node_label.to_string(),
2309 timestamp: enter_ts,
2310 });
2311 }
2312
2313 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2315 Some(serde_json::to_vec(&state).unwrap_or_default())
2316 } else {
2317 None
2318 };
2319
2320 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2321 bus.set_access_policy(label.clone(), bus_policy.clone());
2322 let result = trans
2323 .run(state.clone(), res, bus)
2324 .instrument(node_span)
2325 .await;
2326 bus.clear_access_policy();
2327
2328 let duration_ms = 0; let exit_ts = now_ms();
2330
2331 if let Some(timeline) = bus.read_mut::<Timeline>() {
2332 timeline.push(TimelineEvent::NodeExit {
2333 node_id: node_id.to_string(),
2334 outcome_type: outcome_type_name(&result),
2335 duration_ms,
2336 timestamp: exit_ts,
2337 });
2338 }
2339
2340 if let Outcome::Fault(ref err) = result {
2342 if compensation_auto_trigger(bus) {
2343 tracing::info!(
2344 ranvier.node = %label,
2345 ranvier.compensation.trigger = "saga",
2346 error = ?err,
2347 "Saga compensation triggered"
2348 );
2349
2350 if let Some(timeline) = bus.read_mut::<Timeline>() {
2351 timeline.push(TimelineEvent::NodeEnter {
2352 node_id: comp_node_id.to_string(),
2353 node_label: format!("Compensate: {}", comp.label()),
2354 timestamp: exit_ts,
2355 });
2356 }
2357
2358 let _ = comp.run(state, res, bus).await;
2360
2361 if let Some(timeline) = bus.read_mut::<Timeline>() {
2362 timeline.push(TimelineEvent::NodeExit {
2363 node_id: comp_node_id.to_string(),
2364 outcome_type: "Compensated".to_string(),
2365 duration_ms: 0,
2366 timestamp: now_ms(),
2367 });
2368 }
2369
2370 if let Some(handle) = bus.read::<PersistenceHandle>() {
2371 let trace_id = persistence_trace_id(bus);
2372 let circuit = bus
2373 .read::<ranvier_core::schematic::Schematic>()
2374 .map(|s| s.name.clone())
2375 .unwrap_or_default();
2376 let version = bus
2377 .read::<ranvier_core::schematic::Schematic>()
2378 .map(|s| s.schema_version.clone())
2379 .unwrap_or_default();
2380
2381 persist_execution_event(
2382 handle,
2383 &trace_id,
2384 &circuit,
2385 &version,
2386 step_idx + 1, Some(comp_node_id.to_string()),
2388 "Compensated",
2389 None,
2390 )
2391 .await;
2392 }
2393 }
2394 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2395 if let Some(stack) = bus.read_mut::<SagaStack>() {
2397 stack.push(node_id.to_string(), label.clone(), snapshot);
2398 }
2399
2400 if let Some(handle) = bus.read::<PersistenceHandle>() {
2401 let trace_id = persistence_trace_id(bus);
2402 let circuit = bus
2403 .read::<ranvier_core::schematic::Schematic>()
2404 .map(|s| s.name.clone())
2405 .unwrap_or_default();
2406 let version = bus
2407 .read::<ranvier_core::schematic::Schematic>()
2408 .map(|s| s.schema_version.clone())
2409 .unwrap_or_default();
2410
2411 persist_execution_event(
2412 handle,
2413 &trace_id,
2414 &circuit,
2415 &version,
2416 step_idx,
2417 Some(node_id.to_string()),
2418 outcome_kind_name(&result),
2419 Some(result.to_json_value()),
2420 )
2421 .await;
2422 }
2423 }
2424
2425 if let Outcome::Fault(f) = &result
2427 && let (Some(sink), Some(policy)) =
2428 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2429 {
2430 let should_dlq = !matches!(policy, DlqPolicy::Drop);
2431 if should_dlq {
2432 let trace_id = persistence_trace_id(bus);
2433 let circuit = bus
2434 .read::<ranvier_core::schematic::Schematic>()
2435 .map(|s| s.name.clone())
2436 .unwrap_or_default();
2437 let _ = sink
2438 .store_dead_letter(
2439 &trace_id,
2440 &circuit,
2441 node_id,
2442 &format!("{:?}", f),
2443 &serde_json::to_vec(&f).unwrap_or_default(),
2444 )
2445 .await;
2446 }
2447 }
2448
2449 result
2450}
2451
2452#[allow(clippy::too_many_arguments)]
2453pub async fn persist_execution_event(
2454 handle: &PersistenceHandle,
2455 trace_id: &str,
2456 circuit: &str,
2457 schematic_version: &str,
2458 step: u64,
2459 node_id: Option<String>,
2460 outcome_kind: &str,
2461 payload: Option<serde_json::Value>,
2462) {
2463 let store = handle.store();
2464 let envelope = PersistenceEnvelope {
2465 trace_id: trace_id.to_string(),
2466 circuit: circuit.to_string(),
2467 schematic_version: schematic_version.to_string(),
2468 step,
2469 node_id,
2470 outcome_kind: outcome_kind.to_string(),
2471 timestamp_ms: now_ms(),
2472 payload_hash: None,
2473 payload,
2474 };
2475
2476 if let Err(err) = store.append(envelope).await {
2477 tracing::warn!(
2478 trace_id = %trace_id,
2479 circuit = %circuit,
2480 step,
2481 outcome_kind = %outcome_kind,
2482 error = %err,
2483 "Failed to append persistence envelope"
2484 );
2485 }
2486}
2487
2488async fn persist_completion(
2489 handle: &PersistenceHandle,
2490 trace_id: &str,
2491 completion: CompletionState,
2492) {
2493 let store = handle.store();
2494 if let Err(err) = store.complete(trace_id, completion).await {
2495 tracing::warn!(
2496 trace_id = %trace_id,
2497 error = %err,
2498 "Failed to complete persistence trace"
2499 );
2500 }
2501}
2502
2503fn compensation_idempotency_key(context: &CompensationContext) -> String {
2504 format!(
2505 "{}:{}:{}",
2506 context.trace_id, context.circuit, context.fault_kind
2507 )
2508}
2509
2510async fn run_compensation(
2511 handle: &CompensationHandle,
2512 context: CompensationContext,
2513 retry_policy: CompensationRetryPolicy,
2514 idempotency: Option<CompensationIdempotencyHandle>,
2515) -> bool {
2516 let hook = handle.hook();
2517 let key = compensation_idempotency_key(&context);
2518
2519 if let Some(store_handle) = idempotency.as_ref() {
2520 let store = store_handle.store();
2521 match store.was_compensated(&key).await {
2522 Ok(true) => {
2523 tracing::info!(
2524 trace_id = %context.trace_id,
2525 circuit = %context.circuit,
2526 key = %key,
2527 "Compensation already recorded; skipping duplicate hook execution"
2528 );
2529 return true;
2530 }
2531 Ok(false) => {}
2532 Err(err) => {
2533 tracing::warn!(
2534 trace_id = %context.trace_id,
2535 key = %key,
2536 error = %err,
2537 "Failed to check compensation idempotency state"
2538 );
2539 }
2540 }
2541 }
2542
2543 let max_attempts = retry_policy.max_attempts.max(1);
2544 for attempt in 1..=max_attempts {
2545 match hook.compensate(context.clone()).await {
2546 Ok(()) => {
2547 if let Some(store_handle) = idempotency.as_ref() {
2548 let store = store_handle.store();
2549 if let Err(err) = store.mark_compensated(&key).await {
2550 tracing::warn!(
2551 trace_id = %context.trace_id,
2552 key = %key,
2553 error = %err,
2554 "Failed to mark compensation idempotency state"
2555 );
2556 }
2557 }
2558 return true;
2559 }
2560 Err(err) => {
2561 let is_last = attempt == max_attempts;
2562 tracing::warn!(
2563 trace_id = %context.trace_id,
2564 circuit = %context.circuit,
2565 fault_kind = %context.fault_kind,
2566 fault_step = context.fault_step,
2567 attempt,
2568 max_attempts,
2569 error = %err,
2570 "Compensation hook attempt failed"
2571 );
2572 if !is_last && retry_policy.backoff_ms > 0 {
2573 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2574 .await;
2575 }
2576 }
2577 }
2578 }
2579 false
2580}
2581
2582fn ensure_timeline(bus: &mut Bus) -> bool {
2583 if bus.has::<Timeline>() {
2584 false
2585 } else {
2586 bus.insert(Timeline::new());
2587 true
2588 }
2589}
2590
2591fn should_attach_timeline(bus: &Bus) -> bool {
2592 if bus.has::<Timeline>() {
2594 return true;
2595 }
2596
2597 has_timeline_output_path()
2599}
2600
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to something other than whitespace.
///
/// An unset variable, or one that is not valid Unicode, counts as "no output path".
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(configured) if !configured.trim().is_empty()
    )
}
2607
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is a fraction in `[0.0, 1.0]`, and out-of-range numbers are clamped into
/// that interval. Surrounding whitespace is ignored.
///
/// Defaults to `1.0` (sample everything) when the variable is unset or unparseable. It
/// also defaults to `1.0` when the value is `NaN`. `f64::from_str` accepts `"NaN"`, and
/// `NaN.clamp(..)` stays `NaN`. Because every comparison against `NaN` is false, such a
/// rate would silently disable sampling for every bus.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|v| !v.is_nan())
        .map(|v| v.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
2615
2616fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
2617 if rate <= 0.0 {
2618 return false;
2619 }
2620 if rate >= 1.0 {
2621 return true;
2622 }
2623 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
2624 bucket < rate
2625}
2626
/// Reads the adaptive export policy name from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is trimmed and lower-cased so that, for example, `" Fault_Only "` selects
/// `"fault_only"`. Untrimmed values previously fell through to the default policy without
/// any warning. When the variable is unset the policy is `"fault_branch"`. See
/// `should_force_export` for how each name is interpreted.
fn timeline_adaptive_policy() -> String {
    std::env::var("RANVIER_TIMELINE_ADAPTIVE")
        .map(|v| v.trim().to_ascii_lowercase())
        .unwrap_or_else(|_| "fault_branch".to_string())
}
2632
2633fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
2634 match policy {
2635 "off" => false,
2636 "fault_only" => matches!(outcome, Outcome::Fault(_)),
2637 "fault_branch_emit" => {
2638 matches!(
2639 outcome,
2640 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
2641 )
2642 }
2643 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
2644 }
2645}
2646
/// Process-wide counters describing timeline export (sampling) decisions.
///
/// Updated under a mutex by `record_sampling_stats`, which clones a snapshot out for
/// logging and for the optional JSON stats file.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in the timeline being exported.
    exported: u64,
    /// Decisions that resulted in the timeline not being exported.
    skipped: u64,
    /// Exports where the sampling decision selected the timeline.
    sampled_exports: u64,
    /// Exports where the adaptive policy forced the timeline out.
    forced_exports: u64,
    /// Export mode passed to the most recent decision.
    last_mode: String,
    /// Adaptive policy passed to the most recent decision.
    last_policy: String,
    /// Wall-clock time (`now_ms`, Unix epoch milliseconds) of the most recent update.
    last_updated_ms: u64,
}
2658
2659static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
2660
2661fn stats_cell() -> &'static Mutex<SamplingStats> {
2662 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
2663}
2664
2665fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
2666 let snapshot = {
2667 let mut stats = match stats_cell().lock() {
2668 Ok(guard) => guard,
2669 Err(_) => return,
2670 };
2671
2672 stats.total_decisions += 1;
2673 if exported {
2674 stats.exported += 1;
2675 } else {
2676 stats.skipped += 1;
2677 }
2678 if sampled && exported {
2679 stats.sampled_exports += 1;
2680 }
2681 if forced && exported {
2682 stats.forced_exports += 1;
2683 }
2684 stats.last_mode = mode.to_string();
2685 stats.last_policy = policy.to_string();
2686 stats.last_updated_ms = now_ms();
2687 stats.clone()
2688 };
2689
2690 tracing::debug!(
2691 ranvier.timeline.total_decisions = snapshot.total_decisions,
2692 ranvier.timeline.exported = snapshot.exported,
2693 ranvier.timeline.skipped = snapshot.skipped,
2694 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
2695 ranvier.timeline.forced_exports = snapshot.forced_exports,
2696 ranvier.timeline.mode = %snapshot.last_mode,
2697 ranvier.timeline.policy = %snapshot.last_policy,
2698 "Timeline sampling stats updated"
2699 );
2700
2701 if let Some(path) = timeline_stats_output_path() {
2702 let payload = serde_json::json!({
2703 "total_decisions": snapshot.total_decisions,
2704 "exported": snapshot.exported,
2705 "skipped": snapshot.skipped,
2706 "sampled_exports": snapshot.sampled_exports,
2707 "forced_exports": snapshot.forced_exports,
2708 "last_mode": snapshot.last_mode,
2709 "last_policy": snapshot.last_policy,
2710 "last_updated_ms": snapshot.last_updated_ms
2711 });
2712 if let Some(parent) = Path::new(&path).parent() {
2713 let _ = fs::create_dir_all(parent);
2714 }
2715 if let Err(err) = fs::write(&path, payload.to_string()) {
2716 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
2717 }
2718 }
2719}
2720
/// Returns the path from `RANVIER_TIMELINE_STATS_OUTPUT`, or `None` when it is unset,
/// not valid Unicode, or blank.
///
/// A non-blank value is returned verbatim; surrounding whitespace is not stripped.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(configured) if !configured.trim().is_empty() => Some(configured),
        _ => None,
    }
}
2726
2727fn write_timeline_with_policy(
2728 path: &str,
2729 mode: &str,
2730 mut timeline: Timeline,
2731) -> Result<(), String> {
2732 match mode {
2733 "append" => {
2734 if Path::new(path).exists() {
2735 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
2736 match serde_json::from_str::<Timeline>(&content) {
2737 Ok(mut existing) => {
2738 existing.events.append(&mut timeline.events);
2739 existing.sort();
2740 if let Some(max_events) = max_events_limit() {
2741 truncate_timeline_events(&mut existing, max_events);
2742 }
2743 write_timeline_json(path, &existing)
2744 }
2745 Err(_) => {
2746 if let Some(max_events) = max_events_limit() {
2748 truncate_timeline_events(&mut timeline, max_events);
2749 }
2750 write_timeline_json(path, &timeline)
2751 }
2752 }
2753 } else {
2754 if let Some(max_events) = max_events_limit() {
2755 truncate_timeline_events(&mut timeline, max_events);
2756 }
2757 write_timeline_json(path, &timeline)
2758 }
2759 }
2760 "rotate" => {
2761 let rotated_path = rotated_path(path, now_ms());
2762 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
2763 if let Some(keep) = rotate_keep_limit() {
2764 cleanup_rotated_files(path, keep)?;
2765 }
2766 Ok(())
2767 }
2768 _ => write_timeline_json(path, &timeline),
2769 }
2770}
2771
2772fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
2773 if let Some(parent) = Path::new(path).parent()
2774 && !parent.as_os_str().is_empty()
2775 {
2776 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2777 }
2778 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
2779 fs::write(path, json).map_err(|e| e.to_string())
2780}
2781
/// Builds the rotated sibling of `path`: `{dir}/{stem}_{suffix}.{ext}`.
///
/// A missing or non-UTF-8 stem falls back to `"timeline"`, and a missing or non-UTF-8
/// extension falls back to `"json"`. A path without a parent yields a bare relative file
/// name.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let base = Path::new(path);
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");

    match base.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
2789
/// Reads the environment variable `name` as a strictly positive `usize`.
///
/// Returns `None` in any of these cases:
/// * the variable is unset or not valid Unicode;
/// * it is not a plain base-10 integer (surrounding whitespace is not accepted);
/// * it is zero.
fn read_positive_usize_env(name: &str) -> Option<usize> {
    let raw = std::env::var(name).ok()?;
    match raw.parse::<usize>() {
        Ok(value) if value > 0 => Some(value),
        _ => None,
    }
}

/// Maximum number of events kept when appending to a timeline file
/// (`RANVIER_TIMELINE_MAX_EVENTS`). `None` means no cap.
fn max_events_limit() -> Option<usize> {
    read_positive_usize_env("RANVIER_TIMELINE_MAX_EVENTS")
}

/// Number of rotated timeline files to retain (`RANVIER_TIMELINE_ROTATE_KEEP`).
/// `None` means rotated files are never pruned.
fn rotate_keep_limit() -> Option<usize> {
    read_positive_usize_env("RANVIER_TIMELINE_ROTATE_KEEP")
}
2803
2804fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
2805 let len = timeline.events.len();
2806 if len > max_events {
2807 let keep_from = len - max_events;
2808 timeline.events = timeline.events.split_off(keep_from);
2809 }
2810}
2811
/// Deletes old rotated timeline files, keeping only the `keep` most recent ones.
///
/// Rotated files are siblings of `base_path` named `{stem}_{millis}.{ext}`, the shape
/// produced by `rotated_path`. Only names whose middle segment is a non-empty run of
/// ASCII digits are considered. Unrelated files that merely share the `{stem}_` prefix
/// (for example `timeline_stats.json` next to `timeline.json`) are therefore never
/// deleted. Candidates are ranked by their embedded timestamp, newest first.
///
/// # Errors
/// Returns the stringified error if the directory cannot be listed. Individual deletion
/// failures are ignored (best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` returns `Some("")` for a bare file name such as "timeline.json", and
    // `fs::read_dir("")` fails. Treat "no parent" and "empty parent" alike as the cwd.
    let parent = match p.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    };
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut files = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let stamp = file_name
                .to_str()?
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            Some((entry.path(), stamp))
        })
        .collect::<Vec<_>>();

    // Newest (largest timestamp) first; everything after the first `keep` is removed.
    files.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in files.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
2844
2845fn bus_capability_schema_from_policy(
2846 policy: Option<ranvier_core::bus::BusAccessPolicy>,
2847) -> Option<BusCapabilitySchema> {
2848 let policy = policy?;
2849
2850 let mut allow = policy
2851 .allow
2852 .unwrap_or_default()
2853 .into_iter()
2854 .map(|entry| entry.type_name.to_string())
2855 .collect::<Vec<_>>();
2856 let mut deny = policy
2857 .deny
2858 .into_iter()
2859 .map(|entry| entry.type_name.to_string())
2860 .collect::<Vec<_>>();
2861 allow.sort();
2862 deny.sort();
2863
2864 if allow.is_empty() && deny.is_empty() {
2865 return None;
2866 }
2867
2868 Some(BusCapabilitySchema { allow, deny })
2869}
2870
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
2877
2878#[cfg(test)]
2879mod tests {
2880 use super::{
2881 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
2882 should_force_export,
2883 };
2884 use crate::persistence::{
2885 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
2886 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
2887 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
2888 PersistenceHandle, PersistenceStore, PersistenceTraceId,
2889 };
2890 use anyhow::Result;
2891 use async_trait::async_trait;
2892 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
2893 use ranvier_core::event::{DlqPolicy, DlqSink};
2894 use ranvier_core::saga::SagaStack;
2895 use ranvier_core::timeline::{Timeline, TimelineEvent};
2896 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
2897 use serde::{Deserialize, Serialize};
2898 use std::sync::Arc;
2899 use tokio::sync::Mutex;
2900 use uuid::Uuid;
2901
2902 struct MockAuditSink {
2903 events: Arc<Mutex<Vec<AuditEvent>>>,
2904 }
2905
2906 #[async_trait]
2907 impl AuditSink for MockAuditSink {
2908 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
2909 self.events.lock().await.push(event.clone());
2910 Ok(())
2911 }
2912 }
2913
2914 #[tokio::test]
2915 async fn execute_logs_audit_events_for_intervention() {
2916 use ranvier_inspector::StateInspector;
2917
2918 let trace_id = "test-audit-trace";
2919 let store_impl = InMemoryPersistenceStore::new();
2920 let events = Arc::new(Mutex::new(Vec::new()));
2921 let sink = MockAuditSink {
2922 events: events.clone(),
2923 };
2924
2925 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
2926 .then(AddOne)
2927 .with_persistence_store(store_impl.clone())
2928 .with_audit_sink(sink);
2929
2930 let mut bus = Bus::new();
2931 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
2932 bus.insert(PersistenceTraceId::new(trace_id));
2933 let target_node_id = axon.schematic.nodes[0].id.clone();
2934
2935 store_impl
2937 .append(crate::persistence::PersistenceEnvelope {
2938 trace_id: trace_id.to_string(),
2939 circuit: "AuditTest".to_string(),
2940 schematic_version: "v1.0".to_string(),
2941 step: 0,
2942 node_id: None,
2943 outcome_kind: "Next".to_string(),
2944 timestamp_ms: 0,
2945 payload_hash: None,
2946 payload: None,
2947 })
2948 .await
2949 .unwrap();
2950
2951 axon.force_resume(trace_id, &target_node_id, None)
2953 .await
2954 .unwrap();
2955
2956 axon.execute(10, &(), &mut bus).await;
2958
2959 let recorded = events.lock().await;
2960 assert_eq!(
2961 recorded.len(),
2962 2,
2963 "Should have 2 audit events: ForceResume and ApplyIntervention"
2964 );
2965 assert_eq!(recorded[0].action, "ForceResume");
2966 assert_eq!(recorded[0].target, trace_id);
2967 assert_eq!(recorded[1].action, "ApplyIntervention");
2968 assert_eq!(recorded[1].target, trace_id);
2969 }
2970
2971 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2972 pub enum TestInfallible {}
2973
2974 #[test]
2975 fn inspector_enabled_flag_matrix() {
2976 assert!(inspector_enabled_from_value(None));
2977 assert!(inspector_enabled_from_value(Some("1")));
2978 assert!(inspector_enabled_from_value(Some("true")));
2979 assert!(inspector_enabled_from_value(Some("on")));
2980 assert!(!inspector_enabled_from_value(Some("0")));
2981 assert!(!inspector_enabled_from_value(Some("false")));
2982 }
2983
2984 #[test]
2985 fn inspector_dev_mode_matrix() {
2986 assert!(inspector_dev_mode_from_value(None));
2987 assert!(inspector_dev_mode_from_value(Some("dev")));
2988 assert!(inspector_dev_mode_from_value(Some("staging")));
2989 assert!(!inspector_dev_mode_from_value(Some("prod")));
2990 assert!(!inspector_dev_mode_from_value(Some("production")));
2991 }
2992
2993 #[test]
2994 fn adaptive_policy_force_export_matrix() {
2995 let next = Outcome::<(), &'static str>::Next(());
2996 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
2997 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
2998 let fault = Outcome::<(), &'static str>::Fault("boom");
2999
3000 assert!(!should_force_export(&next, "off"));
3001 assert!(!should_force_export(&fault, "off"));
3002
3003 assert!(!should_force_export(&branch, "fault_only"));
3004 assert!(should_force_export(&fault, "fault_only"));
3005
3006 assert!(should_force_export(&branch, "fault_branch"));
3007 assert!(!should_force_export(&emit, "fault_branch"));
3008 assert!(should_force_export(&fault, "fault_branch"));
3009
3010 assert!(should_force_export(&branch, "fault_branch_emit"));
3011 assert!(should_force_export(&emit, "fault_branch_emit"));
3012 assert!(should_force_export(&fault, "fault_branch_emit"));
3013 }
3014
3015 #[test]
3016 fn sampling_and_adaptive_combination_decisions() {
3017 let bus_id = Uuid::nil();
3018 let next = Outcome::<(), &'static str>::Next(());
3019 let fault = Outcome::<(), &'static str>::Fault("boom");
3020
3021 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3022 assert!(!sampled_never);
3023 assert!(!(sampled_never || should_force_export(&next, "off")));
3024 assert!(sampled_never || should_force_export(&fault, "fault_only"));
3025
3026 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3027 assert!(sampled_always);
3028 assert!(sampled_always || should_force_export(&next, "off"));
3029 assert!(sampled_always || should_force_export(&fault, "off"));
3030 }
3031
3032 #[derive(Clone)]
3033 struct AddOne;
3034
3035 #[async_trait]
3036 impl Transition<i32, i32> for AddOne {
3037 type Error = TestInfallible;
3038 type Resources = ();
3039
3040 async fn run(
3041 &self,
3042 state: i32,
3043 _resources: &Self::Resources,
3044 _bus: &mut Bus,
3045 ) -> Outcome<i32, Self::Error> {
3046 Outcome::Next(state + 1)
3047 }
3048 }
3049
3050 #[derive(Clone)]
3051 struct AlwaysFault;
3052
3053 #[async_trait]
3054 impl Transition<i32, i32> for AlwaysFault {
3055 type Error = String;
3056 type Resources = ();
3057
3058 async fn run(
3059 &self,
3060 _state: i32,
3061 _resources: &Self::Resources,
3062 _bus: &mut Bus,
3063 ) -> Outcome<i32, Self::Error> {
3064 Outcome::Fault("boom".to_string())
3065 }
3066 }
3067
3068 #[derive(Clone)]
3069 struct CapabilityGuarded;
3070
3071 #[async_trait]
3072 impl Transition<(), ()> for CapabilityGuarded {
3073 type Error = String;
3074 type Resources = ();
3075
3076 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3077 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3078 }
3079
3080 async fn run(
3081 &self,
3082 _state: (),
3083 _resources: &Self::Resources,
3084 bus: &mut Bus,
3085 ) -> Outcome<(), Self::Error> {
3086 match bus.get::<String>() {
3087 Ok(_) => Outcome::Next(()),
3088 Err(err) => Outcome::Fault(err.to_string()),
3089 }
3090 }
3091 }
3092
3093 #[derive(Clone)]
3094 struct RecordingCompensationHook {
3095 calls: Arc<Mutex<Vec<CompensationContext>>>,
3096 should_fail: bool,
3097 }
3098
3099 #[async_trait]
3100 impl CompensationHook for RecordingCompensationHook {
3101 async fn compensate(&self, context: CompensationContext) -> Result<()> {
3102 self.calls.lock().await.push(context);
3103 if self.should_fail {
3104 return Err(anyhow::anyhow!("compensation failed"));
3105 }
3106 Ok(())
3107 }
3108 }
3109
3110 #[derive(Clone)]
3111 struct FlakyCompensationHook {
3112 calls: Arc<Mutex<u32>>,
3113 failures_remaining: Arc<Mutex<u32>>,
3114 }
3115
3116 #[async_trait]
3117 impl CompensationHook for FlakyCompensationHook {
3118 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3119 {
3120 let mut calls = self.calls.lock().await;
3121 *calls += 1;
3122 }
3123 let mut failures_remaining = self.failures_remaining.lock().await;
3124 if *failures_remaining > 0 {
3125 *failures_remaining -= 1;
3126 return Err(anyhow::anyhow!("transient compensation failure"));
3127 }
3128 Ok(())
3129 }
3130 }
3131
3132 #[derive(Clone)]
3133 struct FailingCompensationIdempotencyStore {
3134 read_calls: Arc<Mutex<u32>>,
3135 write_calls: Arc<Mutex<u32>>,
3136 }
3137
3138 #[async_trait]
3139 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3140 async fn was_compensated(&self, _key: &str) -> Result<bool> {
3141 let mut read_calls = self.read_calls.lock().await;
3142 *read_calls += 1;
3143 Err(anyhow::anyhow!("forced idempotency read failure"))
3144 }
3145
3146 async fn mark_compensated(&self, _key: &str) -> Result<()> {
3147 let mut write_calls = self.write_calls.lock().await;
3148 *write_calls += 1;
3149 Err(anyhow::anyhow!("forced idempotency write failure"))
3150 }
3151 }
3152
3153 #[tokio::test]
3154 async fn execute_persists_success_trace_when_handle_exists() {
3155 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3156 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3157
3158 let mut bus = Bus::new();
3159 bus.insert(PersistenceHandle::from_arc(store_dyn));
3160 bus.insert(PersistenceTraceId::new("trace-success"));
3161
3162 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3163 let outcome = axon.execute(41, &(), &mut bus).await;
3164 assert!(matches!(outcome, Outcome::Next(42)));
3165
3166 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3167 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3169 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
3172 }
3173
3174 #[tokio::test]
3175 async fn execute_persists_fault_completion_state() {
3176 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3177 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3178
3179 let mut bus = Bus::new();
3180 bus.insert(PersistenceHandle::from_arc(store_dyn));
3181 bus.insert(PersistenceTraceId::new("trace-fault"));
3182
3183 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3184 let outcome = axon.execute(41, &(), &mut bus).await;
3185 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3186
3187 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3188 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3192 }
3193
3194 #[tokio::test]
3195 async fn execute_respects_persistence_auto_complete_off() {
3196 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3197 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3198
3199 let mut bus = Bus::new();
3200 bus.insert(PersistenceHandle::from_arc(store_dyn));
3201 bus.insert(PersistenceTraceId::new("trace-no-complete"));
3202 bus.insert(PersistenceAutoComplete(false));
3203
3204 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3205 let outcome = axon.execute(1, &(), &mut bus).await;
3206 assert!(matches!(outcome, Outcome::Next(2)));
3207
3208 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3209 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
3211 }
3212
3213 #[tokio::test]
3214 async fn fault_triggers_compensation_and_marks_compensated() {
3215 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3216 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3217 let calls = Arc::new(Mutex::new(Vec::new()));
3218 let compensation = RecordingCompensationHook {
3219 calls: calls.clone(),
3220 should_fail: false,
3221 };
3222
3223 let mut bus = Bus::new();
3224 bus.insert(PersistenceHandle::from_arc(store_dyn));
3225 bus.insert(PersistenceTraceId::new("trace-compensated"));
3226 bus.insert(CompensationHandle::from_hook(compensation));
3227
3228 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3229 let outcome = axon.execute(7, &(), &mut bus).await;
3230 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3231
3232 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3233 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3235 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3238 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3239
3240 let recorded = calls.lock().await;
3241 assert_eq!(recorded.len(), 1);
3242 assert_eq!(recorded[0].trace_id, "trace-compensated");
3243 assert_eq!(recorded[0].fault_kind, "Fault");
3244 }
3245
3246 #[tokio::test]
3247 async fn failed_compensation_keeps_fault_completion() {
3248 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3249 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3250 let calls = Arc::new(Mutex::new(Vec::new()));
3251 let compensation = RecordingCompensationHook {
3252 calls: calls.clone(),
3253 should_fail: true,
3254 };
3255
3256 let mut bus = Bus::new();
3257 bus.insert(PersistenceHandle::from_arc(store_dyn));
3258 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3259 bus.insert(CompensationHandle::from_hook(compensation));
3260
3261 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3262 let outcome = axon.execute(7, &(), &mut bus).await;
3263 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3264
3265 let persisted = store_impl
3266 .load("trace-compensation-failed")
3267 .await
3268 .unwrap()
3269 .unwrap();
3270 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3273
3274 let recorded = calls.lock().await;
3275 assert_eq!(recorded.len(), 1);
3276 }
3277
3278 #[tokio::test]
3279 async fn compensation_retry_policy_succeeds_after_retries() {
3280 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3281 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3282 let calls = Arc::new(Mutex::new(0u32));
3283 let failures_remaining = Arc::new(Mutex::new(2u32));
3284 let compensation = FlakyCompensationHook {
3285 calls: calls.clone(),
3286 failures_remaining,
3287 };
3288
3289 let mut bus = Bus::new();
3290 bus.insert(PersistenceHandle::from_arc(store_dyn));
3291 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3292 bus.insert(CompensationHandle::from_hook(compensation));
3293 bus.insert(CompensationRetryPolicy {
3294 max_attempts: 3,
3295 backoff_ms: 0,
3296 });
3297
3298 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3299 let outcome = axon.execute(7, &(), &mut bus).await;
3300 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3301
3302 let persisted = store_impl
3303 .load("trace-retry-success")
3304 .await
3305 .unwrap()
3306 .unwrap();
3307 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3308 assert_eq!(
3309 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3310 Some("Compensated")
3311 );
3312
3313 let attempt_count = calls.lock().await;
3314 assert_eq!(*attempt_count, 3);
3315 }
3316
3317 #[tokio::test]
3318 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3319 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3320 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3321 let calls = Arc::new(Mutex::new(Vec::new()));
3322 let compensation = RecordingCompensationHook {
3323 calls: calls.clone(),
3324 should_fail: false,
3325 };
3326 let idempotency = InMemoryCompensationIdempotencyStore::new();
3327
3328 let mut bus = Bus::new();
3329 bus.insert(PersistenceHandle::from_arc(store_dyn));
3330 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3331 bus.insert(PersistenceAutoComplete(false));
3332 bus.insert(CompensationHandle::from_hook(compensation));
3333 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3334
3335 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3336
3337 let outcome1 = axon.execute(7, &(), &mut bus).await;
3338 let outcome2 = axon.execute(8, &(), &mut bus).await;
3339 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3340 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3341
3342 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3343 assert_eq!(persisted.completion, None);
3344 let compensated_count = persisted
3346 .events
3347 .iter()
3348 .filter(|e| e.outcome_kind == "Compensated")
3349 .count();
3350 assert_eq!(
3351 compensated_count, 2,
3352 "Should have 2 Compensated events (one per execution)"
3353 );
3354
3355 let recorded = calls.lock().await;
3356 assert_eq!(recorded.len(), 1);
3357 }
3358
3359 #[tokio::test]
3360 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3361 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3362 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3363 let calls = Arc::new(Mutex::new(Vec::new()));
3364 let read_calls = Arc::new(Mutex::new(0u32));
3365 let write_calls = Arc::new(Mutex::new(0u32));
3366 let compensation = RecordingCompensationHook {
3367 calls: calls.clone(),
3368 should_fail: false,
3369 };
3370 let idempotency = FailingCompensationIdempotencyStore {
3371 read_calls: read_calls.clone(),
3372 write_calls: write_calls.clone(),
3373 };
3374
3375 let mut bus = Bus::new();
3376 bus.insert(PersistenceHandle::from_arc(store_dyn));
3377 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3378 bus.insert(CompensationHandle::from_hook(compensation));
3379 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3380
3381 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3382 let outcome = axon.execute(9, &(), &mut bus).await;
3383 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3384
3385 let persisted = store_impl
3386 .load("trace-idempotency-store-failure")
3387 .await
3388 .unwrap()
3389 .unwrap();
3390 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3391 assert_eq!(
3392 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3393 Some("Compensated")
3394 );
3395
3396 let recorded = calls.lock().await;
3397 assert_eq!(recorded.len(), 1);
3398 assert_eq!(*read_calls.lock().await, 1);
3399 assert_eq!(*write_calls.lock().await, 1);
3400 }
3401
3402 #[tokio::test]
3403 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3404 let mut bus = Bus::new();
3405 bus.insert(1_i32);
3406 bus.insert("secret".to_string());
3407
3408 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3409 let outcome = axon.execute((), &(), &mut bus).await;
3410
3411 match outcome {
3412 Outcome::Fault(msg) => {
3413 assert!(msg.contains("Bus access denied"), "{msg}");
3414 assert!(msg.contains("CapabilityGuarded"), "{msg}");
3415 assert!(msg.contains("alloc::string::String"), "{msg}");
3416 }
3417 other => panic!("expected fault, got {other:?}"),
3418 }
3419 }
3420
3421 #[tokio::test]
3422 async fn execute_fails_on_version_mismatch_without_migration() {
3423 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3424 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3425
3426 let trace_id = "v-mismatch";
3427 let old_envelope = crate::persistence::PersistenceEnvelope {
3429 trace_id: trace_id.to_string(),
3430 circuit: "TestCircuit".to_string(),
3431 schematic_version: "0.9".to_string(),
3432 step: 0,
3433 node_id: None,
3434 outcome_kind: "Enter".to_string(),
3435 timestamp_ms: 0,
3436 payload_hash: None,
3437 payload: None,
3438 };
3439 store_impl.append(old_envelope).await.unwrap();
3440
3441 let mut bus = Bus::new();
3442 bus.insert(PersistenceHandle::from_arc(store_dyn));
3443 bus.insert(PersistenceTraceId::new(trace_id));
3444
3445 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3447 let outcome = axon.execute(10, &(), &mut bus).await;
3448
3449 if let Outcome::Emit(kind, _) = outcome {
3450 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3451 } else {
3452 panic!("Expected version mismatch emission, got {:?}", outcome);
3453 }
3454 }
3455
3456 #[tokio::test]
3457 async fn execute_resumes_from_start_on_migration_strategy() {
3458 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3459 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3460
3461 let trace_id = "v-migration";
3462 let old_envelope = crate::persistence::PersistenceEnvelope {
3464 trace_id: trace_id.to_string(),
3465 circuit: "TestCircuit".to_string(),
3466 schematic_version: "0.9".to_string(),
3467 step: 5,
3468 node_id: None,
3469 outcome_kind: "Next".to_string(),
3470 timestamp_ms: 0,
3471 payload_hash: None,
3472 payload: None,
3473 };
3474 store_impl.append(old_envelope).await.unwrap();
3475
3476 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3477 registry.register(ranvier_core::schematic::SnapshotMigration {
3478 name: Some("v0.9 to v1.0".to_string()),
3479 from_version: "0.9".to_string(),
3480 to_version: "1.0".to_string(),
3481 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3482 node_mapping: std::collections::HashMap::new(),
3483 payload_mapper: None,
3484 });
3485
3486 let mut bus = Bus::new();
3487 bus.insert(PersistenceHandle::from_arc(store_dyn));
3488 bus.insert(PersistenceTraceId::new(trace_id));
3489 bus.insert(registry);
3490
3491 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3492 let outcome = axon.execute(10, &(), &mut bus).await;
3493
3494 assert!(matches!(outcome, Outcome::Next(11)));
3496
3497 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3499 assert_eq!(persisted.schematic_version, "1.0");
3500 }
3501
3502 #[tokio::test]
3503 async fn execute_applies_manual_intervention_jump_and_payload() {
3504 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3505 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3506
3507 let trace_id = "intervention-test";
3508 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3510 .then(AddOne)
3511 .then(AddOne);
3512
3513 let mut bus = Bus::new();
3514 bus.insert(PersistenceHandle::from_arc(store_dyn));
3515 bus.insert(PersistenceTraceId::new(trace_id));
3516
3517 let _target_node_label = "AddOne";
3522 let target_node_id = axon.schematic.nodes[2].id.clone();
3524
3525 store_impl
3527 .append(crate::persistence::PersistenceEnvelope {
3528 trace_id: trace_id.to_string(),
3529 circuit: "TestCircuit".to_string(),
3530 schematic_version: "1.0".to_string(),
3531 step: 0,
3532 node_id: None,
3533 outcome_kind: "Enter".to_string(),
3534 timestamp_ms: 0,
3535 payload_hash: None,
3536 payload: None,
3537 })
3538 .await
3539 .unwrap();
3540
3541 store_impl
3542 .save_intervention(
3543 trace_id,
3544 crate::persistence::Intervention {
3545 target_node: target_node_id.clone(),
3546 payload_override: Some(serde_json::json!(100)),
3547 timestamp_ms: 0,
3548 },
3549 )
3550 .await
3551 .unwrap();
3552
3553 let outcome = axon.execute(10, &(), &mut bus).await;
3556
3557 match outcome {
3558 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3559 other => panic!("Expected Outcome::Next(101), got {:?}", other),
3560 }
3561
3562 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3564 assert_eq!(persisted.interventions.len(), 1);
3566 assert_eq!(persisted.interventions[0].target_node, target_node_id);
3567 }
3568
3569 #[derive(Clone)]
3573 struct FailNThenSucceed {
3574 remaining: Arc<tokio::sync::Mutex<u32>>,
3575 }
3576
3577 #[async_trait]
3578 impl Transition<i32, i32> for FailNThenSucceed {
3579 type Error = String;
3580 type Resources = ();
3581
3582 async fn run(
3583 &self,
3584 state: i32,
3585 _resources: &Self::Resources,
3586 _bus: &mut Bus,
3587 ) -> Outcome<i32, Self::Error> {
3588 let mut rem = self.remaining.lock().await;
3589 if *rem > 0 {
3590 *rem -= 1;
3591 Outcome::Fault("transient failure".to_string())
3592 } else {
3593 Outcome::Next(state + 1)
3594 }
3595 }
3596 }
3597
3598 #[derive(Clone)]
3600 struct MockDlqSink {
3601 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3602 }
3603
3604 #[async_trait]
3605 impl DlqSink for MockDlqSink {
3606 async fn store_dead_letter(
3607 &self,
3608 workflow_id: &str,
3609 _circuit_label: &str,
3610 node_id: &str,
3611 error_msg: &str,
3612 _payload: &[u8],
3613 ) -> Result<(), String> {
3614 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
3615 self.letters.lock().await.push(entry);
3616 Ok(())
3617 }
3618 }
3619
3620 #[tokio::test]
3621 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
3622 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
3624 let trans = FailNThenSucceed { remaining };
3625
3626 let dlq_sink = MockDlqSink {
3627 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3628 };
3629
3630 let mut bus = Bus::new();
3631 bus.insert(Timeline::new());
3632
3633 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
3634 .then(trans)
3635 .with_dlq_policy(DlqPolicy::RetryThenDlq {
3636 max_attempts: 5,
3637 backoff_ms: 1,
3638 })
3639 .with_dlq_sink(dlq_sink.clone());
3640 let outcome = axon.execute(10, &(), &mut bus).await;
3641
3642 assert!(
3644 matches!(outcome, Outcome::Next(11)),
3645 "Expected Next(11), got {:?}",
3646 outcome
3647 );
3648
3649 let letters = dlq_sink.letters.lock().await;
3651 assert!(
3652 letters.is_empty(),
3653 "Should have 0 dead letters, got {}",
3654 letters.len()
3655 );
3656
3657 let timeline = bus.read::<Timeline>().unwrap();
3659 let retry_count = timeline
3660 .events
3661 .iter()
3662 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3663 .count();
3664 assert_eq!(retry_count, 2, "Should have 2 retry events");
3665 }
3666
3667 #[tokio::test]
3668 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
3669 let mut bus = Bus::new();
3671 bus.insert(Timeline::new());
3672
3673 let dlq_sink = MockDlqSink {
3674 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3675 };
3676
3677 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
3678 .then(AlwaysFault)
3679 .with_dlq_policy(DlqPolicy::RetryThenDlq {
3680 max_attempts: 3,
3681 backoff_ms: 1,
3682 })
3683 .with_dlq_sink(dlq_sink.clone());
3684 let outcome = axon.execute(42, &(), &mut bus).await;
3685
3686 assert!(
3687 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
3688 "Expected Fault(boom), got {:?}",
3689 outcome
3690 );
3691
3692 let letters = dlq_sink.letters.lock().await;
3694 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
3695
3696 let timeline = bus.read::<Timeline>().unwrap();
3698 let retry_count = timeline
3699 .events
3700 .iter()
3701 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3702 .count();
3703 let dlq_count = timeline
3704 .events
3705 .iter()
3706 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
3707 .count();
3708 assert_eq!(
3709 retry_count, 2,
3710 "Should have 2 retry events (attempts 2 and 3)"
3711 );
3712 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
3713 }
3714
3715 #[tokio::test]
3716 async fn send_to_dlq_policy_sends_immediately_without_retry() {
3717 let mut bus = Bus::new();
3718 bus.insert(Timeline::new());
3719
3720 let dlq_sink = MockDlqSink {
3721 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3722 };
3723
3724 let axon = Axon::<i32, i32, String>::start("SendDlq")
3725 .then(AlwaysFault)
3726 .with_dlq_policy(DlqPolicy::SendToDlq)
3727 .with_dlq_sink(dlq_sink.clone());
3728 let outcome = axon.execute(1, &(), &mut bus).await;
3729
3730 assert!(matches!(outcome, Outcome::Fault(_)));
3731
3732 let letters = dlq_sink.letters.lock().await;
3734 assert_eq!(letters.len(), 1);
3735
3736 let timeline = bus.read::<Timeline>().unwrap();
3738 let retry_count = timeline
3739 .events
3740 .iter()
3741 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3742 .count();
3743 assert_eq!(retry_count, 0);
3744 }
3745
3746 #[tokio::test]
3747 async fn drop_policy_does_not_send_to_dlq() {
3748 let mut bus = Bus::new();
3749
3750 let dlq_sink = MockDlqSink {
3751 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3752 };
3753
3754 let axon = Axon::<i32, i32, String>::start("DropDlq")
3755 .then(AlwaysFault)
3756 .with_dlq_policy(DlqPolicy::Drop)
3757 .with_dlq_sink(dlq_sink.clone());
3758 let outcome = axon.execute(1, &(), &mut bus).await;
3759
3760 assert!(matches!(outcome, Outcome::Fault(_)));
3761
3762 let letters = dlq_sink.letters.lock().await;
3764 assert!(letters.is_empty());
3765 }
3766
3767 #[tokio::test]
3768 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
3769 use ranvier_core::policy::DynamicPolicy;
3770
3771 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
3773 let dlq_sink = MockDlqSink {
3774 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3775 };
3776
3777 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
3778 .then(AlwaysFault)
3779 .with_dynamic_dlq_policy(dynamic)
3780 .with_dlq_sink(dlq_sink.clone());
3781
3782 let mut bus = Bus::new();
3784 let outcome = axon.execute(1, &(), &mut bus).await;
3785 assert!(matches!(outcome, Outcome::Fault(_)));
3786 assert!(
3787 dlq_sink.letters.lock().await.is_empty(),
3788 "Drop policy should produce no DLQ entries"
3789 );
3790
3791 tx.send(DlqPolicy::SendToDlq).unwrap();
3793
3794 let mut bus2 = Bus::new();
3796 let outcome2 = axon.execute(2, &(), &mut bus2).await;
3797 assert!(matches!(outcome2, Outcome::Fault(_)));
3798 assert_eq!(
3799 dlq_sink.letters.lock().await.len(),
3800 1,
3801 "SendToDlq policy should produce 1 DLQ entry"
3802 );
3803 }
3804
3805 #[tokio::test]
3806 async fn dynamic_saga_policy_hot_reload() {
3807 use ranvier_core::policy::DynamicPolicy;
3808 use ranvier_core::saga::SagaPolicy;
3809
3810 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
3812
3813 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
3814 .then(AddOne)
3815 .with_dynamic_saga_policy(dynamic);
3816
3817 let mut bus = Bus::new();
3819 let _outcome = axon.execute(1, &(), &mut bus).await;
3820 assert!(
3821 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
3822 "SagaStack should be absent or empty when disabled"
3823 );
3824
3825 tx.send(SagaPolicy::Enabled).unwrap();
3827
3828 let mut bus2 = Bus::new();
3830 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
3831 assert!(
3832 bus2.read::<SagaStack>().is_some(),
3833 "SagaStack should exist when saga is enabled"
3834 );
3835 }
3836
3837 mod iam_tests {
3840 use super::*;
3841 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
3842
3843 #[derive(Clone)]
3845 struct MockVerifier {
3846 identity: IamIdentity,
3847 should_fail: bool,
3848 }
3849
3850 #[async_trait]
3851 impl IamVerifier for MockVerifier {
3852 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
3853 if self.should_fail {
3854 Err(IamError::InvalidToken("mock verification failure".into()))
3855 } else {
3856 Ok(self.identity.clone())
3857 }
3858 }
3859 }
3860
3861 #[tokio::test]
3862 async fn iam_require_identity_passes_with_valid_token() {
3863 let verifier = MockVerifier {
3864 identity: IamIdentity::new("alice").with_role("user"),
3865 should_fail: false,
3866 };
3867
3868 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
3869 .with_iam(IamPolicy::RequireIdentity, verifier)
3870 .then(AddOne);
3871
3872 let mut bus = Bus::new();
3873 bus.insert(IamToken("valid-token".to_string()));
3874 let outcome = axon.execute(10, &(), &mut bus).await;
3875
3876 assert!(matches!(outcome, Outcome::Next(11)));
3877 let identity = bus
3879 .read::<IamIdentity>()
3880 .expect("IamIdentity should be in Bus");
3881 assert_eq!(identity.subject, "alice");
3882 }
3883
3884 #[tokio::test]
3885 async fn iam_require_identity_rejects_missing_token() {
3886 let verifier = MockVerifier {
3887 identity: IamIdentity::new("ignored"),
3888 should_fail: false,
3889 };
3890
3891 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
3892 .with_iam(IamPolicy::RequireIdentity, verifier)
3893 .then(AddOne);
3894
3895 let mut bus = Bus::new();
3896 let outcome = axon.execute(10, &(), &mut bus).await;
3898
3899 match &outcome {
3901 Outcome::Emit(label, _) => {
3902 assert_eq!(label, "iam.missing_token");
3903 }
3904 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
3905 }
3906 }
3907
3908 #[tokio::test]
3909 async fn iam_rejects_failed_verification() {
3910 let verifier = MockVerifier {
3911 identity: IamIdentity::new("ignored"),
3912 should_fail: true,
3913 };
3914
3915 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
3916 .with_iam(IamPolicy::RequireIdentity, verifier)
3917 .then(AddOne);
3918
3919 let mut bus = Bus::new();
3920 bus.insert(IamToken("bad-token".to_string()));
3921 let outcome = axon.execute(10, &(), &mut bus).await;
3922
3923 match &outcome {
3924 Outcome::Emit(label, _) => {
3925 assert_eq!(label, "iam.verification_failed");
3926 }
3927 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
3928 }
3929 }
3930
3931 #[tokio::test]
3932 async fn iam_require_role_passes_with_matching_role() {
3933 let verifier = MockVerifier {
3934 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
3935 should_fail: false,
3936 };
3937
3938 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
3939 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3940 .then(AddOne);
3941
3942 let mut bus = Bus::new();
3943 bus.insert(IamToken("token".to_string()));
3944 let outcome = axon.execute(5, &(), &mut bus).await;
3945
3946 assert!(matches!(outcome, Outcome::Next(6)));
3947 }
3948
3949 #[tokio::test]
3950 async fn iam_require_role_denies_without_role() {
3951 let verifier = MockVerifier {
3952 identity: IamIdentity::new("carol").with_role("user"),
3953 should_fail: false,
3954 };
3955
3956 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
3957 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3958 .then(AddOne);
3959
3960 let mut bus = Bus::new();
3961 bus.insert(IamToken("token".to_string()));
3962 let outcome = axon.execute(5, &(), &mut bus).await;
3963
3964 match &outcome {
3965 Outcome::Emit(label, _) => {
3966 assert_eq!(label, "iam.policy_denied");
3967 }
3968 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
3969 }
3970 }
3971
3972 #[tokio::test]
3973 async fn iam_policy_none_skips_verification() {
3974 let verifier = MockVerifier {
3975 identity: IamIdentity::new("ignored"),
3976 should_fail: true, };
3978
3979 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
3980 .with_iam(IamPolicy::None, verifier)
3981 .then(AddOne);
3982
3983 let mut bus = Bus::new();
3984 let outcome = axon.execute(10, &(), &mut bus).await;
3986
3987 assert!(matches!(outcome, Outcome::Next(11)));
3988 }
3989 }
3990
3991 #[derive(Clone)]
3994 struct SchemaTransition;
3995
3996 #[async_trait]
3997 impl Transition<String, String> for SchemaTransition {
3998 type Error = String;
3999 type Resources = ();
4000
4001 fn input_schema(&self) -> Option<serde_json::Value> {
4002 Some(serde_json::json!({
4003 "type": "object",
4004 "required": ["name"],
4005 "properties": {
4006 "name": { "type": "string" }
4007 }
4008 }))
4009 }
4010
4011 async fn run(
4012 &self,
4013 state: String,
4014 _resources: &Self::Resources,
4015 _bus: &mut Bus,
4016 ) -> Outcome<String, Self::Error> {
4017 Outcome::Next(state)
4018 }
4019 }
4020
// `then` must copy the transition's `input_schema()` onto the node it appends.
// This uses `expect` instead of `is_some()` followed by `unwrap()`, so a missing
// schema fails with a message that says what went wrong.
#[test]
fn then_auto_populates_input_schema_from_transition() {
    let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);

    let last_node = axon
        .schematic
        .nodes
        .last()
        .expect("schematic should contain at least one node");
    let schema = last_node
        .input_schema
        .as_ref()
        .expect("`then` should populate input_schema from the transition");
    assert_eq!(schema["type"], "object");
    assert_eq!(schema["required"][0], "name");
}
4032
// `AddOne` declares no input schema, so the appended node must not carry one.
#[test]
fn then_leaves_input_schema_none_when_not_provided() {
    let schematic = Axon::<i32, i32, TestInfallible>::new("NoSchema")
        .then(AddOne)
        .schematic;
    let tail = schematic.nodes.last().unwrap();
    assert!(tail.input_schema.is_none());
}
4040
// `with_input_schema_value` attaches the given schema to the most recently added node.
#[test]
fn with_input_schema_value_sets_on_last_node() {
    let expected = serde_json::json!({"type": "integer"});
    let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
        .then(AddOne)
        .with_input_schema_value(expected.clone());

    let tail = axon.schematic.nodes.last().unwrap();
    assert_eq!(tail.input_schema.as_ref().unwrap(), &expected);
}
4051
// `with_output_schema_value` attaches the given schema to the most recently added node.
#[test]
fn with_output_schema_value_sets_on_last_node() {
    let expected = serde_json::json!({"type": "integer"});
    let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
        .then(AddOne)
        .with_output_schema_value(expected.clone());

    let tail = axon.schematic.nodes.last().unwrap();
    assert_eq!(tail.output_schema.as_ref().unwrap(), &expected);
}
4062
// When a node carries input/output schemas, the serialized schematic must
// expose both under `input_schema` / `output_schema`.
#[test]
fn schematic_export_includes_schema_fields() {
    let axon = Axon::<String, String, String>::new("ExportTest")
        .then(SchemaTransition)
        .with_output_schema_value(serde_json::json!({"type": "string"}));

    let exported = serde_json::to_value(&axon.schematic).unwrap();
    let tail = exported["nodes"].as_array().unwrap().last().unwrap();

    assert!(tail.get("input_schema").is_some());
    assert_eq!(tail["input_schema"]["type"], "object");
    assert_eq!(tail["output_schema"]["type"], "string");
}
4077
// Nodes without schemas must serialize with no `input_schema` or
// `output_schema` key at all (absent, not null).
#[test]
fn schematic_export_omits_schema_fields_when_none() {
    let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);

    let exported = serde_json::to_value(&axon.schematic).unwrap();
    let obj = exported["nodes"]
        .as_array()
        .unwrap()
        .last()
        .unwrap()
        .as_object()
        .unwrap();
    assert!(!obj.contains_key("input_schema"));
    assert!(!obj.contains_key("output_schema"));
}
4089
// Serializing a schematic to a JSON string and parsing it back must keep both
// node schemas intact.
#[test]
fn schematic_json_roundtrip_preserves_schemas() {
    use ranvier_core::schematic::Schematic;

    let axon = Axon::<String, String, String>::new("Roundtrip")
        .then(SchemaTransition)
        .with_output_schema_value(serde_json::json!({"type": "string"}));

    let encoded = serde_json::to_string(&axon.schematic).unwrap();
    let restored: Schematic = serde_json::from_str(&encoded).unwrap();

    let tail = restored.nodes.last().unwrap();
    assert!(tail.input_schema.is_some());
    assert!(tail.output_schema.is_some());
    assert_eq!(tail.input_schema.as_ref().unwrap()["required"][0], "name");
    assert_eq!(tail.output_schema.as_ref().unwrap()["type"], "string");
}
4106
4107 #[derive(Clone)]
4109 struct MultiplyByTwo;
4110
4111 #[async_trait]
4112 impl Transition<i32, i32> for MultiplyByTwo {
4113 type Error = TestInfallible;
4114 type Resources = ();
4115
4116 async fn run(
4117 &self,
4118 state: i32,
4119 _resources: &Self::Resources,
4120 _bus: &mut Bus,
4121 ) -> Outcome<i32, Self::Error> {
4122 Outcome::Next(state * 2)
4123 }
4124 }
4125
4126 #[derive(Clone)]
4127 struct AddTen;
4128
4129 #[async_trait]
4130 impl Transition<i32, i32> for AddTen {
4131 type Error = TestInfallible;
4132 type Resources = ();
4133
4134 async fn run(
4135 &self,
4136 state: i32,
4137 _resources: &Self::Resources,
4138 _bus: &mut Bus,
4139 ) -> Outcome<i32, Self::Error> {
4140 Outcome::Next(state + 10)
4141 }
4142 }
4143
4144 #[derive(Clone)]
4145 struct AddOneString;
4146
4147 #[async_trait]
4148 impl Transition<i32, i32> for AddOneString {
4149 type Error = String;
4150 type Resources = ();
4151
4152 async fn run(
4153 &self,
4154 state: i32,
4155 _resources: &Self::Resources,
4156 _bus: &mut Bus,
4157 ) -> Outcome<i32, Self::Error> {
4158 Outcome::Next(state + 1)
4159 }
4160 }
4161
4162 #[derive(Clone)]
4163 struct AddTenString;
4164
4165 #[async_trait]
4166 impl Transition<i32, i32> for AddTenString {
4167 type Error = String;
4168 type Resources = ();
4169
4170 async fn run(
4171 &self,
4172 state: i32,
4173 _resources: &Self::Resources,
4174 _bus: &mut Bus,
4175 ) -> Outcome<i32, Self::Error> {
4176 Outcome::Next(state + 10)
4177 }
4178 }
4179
4180 #[tokio::test]
4181 async fn axon_single_step_chain_executes_and_returns_next() {
4182 let mut bus = Bus::new();
4183 let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4184
4185 let outcome = axon.execute(5, &(), &mut bus).await;
4186 assert!(matches!(outcome, Outcome::Next(6)));
4187 }
4188
4189 #[tokio::test]
4190 async fn axon_three_step_chain_executes_in_order() {
4191 let mut bus = Bus::new();
4192 let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4193 .then(AddOne)
4194 .then(MultiplyByTwo)
4195 .then(AddTen);
4196
4197 let outcome = axon.execute(5, &(), &mut bus).await;
4199 assert!(matches!(outcome, Outcome::Next(22)));
4200 }
4201
4202 #[tokio::test]
4203 async fn axon_with_fault_in_middle_step_propagates_error() {
4204 let mut bus = Bus::new();
4205
4206 let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4211 .then(AddOneString)
4212 .then(AlwaysFault)
4213 .then(AddTenString);
4214
4215 let outcome = axon.execute(5, &(), &mut bus).await;
4216 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4217 }
4218
// `start` contributes one node and each `then` adds another, so three
// transitions give four nodes. The schematic keeps the circuit label.
#[test]
fn axon_schematic_has_correct_node_count_after_chaining() {
    let schematic = Axon::<i32, i32, TestInfallible>::start("NodeCount")
        .then(AddOne)
        .then(MultiplyByTwo)
        .then(AddTen)
        .schematic;

    assert_eq!(schematic.nodes.len(), 4);
    assert_eq!(schematic.name, "NodeCount");
}
4230
4231 #[tokio::test]
4232 async fn axon_execution_records_timeline_events() {
4233 let mut bus = Bus::new();
4234 bus.insert(Timeline::new());
4235
4236 let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4237 .then(AddOne)
4238 .then(MultiplyByTwo);
4239
4240 let outcome = axon.execute(3, &(), &mut bus).await;
4241 assert!(matches!(outcome, Outcome::Next(8))); let timeline = bus.read::<Timeline>().unwrap();
4244
4245 let enter_count = timeline
4247 .events
4248 .iter()
4249 .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4250 .count();
4251 let exit_count = timeline
4252 .events
4253 .iter()
4254 .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4255 .count();
4256
4257 assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4259 assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4260 }
4261}