1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
/// Selects how a run of an [`Axon`] is coordinated before its steps execute.
#[derive(Clone)]
pub enum ExecutionMode {
    /// Run directly; `execute` takes no lock for this mode.
    Local,
    /// Run only after a lock has been acquired from `lock_provider`.
    ///
    /// `execute` calls `lock_provider.try_acquire(lock_key, ttl_ms)`. When the lock is
    /// already held it returns an `execution.skipped.lock_held` event, and when the
    /// check itself fails it returns `execution.skipped.lock_error`, instead of
    /// running the pipeline.
    Singleton {
        /// Key handed to the lock provider.
        lock_key: String,
        /// Second argument of `try_acquire`; presumably the lock time-to-live in
        /// milliseconds — confirm against `DistributedLock`.
        ttl_ms: u64,
        /// Provider that performs the lock acquisition.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
57
/// Pinned, boxed, `Send` future that may borrow for `'a` and resolves to `T`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
60
/// Shared, type-erased pipeline function stored inside an [`Axon`].
///
/// It is called with the pipeline input, a shared reference to the resources and a
/// mutable reference to the [`Bus`]. The returned future borrows both references for
/// `'a` and resolves to the pipeline's [`Outcome`].
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
66
/// Bus marker asking the pipeline to start at a specific node.
///
/// `execute` inserts it when a persisted intervention names a node found in the
/// schematic. A step whose node id or label equals `target_node` skips all earlier
/// steps and uses `payload_override` as its input.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Id or label of the node to jump to.
    pub target_node: String,
    /// JSON that the target step deserializes as its input. Steps emit
    /// `execution.jump.missing_payload` when this is `None`.
    pub payload_override: Option<serde_json::Value>,
}
73
/// Bus marker holding the step index at which a resumed run should start.
///
/// Steps compare it with their own index (their node's position in the schematic);
/// the step that matches resumes there when a `ResumptionState` is also on the bus.
/// NOTE(review): the code that inserts this marker is outside the visible section.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
77
/// Bus marker carrying the payload available for resuming a run.
///
/// The resuming step tries to deserialize `payload` into its input type, but only
/// as a fallback when the fresh pipeline input cannot be converted to that type.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// JSON payload to resume from, if any.
    payload: Option<serde_json::Value>,
}
83
/// Returns the name of `T` with every module path removed.
///
/// `std::any::type_name` yields fully qualified names such as
/// `core::option::Option<alloc::string::String>`. Keeping only the text after the
/// last `::` would mangle that into `String>`, so the path prefix of *each*
/// identifier is dropped instead, giving `Option<String>`. Names without generic
/// arguments come out as their last path segment, exactly as before.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the identifier currently being read begins.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // `::` means what was just read was a module segment: discard it.
            chars.next();
            short.truncate(segment_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation (`<`, `,`, space, `(`, ...) ends an identifier, so
                // the next path starts after it.
                segment_start = short.len();
            }
        }
    }

    short
}
89
/// A typed pipeline from `In` to `Out` together with its schematic and runtime
/// configuration.
///
/// `E` is the error type carried by the pipeline's [`Outcome`] and `Res` the
/// resource type handed to every step (defaults to `()`).
pub struct Axon<In, Out, E, Res = ()> {
    /// Graph description of the pipeline; builder methods append nodes and edges.
    pub schematic: Schematic,
    /// Composed function that runs every step added so far.
    executor: Executor<In, Out, E, Res>,
    /// Whether `execute` must acquire a lock before running.
    pub execution_mode: ExecutionMode,
    /// Store used by the inspector to load trace state and save interventions.
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Sink that receives audit events for force-resume requests and applied
    /// interventions.
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Dead-letter sink; `execute` places it on the bus when set.
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Dead-letter policy used when no dynamic policy is configured.
    pub dlq_policy: DlqPolicy,
    /// When set, its `current()` value replaces `dlq_policy` for each run.
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Saga policy used when no dynamic policy is configured.
    pub saga_policy: SagaPolicy,
    /// When set, its `current()` value replaces `saga_policy` for each run.
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Compensation handlers keyed by node id; `then_compensated` registers into it.
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// IAM policy and verifier checked by `execute` before the pipeline runs.
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
136
/// Parameters of a schematic export.
///
/// NOTE(review): the code that consumes this request is outside the visible
/// section; the field description below is inferred from its name and type.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Presumably the file to write the schematic to, with `None` selecting a
    /// default destination — confirm against the export routine.
    pub output: Option<PathBuf>,
}
143
144impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
145 fn clone(&self) -> Self {
146 Self {
147 schematic: self.schematic.clone(),
148 executor: self.executor.clone(),
149 execution_mode: self.execution_mode.clone(),
150 persistence_store: self.persistence_store.clone(),
151 audit_sink: self.audit_sink.clone(),
152 dlq_sink: self.dlq_sink.clone(),
153 dlq_policy: self.dlq_policy.clone(),
154 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
155 saga_policy: self.saga_policy.clone(),
156 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
157 saga_compensation_registry: self.saga_compensation_registry.clone(),
158 iam_handle: self.iam_handle.clone(),
159 }
160 }
161}
162
163impl<In, E, Res> Axon<In, In, E, Res>
164where
165 In: Send + Sync + Serialize + DeserializeOwned + 'static,
166 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
167 Res: ranvier_core::transition::ResourceRequirement,
168{
169 #[track_caller]
172 pub fn new(label: &str) -> Self {
173 let caller = Location::caller();
174 Self::start_with_source(label, caller)
175 }
176
177 #[track_caller]
180 pub fn start(label: &str) -> Self {
181 let caller = Location::caller();
182 Self::start_with_source(label, caller)
183 }
184
185 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
186 let node_id = uuid::Uuid::new_v4().to_string();
187 let node = Node {
188 id: node_id,
189 kind: NodeKind::Ingress,
190 label: label.to_string(),
191 description: None,
192 input_type: "void".to_string(),
193 output_type: type_name_of::<In>(),
194 resource_type: type_name_of::<Res>(),
195 metadata: Default::default(),
196 bus_capability: None,
197 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
198 position: None,
199 compensation_node_id: None,
200 input_schema: None,
201 output_schema: None,
202 };
203
204 let mut schematic = Schematic::new(label);
205 schematic.nodes.push(node);
206
207 let executor: Executor<In, In, E, Res> =
208 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
209
210 Self {
211 schematic,
212 executor,
213 execution_mode: ExecutionMode::Local,
214 persistence_store: None,
215 audit_sink: None,
216 dlq_sink: None,
217 dlq_policy: DlqPolicy::default(),
218 dynamic_dlq_policy: None,
219 saga_policy: SagaPolicy::default(),
220 dynamic_saga_policy: None,
221 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
222 ranvier_core::saga::SagaCompensationRegistry::new(),
223 )),
224 iam_handle: None,
225 }
226 }
227}
228
229impl<In, Out, E, Res> Axon<In, Out, E, Res>
230where
231 In: Send + Sync + Serialize + DeserializeOwned + 'static,
232 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
233 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
234 Res: ranvier_core::transition::ResourceRequirement,
235{
236 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
238 self.execution_mode = mode;
239 self
240 }
241
242 pub fn with_version(mut self, version: impl Into<String>) -> Self {
244 self.schematic.schema_version = version.into();
245 self
246 }
247
248 pub fn with_persistence_store<S>(mut self, store: S) -> Self
250 where
251 S: crate::persistence::PersistenceStore + 'static,
252 {
253 self.persistence_store = Some(Arc::new(store));
254 self
255 }
256
257 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
259 where
260 S: AuditSink + 'static,
261 {
262 self.audit_sink = Some(Arc::new(sink));
263 self
264 }
265
266 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
268 where
269 S: DlqSink + 'static,
270 {
271 self.dlq_sink = Some(Arc::new(sink));
272 self
273 }
274
275 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
277 self.dlq_policy = policy;
278 self
279 }
280
281 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
283 self.saga_policy = policy;
284 self
285 }
286
287 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
290 self.dynamic_dlq_policy = Some(dynamic);
291 self
292 }
293
294 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
297 self.dynamic_saga_policy = Some(dynamic);
298 self
299 }
300
301 pub fn with_iam(
309 mut self,
310 policy: ranvier_core::iam::IamPolicy,
311 verifier: impl ranvier_core::iam::IamVerifier + 'static,
312 ) -> Self {
313 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
314 policy,
315 Arc::new(verifier),
316 ));
317 self
318 }
319
320 #[cfg(feature = "schema")]
331 pub fn with_input_schema<T>(mut self) -> Self
332 where
333 T: schemars::JsonSchema,
334 {
335 if let Some(last_node) = self.schematic.nodes.last_mut() {
336 let schema = schemars::schema_for!(T);
337 last_node.input_schema =
338 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
339 }
340 self
341 }
342
343 #[cfg(feature = "schema")]
347 pub fn with_output_schema<T>(mut self) -> Self
348 where
349 T: schemars::JsonSchema,
350 {
351 if let Some(last_node) = self.schematic.nodes.last_mut() {
352 let schema = schemars::schema_for!(T);
353 last_node.output_schema =
354 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
355 }
356 self
357 }
358
359 pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
363 if let Some(last_node) = self.schematic.nodes.last_mut() {
364 last_node.input_schema = Some(schema);
365 }
366 self
367 }
368
369 pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
371 if let Some(last_node) = self.schematic.nodes.last_mut() {
372 last_node.output_schema = Some(schema);
373 }
374 self
375 }
376}
377
378#[async_trait]
379impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
380where
381 In: Send + Sync + Serialize + DeserializeOwned + 'static,
382 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
383 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
384 Res: ranvier_core::transition::ResourceRequirement,
385{
386 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
387 let store = self.persistence_store.as_ref()?;
388 let trace = store.load(trace_id).await.ok().flatten()?;
389 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
390 }
391
392 async fn force_resume(
393 &self,
394 trace_id: &str,
395 target_node: &str,
396 payload_override: Option<Value>,
397 ) -> Result<(), String> {
398 let store = self
399 .persistence_store
400 .as_ref()
401 .ok_or("No persistence store attached to Axon")?;
402
403 let intervention = crate::persistence::Intervention {
404 target_node: target_node.to_string(),
405 payload_override,
406 timestamp_ms: now_ms(),
407 };
408
409 store
410 .save_intervention(trace_id, intervention)
411 .await
412 .map_err(|e| format!("Failed to save intervention: {}", e))?;
413
414 if let Some(sink) = self.audit_sink.as_ref() {
415 let event = AuditEvent::new(
416 uuid::Uuid::new_v4().to_string(),
417 "Inspector".to_string(),
418 "ForceResume".to_string(),
419 trace_id.to_string(),
420 )
421 .with_metadata("target_node", target_node);
422
423 let _ = sink.append(&event).await;
424 }
425
426 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
427 Ok(())
428 }
429}
430
431impl<In, Out, E, Res> Axon<In, Out, E, Res>
432where
433 In: Send + Sync + Serialize + DeserializeOwned + 'static,
434 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
435 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
436 Res: ranvier_core::transition::ResourceRequirement,
437{
    /// Appends `transition` as the next step of the pipeline.
    ///
    /// Adds an `Atom` node for the transition to the schematic (its source location
    /// is the call site), links it to the previously last node with a linear "Next"
    /// edge, and wraps the current executor in a new one. All other settings of the
    /// `Axon` are carried over unchanged.
    ///
    /// When executed, the new step inspects the [`Bus`] before running anything:
    ///
    /// 1. If a [`ManualJump`] targets this node (by id or label), all earlier steps
    ///    are skipped and the jump's `payload_override` is deserialized as this
    ///    step's input. A missing or incompatible payload ends the run with an
    ///    `execution.jump.missing_payload` / `execution.jump.payload_error` event.
    /// 2. If a `StartStep` equal to this step's index and a `ResumptionState` are
    ///    both present, the step resumes here. The pipeline input converted to `Out`
    ///    through JSON is preferred and the persisted payload is the fallback; if
    ///    neither converts, `execution.resumption.payload_error` is emitted.
    /// 3. Otherwise the previous executor runs first; only an `Outcome::Next` value
    ///    is passed on to this transition, any other outcome is returned as is.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        // Take the Axon apart so the schematic can be extended and the executor wrapped.
        let Axon {
            mut schematic,
            executor: prev_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        } = self;

        // Describe the new step in the schematic.
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
            position: transition
                .position()
                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
            compensation_node_id: None,
            input_schema: transition.input_schema(),
            output_schema: None,
        };

        // The edge starts at whichever node is currently last (empty id if there is none).
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Values captured by the executor closure.
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let bus_policy_clone = bus_policy_for_exec.clone();
        // The step index is the position of the node that was just pushed.
        let current_step_idx = schematic.nodes.len() as u64 - 1;
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                // The closure is `Fn`, so each call clones what its future must own.
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_clone.clone();
                let step_idx = current_step_idx;

                Box::pin(async move {
                    // Case 1: a manual jump names this node, so earlier steps are skipped.
                    if let Some(jump) = bus.read::<ManualJump>()
                        && (jump.target_node == timeline_node_id
                            || jump.target_node == timeline_node_label)
                    {
                        tracing::info!(
                            node_id = %timeline_node_id,
                            node_label = %timeline_node_label,
                            "Manual jump target reached; skipping previous steps"
                        );

                        // The override payload must deserialize into this step's input type.
                        let state = if let Some(ow) = jump.payload_override.clone() {
                            match serde_json::from_value::<Out>(ow) {
                                Ok(s) => s,
                                Err(e) => {
                                    tracing::error!(
                                        "Payload override deserialization failed: {}",
                                        e
                                    );
                                    // `map` only retypes the emitted outcome to `Next`;
                                    // presumably the closure never runs for an emitted
                                    // event — confirm against `Outcome::map`.
                                    return Outcome::emit(
                                        "execution.jump.payload_error",
                                        Some(serde_json::json!({"error": e.to_string()})),
                                    )
                                    .map(|_: ()| unreachable!());
                                }
                            }
                        } else {
                            // Without an override there is nothing to feed this step.
                            return Outcome::emit(
                                "execution.jump.missing_payload",
                                Some(serde_json::json!({"node_id": timeline_node_id})),
                            );
                        };

                        return run_this_step::<Out, Next, E, Res>(
                            &trans,
                            state,
                            res,
                            bus,
                            &timeline_node_id,
                            &timeline_node_label,
                            &transition_bus_policy,
                            step_idx,
                        )
                        .await;
                    }

                    // Case 2: resumption at exactly this step.
                    if let Some(start) = bus.read::<StartStep>()
                        && step_idx == start.0
                        && bus.read::<ResumptionState>().is_some()
                    {
                        // Prefer the fresh input when it converts to `Out` through JSON.
                        let fresh_state = serde_json::to_value(&input)
                            .ok()
                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
                        // Otherwise fall back to the persisted payload.
                        let persisted_state = bus
                            .read::<ResumptionState>()
                            .and_then(|r| r.payload.clone())
                            .and_then(|p| serde_json::from_value::<Out>(p).ok());

                        if let Some(s) = fresh_state.or(persisted_state) {
                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
                            return run_this_step::<Out, Next, E, Res>(
                                &trans,
                                s,
                                res,
                                bus,
                                &timeline_node_id,
                                &timeline_node_label,
                                &transition_bus_policy,
                                step_idx,
                            )
                            .await;
                        }

                        // Neither source produced a usable input for this step.
                        return Outcome::emit(
                            "execution.resumption.payload_error",
                            Some(serde_json::json!({"error": "no compatible resumption state"})),
                        )
                        .map(|_: ()| unreachable!());
                    }

                    // Case 3: normal flow, run every earlier step first.
                    let prev_result = prev(input, res, bus).await;

                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        // Any other outcome is returned as is; `map` only changes the
                        // success type from `Out` to `Next`.
                        other => return other.map(|_| unreachable!()),
                    };

                    run_this_step::<Out, Next, E, Res>(
                        &trans,
                        state,
                        res,
                        bus,
                        &timeline_node_id,
                        &timeline_node_label,
                        &transition_bus_policy,
                        step_idx,
                    )
                    .await
                })
            },
        );
        Axon {
            schematic,
            executor: next_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        }
    }
641
642 #[track_caller]
647 pub fn then_compensated<Next, Trans, Comp>(
648 self,
649 transition: Trans,
650 compensation: Comp,
651 ) -> Axon<In, Next, E, Res>
652 where
653 Out: Clone,
654 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
655 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
656 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
657 {
658 let caller = Location::caller();
659 let Axon {
660 mut schematic,
661 executor: prev_executor,
662 execution_mode,
663 persistence_store,
664 audit_sink,
665 dlq_sink,
666 dlq_policy,
667 dynamic_dlq_policy,
668 saga_policy,
669 dynamic_saga_policy,
670 saga_compensation_registry,
671 iam_handle,
672 } = self;
673
674 let next_node_id = uuid::Uuid::new_v4().to_string();
676 let comp_node_id = uuid::Uuid::new_v4().to_string();
677
678 let next_node = Node {
679 id: next_node_id.clone(),
680 kind: NodeKind::Atom,
681 label: transition.label(),
682 description: transition.description(),
683 input_type: type_name_of::<Out>(),
684 output_type: type_name_of::<Next>(),
685 resource_type: type_name_of::<Res>(),
686 metadata: Default::default(),
687 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
688 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
689 position: transition
690 .position()
691 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
692 compensation_node_id: Some(comp_node_id.clone()),
693 input_schema: None,
694 output_schema: None,
695 };
696
697 let comp_node = Node {
699 id: comp_node_id.clone(),
700 kind: NodeKind::Atom,
701 label: format!("Compensate: {}", compensation.label()),
702 description: compensation.description(),
703 input_type: type_name_of::<Out>(),
704 output_type: "void".to_string(),
705 resource_type: type_name_of::<Res>(),
706 metadata: Default::default(),
707 bus_capability: None,
708 source_location: None,
709 position: compensation
710 .position()
711 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
712 compensation_node_id: None,
713 input_schema: None,
714 output_schema: None,
715 };
716
717 let last_node_id = schematic
718 .nodes
719 .last()
720 .map(|n| n.id.clone())
721 .unwrap_or_default();
722
723 schematic.nodes.push(next_node);
724 schematic.nodes.push(comp_node);
725 schematic.edges.push(Edge {
726 from: last_node_id,
727 to: next_node_id.clone(),
728 kind: EdgeType::Linear,
729 label: Some("Next".to_string()),
730 });
731
732 let node_id_for_exec = next_node_id.clone();
734 let comp_id_for_exec = comp_node_id.clone();
735 let node_label_for_exec = transition.label();
736 let bus_policy_for_exec = transition.bus_access_policy();
737 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
738 let comp_for_exec = compensation.clone();
739 let bus_policy_for_executor = bus_policy_for_exec.clone();
740 let bus_policy_for_registry = bus_policy_for_exec.clone();
741 let next_executor: Executor<In, Next, E, Res> = Arc::new(
742 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
743 let prev = prev_executor.clone();
744 let trans = transition.clone();
745 let comp = comp_for_exec.clone();
746 let timeline_node_id = node_id_for_exec.clone();
747 let timeline_comp_id = comp_id_for_exec.clone();
748 let timeline_node_label = node_label_for_exec.clone();
749 let transition_bus_policy = bus_policy_for_executor.clone();
750 let step_idx = step_idx_for_exec;
751
752 Box::pin(async move {
753 if let Some(jump) = bus.read::<ManualJump>()
755 && (jump.target_node == timeline_node_id
756 || jump.target_node == timeline_node_label)
757 {
758 tracing::info!(
759 node_id = %timeline_node_id,
760 node_label = %timeline_node_label,
761 "Manual jump target reached (compensated); skipping previous steps"
762 );
763
764 let state = if let Some(ow) = jump.payload_override.clone() {
765 match serde_json::from_value::<Out>(ow) {
766 Ok(s) => s,
767 Err(e) => {
768 tracing::error!(
769 "Payload override deserialization failed: {}",
770 e
771 );
772 return Outcome::emit(
773 "execution.jump.payload_error",
774 Some(serde_json::json!({"error": e.to_string()})),
775 )
776 .map(|_: ()| unreachable!());
777 }
778 }
779 } else {
780 return Outcome::emit(
781 "execution.jump.missing_payload",
782 Some(serde_json::json!({"node_id": timeline_node_id})),
783 )
784 .map(|_: ()| unreachable!());
785 };
786
787 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
789 &trans,
790 &comp,
791 state,
792 res,
793 bus,
794 &timeline_node_id,
795 &timeline_comp_id,
796 &timeline_node_label,
797 &transition_bus_policy,
798 step_idx,
799 )
800 .await;
801 }
802
803 if let Some(start) = bus.read::<StartStep>()
805 && step_idx == start.0
806 && bus.read::<ResumptionState>().is_some()
807 {
808 let fresh_state = serde_json::to_value(&input)
809 .ok()
810 .and_then(|v| serde_json::from_value::<Out>(v).ok());
811 let persisted_state = bus
812 .read::<ResumptionState>()
813 .and_then(|r| r.payload.clone())
814 .and_then(|p| serde_json::from_value::<Out>(p).ok());
815
816 if let Some(s) = fresh_state.or(persisted_state) {
817 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
818 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
819 &trans,
820 &comp,
821 s,
822 res,
823 bus,
824 &timeline_node_id,
825 &timeline_comp_id,
826 &timeline_node_label,
827 &transition_bus_policy,
828 step_idx,
829 )
830 .await;
831 }
832
833 return Outcome::emit(
834 "execution.resumption.payload_error",
835 Some(serde_json::json!({"error": "no compatible resumption state"})),
836 )
837 .map(|_: ()| unreachable!());
838 }
839
840 let prev_result = prev(input, res, bus).await;
842
843 let state = match prev_result {
845 Outcome::Next(t) => t,
846 other => return other.map(|_| unreachable!()),
847 };
848
849 run_this_compensated_step::<Out, Next, E, Res, Comp>(
850 &trans,
851 &comp,
852 state,
853 res,
854 bus,
855 &timeline_node_id,
856 &timeline_comp_id,
857 &timeline_node_label,
858 &transition_bus_policy,
859 step_idx,
860 )
861 .await
862 })
863 },
864 );
865 {
867 let mut registry = saga_compensation_registry.write().unwrap();
868 let comp_fn = compensation.clone();
869 let transition_bus_policy = bus_policy_for_registry.clone();
870
871 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
872 Arc::new(move |input_data, res, bus| {
873 let comp = comp_fn.clone();
874 let bus_policy = transition_bus_policy.clone();
875 Box::pin(async move {
876 let input: Out = serde_json::from_slice(&input_data).unwrap();
877 bus.set_access_policy(comp.label(), bus_policy);
878 let res = comp.run(input, res, bus).await;
879 bus.clear_access_policy();
880 res
881 })
882 });
883 registry.register(next_node_id.clone(), handler);
884 }
885
886 Axon {
887 schematic,
888 executor: next_executor,
889 execution_mode,
890 persistence_store,
891 audit_sink,
892 dlq_sink,
893 dlq_policy,
894 dynamic_dlq_policy,
895 saga_policy,
896 dynamic_saga_policy,
897 saga_compensation_registry,
898 iam_handle,
899 }
900 }
901
902 #[track_caller]
905 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
906 where
907 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
908 {
909 let caller = Location::caller();
912 let comp_node_id = uuid::Uuid::new_v4().to_string();
913
914 let comp_node = Node {
915 id: comp_node_id.clone(),
916 kind: NodeKind::Atom,
917 label: transition.label(),
918 description: transition.description(),
919 input_type: type_name_of::<Out>(),
920 output_type: "void".to_string(),
921 resource_type: type_name_of::<Res>(),
922 metadata: Default::default(),
923 bus_capability: None,
924 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
925 position: transition
926 .position()
927 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
928 compensation_node_id: None,
929 input_schema: None,
930 output_schema: None,
931 };
932
933 if let Some(last_node) = self.schematic.nodes.last_mut() {
934 last_node.compensation_node_id = Some(comp_node_id.clone());
935 }
936
937 self.schematic.nodes.push(comp_node);
938 self
939 }
940
941 #[track_caller]
943 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
944 let caller = Location::caller();
945 let branch_id_str = branch_id.into();
946 let last_node_id = self
947 .schematic
948 .nodes
949 .last()
950 .map(|n| n.id.clone())
951 .unwrap_or_default();
952
953 let branch_node = Node {
954 id: uuid::Uuid::new_v4().to_string(),
955 kind: NodeKind::Synapse,
956 label: label.to_string(),
957 description: None,
958 input_type: type_name_of::<Out>(),
959 output_type: type_name_of::<Out>(),
960 resource_type: type_name_of::<Res>(),
961 metadata: Default::default(),
962 bus_capability: None,
963 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
964 position: None,
965 compensation_node_id: None,
966 input_schema: None,
967 output_schema: None,
968 };
969
970 self.schematic.nodes.push(branch_node);
971 self.schematic.edges.push(Edge {
972 from: last_node_id,
973 to: branch_id_str.clone(),
974 kind: EdgeType::Branch(branch_id_str),
975 label: Some("Branch".to_string()),
976 });
977
978 self
979 }
980
981 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
983 if let ExecutionMode::Singleton {
984 lock_key,
985 ttl_ms,
986 lock_provider,
987 } = &self.execution_mode
988 {
989 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
990 let _enter = trace_span.enter();
991 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
992 Ok(true) => {
993 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
994 }
995 Ok(false) => {
996 tracing::debug!(
997 "Singleton lock {} already held, aborting execution.",
998 lock_key
999 );
1000 return Outcome::emit(
1002 "execution.skipped.lock_held",
1003 Some(serde_json::json!({
1004 "lock_key": lock_key
1005 })),
1006 );
1007 }
1008 Err(e) => {
1009 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1010 return Outcome::emit(
1011 "execution.skipped.lock_error",
1012 Some(serde_json::json!({
1013 "error": e.to_string()
1014 })),
1015 );
1016 }
1017 }
1018 }
1019
1020 if let Some(iam) = &self.iam_handle {
1022 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1023
1024 if matches!(iam.policy, IamPolicy::None) {
1025 } else {
1027 let token = bus.read::<IamToken>().map(|t| t.0.clone());
1028
1029 match token {
1030 Some(raw_token) => {
1031 match iam.verifier.verify(&raw_token).await {
1032 Ok(identity) => {
1033 if let Err(e) = enforce_policy(&iam.policy, &identity) {
1034 tracing::warn!(
1035 policy = ?iam.policy,
1036 subject = %identity.subject,
1037 "IAM policy enforcement failed: {}",
1038 e
1039 );
1040 return Outcome::emit(
1041 "iam.policy_denied",
1042 Some(serde_json::json!({
1043 "error": e.to_string(),
1044 "subject": identity.subject,
1045 })),
1046 );
1047 }
1048 bus.insert(identity);
1050 }
1051 Err(e) => {
1052 tracing::warn!("IAM token verification failed: {}", e);
1053 return Outcome::emit(
1054 "iam.verification_failed",
1055 Some(serde_json::json!({
1056 "error": e.to_string()
1057 })),
1058 );
1059 }
1060 }
1061 }
1062 None => {
1063 tracing::warn!("IAM policy requires token but none found in Bus");
1064 return Outcome::emit("iam.missing_token", None);
1065 }
1066 }
1067 }
1068 }
1069
1070 let trace_id = persistence_trace_id(bus);
1071 let label = self.schematic.name.clone();
1072
1073 if let Some(sink) = &self.dlq_sink {
1075 bus.insert(sink.clone());
1076 }
1077 let effective_dlq_policy = self
1079 .dynamic_dlq_policy
1080 .as_ref()
1081 .map(|d| d.current())
1082 .unwrap_or_else(|| self.dlq_policy.clone());
1083 bus.insert(effective_dlq_policy);
1084 bus.insert(self.schematic.clone());
1085 let effective_saga_policy = self
1086 .dynamic_saga_policy
1087 .as_ref()
1088 .map(|d| d.current())
1089 .unwrap_or_else(|| self.saga_policy.clone());
1090 bus.insert(effective_saga_policy.clone());
1091
1092 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1094 bus.insert(SagaStack::new());
1095 }
1096
1097 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1098 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1099 let compensation_retry_policy = compensation_retry_policy(bus);
1100 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1101 let version = self.schematic.schema_version.clone();
1102 let migration_registry = bus
1103 .read::<ranvier_core::schematic::MigrationRegistry>()
1104 .cloned();
1105
1106 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1107 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1108 load_persistence_version(handle, &trace_id).await;
1109
1110 if let Some(interv) = intervention {
1111 tracing::info!(
1112 trace_id = %trace_id,
1113 target_node = %interv.target_node,
1114 "Applying manual intervention command"
1115 );
1116
1117 if let Some(target_idx) = self
1119 .schematic
1120 .nodes
1121 .iter()
1122 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1123 {
1124 tracing::info!(
1125 trace_id = %trace_id,
1126 target_node = %interv.target_node,
1127 target_step = target_idx,
1128 "Intervention: Jumping to target node"
1129 );
1130 start_step = target_idx as u64;
1131
1132 bus.insert(ManualJump {
1134 target_node: interv.target_node.clone(),
1135 payload_override: interv.payload_override.clone(),
1136 });
1137
1138 if let Some(sink) = self.audit_sink.as_ref() {
1140 let event = AuditEvent::new(
1141 uuid::Uuid::new_v4().to_string(),
1142 "System".to_string(),
1143 "ApplyIntervention".to_string(),
1144 trace_id.to_string(),
1145 )
1146 .with_metadata("target_node", interv.target_node.clone())
1147 .with_metadata("target_step", target_idx);
1148
1149 let _ = sink.append(&event).await;
1150 }
1151 } else {
1152 tracing::warn!(
1153 trace_id = %trace_id,
1154 target_node = %interv.target_node,
1155 "Intervention target node not found in schematic; ignoring jump"
1156 );
1157 }
1158 }
1159
1160 if let Some(old_version) = trace_version
1161 && old_version != version
1162 {
1163 tracing::info!(
1164 trace_id = %trace_id,
1165 old_version = %old_version,
1166 current_version = %version,
1167 "Version mismatch detected during resumption"
1168 );
1169
1170 let migration_path = migration_registry
1172 .as_ref()
1173 .and_then(|r| r.find_migration_path(&old_version, &version));
1174
1175 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1176 if path.is_empty() {
1177 (None, last_payload.clone())
1178 } else {
1179 let mut payload = last_payload.clone();
1181 for hop in &path {
1182 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1183 {
1184 match mapper.map_state(p) {
1185 Ok(mapped) => payload = Some(mapped),
1186 Err(e) => {
1187 tracing::error!(
1188 trace_id = %trace_id,
1189 from = %hop.from_version,
1190 to = %hop.to_version,
1191 error = %e,
1192 "Payload migration mapper failed"
1193 );
1194 return Outcome::emit(
1195 "execution.resumption.payload_migration_failed",
1196 Some(serde_json::json!({
1197 "trace_id": trace_id,
1198 "from": hop.from_version,
1199 "to": hop.to_version,
1200 "error": e.to_string()
1201 })),
1202 );
1203 }
1204 }
1205 }
1206 }
1207 let hops: Vec<String> = path
1208 .iter()
1209 .map(|h| format!("{}->{}", h.from_version, h.to_version))
1210 .collect();
1211 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1212 (path.last().copied(), payload)
1213 }
1214 } else {
1215 (None, last_payload.clone())
1216 };
1217
1218 let migration = final_migration.or_else(|| {
1220 migration_registry
1221 .as_ref()
1222 .and_then(|r| r.find_migration(&old_version, &version))
1223 });
1224
1225 if mapped_payload.is_some() {
1227 last_payload = mapped_payload;
1228 }
1229
1230 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1231 {
1232 m.node_mapping
1233 .get(node_id)
1234 .cloned()
1235 .unwrap_or(m.default_strategy.clone())
1236 } else {
1237 migration
1238 .map(|m| m.default_strategy.clone())
1239 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1240 };
1241
1242 match strategy {
1243 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1244 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1245 start_step = 0;
1246 }
1247 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1248 new_node_id,
1249 ..
1250 } => {
1251 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1252 if let Some(target_idx) = self
1253 .schematic
1254 .nodes
1255 .iter()
1256 .position(|n| n.id == new_node_id || n.label == new_node_id)
1257 {
1258 start_step = target_idx as u64;
1259 } else {
1260 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1261 return Outcome::emit(
1262 "execution.resumption.migration_target_not_found",
1263 Some(serde_json::json!({ "node_id": new_node_id })),
1264 );
1265 }
1266 }
1267 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1268 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1269 if let Some(target_idx) = self
1270 .schematic
1271 .nodes
1272 .iter()
1273 .position(|n| n.id == node_id || n.label == node_id)
1274 {
1275 start_step = target_idx as u64;
1276 } else {
1277 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1278 return Outcome::emit(
1279 "execution.resumption.migration_target_not_found",
1280 Some(serde_json::json!({ "node_id": node_id })),
1281 );
1282 }
1283 }
1284 ranvier_core::schematic::MigrationStrategy::Fail => {
1285 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1286 return Outcome::emit(
1287 "execution.resumption.version_mismatch_failed",
1288 Some(serde_json::json!({
1289 "trace_id": trace_id,
1290 "old_version": old_version,
1291 "current_version": version
1292 })),
1293 );
1294 }
1295 _ => {
1296 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1297 return Outcome::emit(
1298 "execution.resumption.unsupported_migration",
1299 Some(serde_json::json!({
1300 "trace_id": trace_id,
1301 "strategy": format!("{:?}", strategy)
1302 })),
1303 );
1304 }
1305 }
1306 }
1307
1308 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1309 persist_execution_event(
1310 handle,
1311 &trace_id,
1312 &label,
1313 &version,
1314 start_step,
1315 ingress_node_id,
1316 "Enter",
1317 None,
1318 )
1319 .await;
1320
1321 bus.insert(StartStep(start_step));
1322 if start_step > 0 {
1323 bus.insert(ResumptionState {
1324 payload: last_payload,
1325 });
1326 }
1327
1328 Some(start_step)
1329 } else {
1330 None
1331 };
1332
1333 let should_capture = should_attach_timeline(bus);
1334 let inserted_timeline = if should_capture {
1335 ensure_timeline(bus)
1336 } else {
1337 false
1338 };
1339 let ingress_started = std::time::Instant::now();
1340 let ingress_enter_ts = now_ms();
1341 if should_capture
1342 && let (Some(timeline), Some(ingress)) =
1343 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1344 {
1345 timeline.push(TimelineEvent::NodeEnter {
1346 node_id: ingress.id.clone(),
1347 node_label: ingress.label.clone(),
1348 timestamp: ingress_enter_ts,
1349 });
1350 }
1351
1352 let circuit_span = tracing::info_span!(
1353 "Circuit",
1354 ranvier.circuit = %label,
1355 ranvier.outcome_kind = tracing::field::Empty,
1356 ranvier.outcome_target = tracing::field::Empty
1357 );
1358 let outcome = (self.executor)(input, resources, bus)
1359 .instrument(circuit_span.clone())
1360 .await;
1361 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1362 if let Some(target) = outcome_target(&outcome) {
1363 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1364 }
1365
1366 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1368 while let Some(task) = {
1369 let mut stack = bus.read_mut::<SagaStack>();
1370 stack.as_mut().and_then(|s| s.pop())
1371 } {
1372 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1373
1374 let handler = {
1375 let registry = self.saga_compensation_registry.read().unwrap();
1376 registry.get(&task.node_id)
1377 };
1378 if let Some(handler) = handler {
1379 let res = handler(task.input_snapshot, resources, bus).await;
1380 if let Outcome::Fault(e) = res {
1381 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1382 }
1383 } else {
1384 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1385 }
1386 }
1387 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1388 }
1389
1390 let ingress_exit_ts = now_ms();
1391 if should_capture
1392 && let (Some(timeline), Some(ingress)) =
1393 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1394 {
1395 timeline.push(TimelineEvent::NodeExit {
1396 node_id: ingress.id.clone(),
1397 outcome_type: outcome_type_name(&outcome),
1398 duration_ms: ingress_started.elapsed().as_millis() as u64,
1399 timestamp: ingress_exit_ts,
1400 });
1401 }
1402
1403 if let Some(handle) = persistence_handle.as_ref() {
1404 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1405 persist_execution_event(
1406 handle,
1407 &trace_id,
1408 &label,
1409 &version,
1410 fault_step,
1411 None, outcome_kind_name(&outcome),
1413 Some(outcome.to_json_value()),
1414 )
1415 .await;
1416
1417 let mut completion = completion_from_outcome(&outcome);
1418 if matches!(outcome, Outcome::Fault(_))
1419 && let Some(compensation) = compensation_handle.as_ref()
1420 && compensation_auto_trigger(bus)
1421 {
1422 let context = CompensationContext {
1423 trace_id: trace_id.clone(),
1424 circuit: label.clone(),
1425 fault_kind: outcome_kind_name(&outcome).to_string(),
1426 fault_step,
1427 timestamp_ms: now_ms(),
1428 };
1429
1430 if run_compensation(
1431 compensation,
1432 context,
1433 compensation_retry_policy,
1434 compensation_idempotency.clone(),
1435 )
1436 .await
1437 {
1438 persist_execution_event(
1439 handle,
1440 &trace_id,
1441 &label,
1442 &version,
1443 fault_step.saturating_add(1),
1444 None,
1445 "Compensated",
1446 None,
1447 )
1448 .await;
1449 completion = CompletionState::Compensated;
1450 }
1451 }
1452
1453 if persistence_auto_complete(bus) {
1454 persist_completion(handle, &trace_id, completion).await;
1455 }
1456 }
1457
1458 if should_capture {
1459 maybe_export_timeline(bus, &outcome);
1460 }
1461 if inserted_timeline {
1462 let _ = bus.remove::<Timeline>();
1463 }
1464
1465 outcome
1466 }
1467
1468 pub fn serve_inspector(self, port: u16) -> Self {
1471 if !inspector_dev_mode_from_env() {
1472 tracing::info!("Inspector disabled because RANVIER_MODE is production");
1473 return self;
1474 }
1475 if !inspector_enabled_from_env() {
1476 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
1477 return self;
1478 }
1479
1480 let schematic = self.schematic.clone();
1481 let axon_inspector = Arc::new(self.clone());
1482 tokio::spawn(async move {
1483 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
1484 .with_projection_files_from_env()
1485 .with_mode_from_env()
1486 .with_auth_policy_from_env()
1487 .with_state_inspector(axon_inspector)
1488 .serve()
1489 .await
1490 {
1491 tracing::error!("Inspector server failed: {}", e);
1492 }
1493 });
1494 self
1495 }
1496
    /// Returns a shared reference to the schematic held by this value.
    pub fn schematic(&self) -> &Schematic {
        &self.schematic
    }
1501
    /// Consumes `self` and returns the owned schematic.
    pub fn into_schematic(self) -> Schematic {
        self.schematic
    }
1506
    /// Returns the schematic export request derived from the current process
    /// (environment variables and command-line arguments), or `None` when no
    /// export was requested. Delegates to
    /// `schematic_export_request_from_process`; no state of `self` is read.
    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
        schematic_export_request_from_process()
    }
1520
1521 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
1533 self.maybe_export_and_exit_with(|_| ())
1534 }
1535
1536 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
1541 where
1542 F: FnOnce(&SchematicExportRequest),
1543 {
1544 let Some(request) = self.schematic_export_request() else {
1545 return Ok(false);
1546 };
1547 on_before_exit(&request);
1548 self.export_schematic(&request)?;
1549 Ok(true)
1550 }
1551
1552 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
1554 let json = serde_json::to_string_pretty(self.schematic())?;
1555 if let Some(path) = &request.output {
1556 if let Some(parent) = path.parent()
1557 && !parent.as_os_str().is_empty()
1558 {
1559 fs::create_dir_all(parent)?;
1560 }
1561 fs::write(path, json.as_bytes())?;
1562 return Ok(());
1563 }
1564 println!("{}", json);
1565 Ok(())
1566 }
1567}
1568
1569fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
1570 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
1571 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
1572 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
1573
1574 let mut i = 0;
1575 while i < args.len() {
1576 let arg = args[i].to_string_lossy();
1577
1578 if arg == "--schematic" {
1579 enabled = true;
1580 i += 1;
1581 continue;
1582 }
1583
1584 if arg == "--schematic-output" || arg == "--output" {
1585 if let Some(next) = args.get(i + 1) {
1586 output = Some(PathBuf::from(next));
1587 i += 2;
1588 continue;
1589 }
1590 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
1591 output = Some(PathBuf::from(value));
1592 } else if let Some(value) = arg.strip_prefix("--output=") {
1593 output = Some(PathBuf::from(value));
1594 }
1595
1596 i += 1;
1597 }
1598
1599 if enabled {
1600 Some(SchematicExportRequest { output })
1601 } else {
1602 None
1603 }
1604}
1605
/// Reports whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `on` and `yes`, compared ASCII
/// case-insensitively and without trimming. An unset or non-Unicode variable
/// counts as false.
fn env_flag_is_true(key: &str) -> bool {
    let Ok(raw) = std::env::var(key) else {
        return false;
    };
    let normalized = raw.to_ascii_lowercase();
    ["1", "true", "on", "yes"].contains(&normalized.as_str())
}
1612
1613fn inspector_enabled_from_env() -> bool {
1614 let raw = std::env::var("RANVIER_INSPECTOR").ok();
1615 inspector_enabled_from_value(raw.as_deref())
1616}
1617
/// Decides whether the inspector is enabled for a raw setting value.
///
/// `None` (setting absent) enables the inspector. A present value enables it
/// only when it is `1`, `true`, `on` or `yes`, compared ASCII
/// case-insensitively; anything else disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    let lowered = raw.to_ascii_lowercase();
    ["1", "true", "on", "yes"].contains(&lowered.as_str())
}
1624
1625fn inspector_dev_mode_from_env() -> bool {
1626 let raw = std::env::var("RANVIER_MODE").ok();
1627 inspector_dev_mode_from_value(raw.as_deref())
1628}
1629
/// Reports whether a raw mode value denotes a non-production mode.
///
/// Only `prod` and `production` (ASCII case-insensitive) count as
/// production; every other value, including an absent one, is treated as
/// development mode.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        Some(mode) => {
            let is_production = mode.eq_ignore_ascii_case("prod")
                || mode.eq_ignore_ascii_case("production");
            !is_production
        }
        None => true,
    }
}
1636
1637fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
1638 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
1639 Ok(v) if !v.trim().is_empty() => v,
1640 _ => return,
1641 };
1642
1643 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
1644 let policy = timeline_adaptive_policy();
1645 let forced = should_force_export(outcome, &policy);
1646 let should_export = sampled || forced;
1647 if !should_export {
1648 record_sampling_stats(false, sampled, forced, "none", &policy);
1649 return;
1650 }
1651
1652 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
1653 timeline.sort();
1654
1655 let mode = std::env::var("RANVIER_TIMELINE_MODE")
1656 .unwrap_or_else(|_| "overwrite".to_string())
1657 .to_ascii_lowercase();
1658
1659 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
1660 tracing::warn!(
1661 "Failed to persist timeline file {} (mode={}): {}",
1662 path,
1663 mode,
1664 err
1665 );
1666 record_sampling_stats(false, sampled, forced, &mode, &policy);
1667 } else {
1668 record_sampling_stats(true, sampled, forced, &mode, &policy);
1669 }
1670}
1671
1672fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
1673 match outcome {
1674 Outcome::Next(_) => "Next".to_string(),
1675 Outcome::Branch(id, _) => format!("Branch:{}", id),
1676 Outcome::Jump(id, _) => format!("Jump:{}", id),
1677 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
1678 Outcome::Fault(_) => "Fault".to_string(),
1679 }
1680}
1681
1682fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
1683 match outcome {
1684 Outcome::Next(_) => "Next",
1685 Outcome::Branch(_, _) => "Branch",
1686 Outcome::Jump(_, _) => "Jump",
1687 Outcome::Emit(_, _) => "Emit",
1688 Outcome::Fault(_) => "Fault",
1689 }
1690}
1691
1692fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
1693 match outcome {
1694 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
1695 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
1696 Outcome::Emit(event_type, _) => Some(event_type.clone()),
1697 Outcome::Next(_) | Outcome::Fault(_) => None,
1698 }
1699}
1700
1701fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
1702 match outcome {
1703 Outcome::Fault(_) => CompletionState::Fault,
1704 _ => CompletionState::Success,
1705 }
1706}
1707
1708fn persistence_trace_id(bus: &Bus) -> String {
1709 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
1710 explicit.0.clone()
1711 } else {
1712 format!("{}:{}", bus.id, now_ms())
1713 }
1714}
1715
1716fn persistence_auto_complete(bus: &Bus) -> bool {
1717 bus.read::<PersistenceAutoComplete>()
1718 .map(|v| v.0)
1719 .unwrap_or(true)
1720}
1721
1722fn compensation_auto_trigger(bus: &Bus) -> bool {
1723 bus.read::<CompensationAutoTrigger>()
1724 .map(|v| v.0)
1725 .unwrap_or(true)
1726}
1727
1728fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
1729 bus.read::<CompensationRetryPolicy>()
1730 .copied()
1731 .unwrap_or_default()
1732}
1733
1734fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
1740 payload.map(|p| {
1741 p.get("Next")
1742 .or_else(|| p.get("Branch"))
1743 .or_else(|| p.get("Jump"))
1744 .cloned()
1745 .unwrap_or_else(|| p.clone())
1746 })
1747}
1748
/// Loads the persisted trace for `trace_id` and derives where execution
/// should continue.
///
/// Returns a tuple of:
/// 1. the next step number to execute,
/// 2. the schematic version stored with the trace (if a trace exists),
/// 3. the most recent intervention recorded on the trace (if any),
/// 4. the node id of the event chosen as the resumption anchor,
/// 5. the anchor event's payload with its outcome wrapper removed.
///
/// When no trace exists, or loading fails (logged as a warning), the result
/// is `(0, None, None, None, None)`.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // An explicit resume point was recorded. Pick the anchor
                    // event by scanning newest-first with progressively
                    // weaker criteria:
                    //   1. a `Next` event with a payload at or before the resume step,
                    //   2. any non-`Fault` event with a payload at or before it,
                    //   3. any event with a payload at or before it,
                    //   4. the last event of the trace, whatever its step.
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        .or_else(|| trace.events.last());

                    (
                        // Continue with the step after the recorded resume point.
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // No explicit resume point: continue after the last
                    // recorded event, or at step 0 for a trace without events.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        // No trace stored under this id: start from the beginning.
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            // A store failure is treated like a missing trace.
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
1826
/// Runs a single transition as one step of a circuit and records everything
/// around it.
///
/// In order, this function:
/// 1. waits on the bus `DebugControl` when it asks to pause at `node_id`,
/// 2. pushes a `NodeEnter` timeline event (when a `Timeline` is on the bus),
/// 3. runs the transition under the given bus access policy,
/// 4. on a fault with a `DlqPolicy::RetryThenDlq` on the bus, re-runs the
///    transition from a serialized snapshot of the input with a doubling
///    backoff,
/// 5. pushes `NodeExit` (and `Branchtaken` for branch outcomes),
/// 6. pushes the input snapshot onto the `SagaStack` after a `Next` outcome
///    when the saga policy on the bus is enabled,
/// 7. appends a persistence event when a `PersistenceHandle` is on the bus,
/// 8. on a final fault, stores a dead letter when a DLQ sink is on the bus
///    and the policy is not `Drop`.
///
/// Returns the final outcome of the transition (after any retries).
#[allow(clippy::too_many_arguments)]
async fn run_this_step<In, Out, E, Res>(
    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
    state: In,
    res: &Res,
    bus: &mut Bus,
    node_id: &str,
    node_label: &str,
    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
    step_idx: u64,
) -> Outcome<Out, E>
where
    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
    Res: ranvier_core::transition::ResourceRequirement,
{
    let label = trans.label();
    // Last path segment of the resource type name, used as a span field.
    let res_type = std::any::type_name::<Res>()
        .split("::")
        .last()
        .unwrap_or("unknown");

    // Debugger support: pause before the node runs if requested.
    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
        debug.should_pause(node_id)
    } else {
        false
    };

    if should_pause {
        let trace_id = persistence_trace_id(bus);
        tracing::event!(
            target: "ranvier.debugger",
            tracing::Level::INFO,
            trace_id = %trace_id,
            node_id = %node_id,
            "Node paused"
        );

        if let Some(timeline) = bus.read_mut::<Timeline>() {
            timeline.push(TimelineEvent::NodePaused {
                node_id: node_id.to_string(),
                timestamp: now_ms(),
            });
        }
        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
            debug.wait().await;
        }
    }

    let enter_ts = now_ms();
    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeEnter {
            node_id: node_id.to_string(),
            node_label: node_label.to_string(),
            timestamp: enter_ts,
        });
    }

    // Retry settings are only present for `DlqPolicy::RetryThenDlq`.
    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
        if let DlqPolicy::RetryThenDlq {
            max_attempts,
            backoff_ms,
        } = p
        {
            Some((*max_attempts, *backoff_ms))
        } else {
            None
        }
    });
    // `state` is moved into the first run, so keep a JSON copy to rebuild the
    // input for retries. A serialization failure leaves no snapshot, which
    // disables retries for this step.
    let retry_state_snapshot = if dlq_retry_config.is_some() {
        serde_json::to_value(&state).ok()
    } else {
        None
    };

    // Input snapshot for saga rollback; an empty byte vector is stored if
    // serialization fails.
    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
        Some(serde_json::to_vec(&state).unwrap_or_default())
    } else {
        None
    };

    let node_span = tracing::info_span!(
        "Node",
        ranvier.node = %label,
        ranvier.resource_type = %res_type,
        ranvier.outcome_kind = tracing::field::Empty,
        ranvier.outcome_target = tracing::field::Empty
    );
    let started = std::time::Instant::now();
    // The access policy is installed only for the duration of the run.
    bus.set_access_policy(label.clone(), bus_policy.clone());
    let result = trans
        .run(state, res, bus)
        .instrument(node_span.clone())
        .await;
    bus.clear_access_policy();

    let result = if let Outcome::Fault(_) = &result {
        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
            (dlq_retry_config, &retry_state_snapshot)
        {
            let mut final_result = result;
            // The initial run above counts as attempt 1.
            for attempt in 2..=max_attempts {
                // Backoff doubles with each retry: backoff_ms * 2^(attempt - 2).
                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));

                tracing::info!(
                    ranvier.node = %label,
                    attempt = attempt,
                    max_attempts = max_attempts,
                    backoff_ms = delay,
                    "Retrying faulted node"
                );

                if let Some(timeline) = bus.read_mut::<Timeline>() {
                    timeline.push(TimelineEvent::NodeRetry {
                        node_id: node_id.to_string(),
                        attempt,
                        max_attempts,
                        backoff_ms: delay,
                        timestamp: now_ms(),
                    });
                }

                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                // If the snapshot cannot be turned back into `In`, this
                // attempt is skipped without running the transition.
                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
                    bus.set_access_policy(label.clone(), bus_policy.clone());
                    let retry_result = trans
                        .run(retry_state, res, bus)
                        .instrument(tracing::info_span!(
                            "NodeRetry",
                            ranvier.node = %label,
                            attempt = attempt
                        ))
                        .await;
                    bus.clear_access_policy();

                    match &retry_result {
                        Outcome::Fault(_) => {
                            // Still failing: remember the latest fault and keep going.
                            final_result = retry_result;
                        }
                        _ => {
                            // Any non-fault outcome ends the retry loop.
                            final_result = retry_result;
                            break;
                        }
                    }
                }
            }
            final_result
        } else {
            result
        }
    } else {
        result
    };

    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
    if let Some(target) = outcome_target(&result) {
        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
    }
    // Measured from before the first run, so it includes retries and backoff.
    let duration_ms = started.elapsed().as_millis() as u64;
    let exit_ts = now_ms();

    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeExit {
            node_id: node_id.to_string(),
            outcome_type: outcome_type_name(&result),
            duration_ms,
            timestamp: exit_ts,
        });

        if let Outcome::Branch(branch_id, _) = &result {
            timeline.push(TimelineEvent::Branchtaken {
                branch_id: branch_id.clone(),
                timestamp: exit_ts,
            });
        }
    }

    // Only steps that completed with `Next` are registered for saga rollback.
    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
        && let Some(stack) = bus.read_mut::<SagaStack>()
    {
        stack.push(node_id.to_string(), label.clone(), snapshot);
    }

    // Persist the step outcome, whatever its kind.
    if let Some(handle) = bus.read::<PersistenceHandle>() {
        let trace_id = persistence_trace_id(bus);
        let circuit = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let version = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.schema_version.clone())
            .unwrap_or_default();

        persist_execution_event(
            handle,
            &trace_id,
            &circuit,
            &version,
            step_idx,
            Some(node_id.to_string()),
            outcome_kind_name(&result),
            Some(result.to_json_value()),
        )
        .await;
    }

    if let Outcome::Fault(f) = &result {
        // Clone the sink handle out of the bus so the bus can be borrowed
        // mutably for the timeline below.
        let dlq_action = {
            let policy = bus.read::<DlqPolicy>();
            let sink = bus.read::<Arc<dyn DlqSink>>();
            match (sink, policy) {
                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
                _ => None,
            }
        };

        if let Some(sink) = dlq_action {
            // With a retry policy configured, a fault at this point means the
            // retries did not produce a non-fault outcome.
            if let Some((max_attempts, _)) = dlq_retry_config
                && let Some(timeline) = bus.read_mut::<Timeline>()
            {
                timeline.push(TimelineEvent::DlqExhausted {
                    node_id: node_id.to_string(),
                    total_attempts: max_attempts,
                    timestamp: now_ms(),
                });
            }

            let trace_id = persistence_trace_id(bus);
            let circuit = bus
                .read::<ranvier_core::schematic::Schematic>()
                .map(|s| s.name.clone())
                .unwrap_or_default();
            // Best effort: a failure to store the dead letter is ignored.
            let _ = sink
                .store_dead_letter(
                    &trace_id,
                    &circuit,
                    node_id,
                    &format!("{:?}", f),
                    &serde_json::to_vec(&f).unwrap_or_default(),
                )
                .await;
        }
    }

    result
}
2086
2087#[allow(clippy::too_many_arguments)]
2088async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2089 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2090 comp: &Comp,
2091 state: Out,
2092 res: &Res,
2093 bus: &mut Bus,
2094 node_id: &str,
2095 comp_node_id: &str,
2096 node_label: &str,
2097 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2098 step_idx: u64,
2099) -> Outcome<Next, E>
2100where
2101 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2102 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2103 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2104 Res: ranvier_core::transition::ResourceRequirement,
2105 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2106{
2107 let label = trans.label();
2108
2109 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2111 debug.should_pause(node_id)
2112 } else {
2113 false
2114 };
2115
2116 if should_pause {
2117 let trace_id = persistence_trace_id(bus);
2118 tracing::event!(
2119 target: "ranvier.debugger",
2120 tracing::Level::INFO,
2121 trace_id = %trace_id,
2122 node_id = %node_id,
2123 "Node paused (compensated)"
2124 );
2125
2126 if let Some(timeline) = bus.read_mut::<Timeline>() {
2127 timeline.push(TimelineEvent::NodePaused {
2128 node_id: node_id.to_string(),
2129 timestamp: now_ms(),
2130 });
2131 }
2132 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2133 debug.wait().await;
2134 }
2135 }
2136
2137 let enter_ts = now_ms();
2138 if let Some(timeline) = bus.read_mut::<Timeline>() {
2139 timeline.push(TimelineEvent::NodeEnter {
2140 node_id: node_id.to_string(),
2141 node_label: node_label.to_string(),
2142 timestamp: enter_ts,
2143 });
2144 }
2145
2146 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2148 Some(serde_json::to_vec(&state).unwrap_or_default())
2149 } else {
2150 None
2151 };
2152
2153 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2154 bus.set_access_policy(label.clone(), bus_policy.clone());
2155 let result = trans
2156 .run(state.clone(), res, bus)
2157 .instrument(node_span)
2158 .await;
2159 bus.clear_access_policy();
2160
2161 let duration_ms = 0; let exit_ts = now_ms();
2163
2164 if let Some(timeline) = bus.read_mut::<Timeline>() {
2165 timeline.push(TimelineEvent::NodeExit {
2166 node_id: node_id.to_string(),
2167 outcome_type: outcome_type_name(&result),
2168 duration_ms,
2169 timestamp: exit_ts,
2170 });
2171 }
2172
2173 if let Outcome::Fault(ref err) = result {
2175 if compensation_auto_trigger(bus) {
2176 tracing::info!(
2177 ranvier.node = %label,
2178 ranvier.compensation.trigger = "saga",
2179 error = ?err,
2180 "Saga compensation triggered"
2181 );
2182
2183 if let Some(timeline) = bus.read_mut::<Timeline>() {
2184 timeline.push(TimelineEvent::NodeEnter {
2185 node_id: comp_node_id.to_string(),
2186 node_label: format!("Compensate: {}", comp.label()),
2187 timestamp: exit_ts,
2188 });
2189 }
2190
2191 let _ = comp.run(state, res, bus).await;
2193
2194 if let Some(timeline) = bus.read_mut::<Timeline>() {
2195 timeline.push(TimelineEvent::NodeExit {
2196 node_id: comp_node_id.to_string(),
2197 outcome_type: "Compensated".to_string(),
2198 duration_ms: 0,
2199 timestamp: now_ms(),
2200 });
2201 }
2202
2203 if let Some(handle) = bus.read::<PersistenceHandle>() {
2204 let trace_id = persistence_trace_id(bus);
2205 let circuit = bus
2206 .read::<ranvier_core::schematic::Schematic>()
2207 .map(|s| s.name.clone())
2208 .unwrap_or_default();
2209 let version = bus
2210 .read::<ranvier_core::schematic::Schematic>()
2211 .map(|s| s.schema_version.clone())
2212 .unwrap_or_default();
2213
2214 persist_execution_event(
2215 handle,
2216 &trace_id,
2217 &circuit,
2218 &version,
2219 step_idx + 1, Some(comp_node_id.to_string()),
2221 "Compensated",
2222 None,
2223 )
2224 .await;
2225 }
2226 }
2227 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2228 if let Some(stack) = bus.read_mut::<SagaStack>() {
2230 stack.push(node_id.to_string(), label.clone(), snapshot);
2231 }
2232
2233 if let Some(handle) = bus.read::<PersistenceHandle>() {
2234 let trace_id = persistence_trace_id(bus);
2235 let circuit = bus
2236 .read::<ranvier_core::schematic::Schematic>()
2237 .map(|s| s.name.clone())
2238 .unwrap_or_default();
2239 let version = bus
2240 .read::<ranvier_core::schematic::Schematic>()
2241 .map(|s| s.schema_version.clone())
2242 .unwrap_or_default();
2243
2244 persist_execution_event(
2245 handle,
2246 &trace_id,
2247 &circuit,
2248 &version,
2249 step_idx,
2250 Some(node_id.to_string()),
2251 outcome_kind_name(&result),
2252 Some(result.to_json_value()),
2253 )
2254 .await;
2255 }
2256 }
2257
2258 if let Outcome::Fault(f) = &result
2260 && let (Some(sink), Some(policy)) =
2261 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2262 {
2263 let should_dlq = !matches!(policy, DlqPolicy::Drop);
2264 if should_dlq {
2265 let trace_id = persistence_trace_id(bus);
2266 let circuit = bus
2267 .read::<ranvier_core::schematic::Schematic>()
2268 .map(|s| s.name.clone())
2269 .unwrap_or_default();
2270 let _ = sink
2271 .store_dead_letter(
2272 &trace_id,
2273 &circuit,
2274 node_id,
2275 &format!("{:?}", f),
2276 &serde_json::to_vec(&f).unwrap_or_default(),
2277 )
2278 .await;
2279 }
2280 }
2281
2282 result
2283}
2284
2285#[allow(clippy::too_many_arguments)]
2286pub async fn persist_execution_event(
2287 handle: &PersistenceHandle,
2288 trace_id: &str,
2289 circuit: &str,
2290 schematic_version: &str,
2291 step: u64,
2292 node_id: Option<String>,
2293 outcome_kind: &str,
2294 payload: Option<serde_json::Value>,
2295) {
2296 let store = handle.store();
2297 let envelope = PersistenceEnvelope {
2298 trace_id: trace_id.to_string(),
2299 circuit: circuit.to_string(),
2300 schematic_version: schematic_version.to_string(),
2301 step,
2302 node_id,
2303 outcome_kind: outcome_kind.to_string(),
2304 timestamp_ms: now_ms(),
2305 payload_hash: None,
2306 payload,
2307 };
2308
2309 if let Err(err) = store.append(envelope).await {
2310 tracing::warn!(
2311 trace_id = %trace_id,
2312 circuit = %circuit,
2313 step,
2314 outcome_kind = %outcome_kind,
2315 error = %err,
2316 "Failed to append persistence envelope"
2317 );
2318 }
2319}
2320
2321async fn persist_completion(
2322 handle: &PersistenceHandle,
2323 trace_id: &str,
2324 completion: CompletionState,
2325) {
2326 let store = handle.store();
2327 if let Err(err) = store.complete(trace_id, completion).await {
2328 tracing::warn!(
2329 trace_id = %trace_id,
2330 error = %err,
2331 "Failed to complete persistence trace"
2332 );
2333 }
2334}
2335
2336fn compensation_idempotency_key(context: &CompensationContext) -> String {
2337 format!(
2338 "{}:{}:{}",
2339 context.trace_id, context.circuit, context.fault_kind
2340 )
2341}
2342
2343async fn run_compensation(
2344 handle: &CompensationHandle,
2345 context: CompensationContext,
2346 retry_policy: CompensationRetryPolicy,
2347 idempotency: Option<CompensationIdempotencyHandle>,
2348) -> bool {
2349 let hook = handle.hook();
2350 let key = compensation_idempotency_key(&context);
2351
2352 if let Some(store_handle) = idempotency.as_ref() {
2353 let store = store_handle.store();
2354 match store.was_compensated(&key).await {
2355 Ok(true) => {
2356 tracing::info!(
2357 trace_id = %context.trace_id,
2358 circuit = %context.circuit,
2359 key = %key,
2360 "Compensation already recorded; skipping duplicate hook execution"
2361 );
2362 return true;
2363 }
2364 Ok(false) => {}
2365 Err(err) => {
2366 tracing::warn!(
2367 trace_id = %context.trace_id,
2368 key = %key,
2369 error = %err,
2370 "Failed to check compensation idempotency state"
2371 );
2372 }
2373 }
2374 }
2375
2376 let max_attempts = retry_policy.max_attempts.max(1);
2377 for attempt in 1..=max_attempts {
2378 match hook.compensate(context.clone()).await {
2379 Ok(()) => {
2380 if let Some(store_handle) = idempotency.as_ref() {
2381 let store = store_handle.store();
2382 if let Err(err) = store.mark_compensated(&key).await {
2383 tracing::warn!(
2384 trace_id = %context.trace_id,
2385 key = %key,
2386 error = %err,
2387 "Failed to mark compensation idempotency state"
2388 );
2389 }
2390 }
2391 return true;
2392 }
2393 Err(err) => {
2394 let is_last = attempt == max_attempts;
2395 tracing::warn!(
2396 trace_id = %context.trace_id,
2397 circuit = %context.circuit,
2398 fault_kind = %context.fault_kind,
2399 fault_step = context.fault_step,
2400 attempt,
2401 max_attempts,
2402 error = %err,
2403 "Compensation hook attempt failed"
2404 );
2405 if !is_last && retry_policy.backoff_ms > 0 {
2406 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2407 .await;
2408 }
2409 }
2410 }
2411 }
2412 false
2413}
2414
2415fn ensure_timeline(bus: &mut Bus) -> bool {
2416 if bus.has::<Timeline>() {
2417 false
2418 } else {
2419 bus.insert(Timeline::new());
2420 true
2421 }
2422}
2423
2424fn should_attach_timeline(bus: &Bus) -> bool {
2425 if bus.has::<Timeline>() {
2427 return true;
2428 }
2429
2430 has_timeline_output_path()
2432}
2433
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
///
/// An unset variable, a non-Unicode value, or a whitespace-only value all
/// count as "no output path".
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(configured) => !configured.trim().is_empty(),
        Err(_) => false,
    }
}
2440
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64`, and clamped into `[0.0, 1.0]`.
/// When the variable is unset, unparsable, or parses to NaN, the default
/// rate of `1.0` (sample everything) is returned.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        // Tolerate surrounding whitespace, matching how the output path is checked.
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // `f64::clamp` passes NaN through unchanged, and a NaN rate compares
        // false against everything, which would silently disable sampling.
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
2448
2449fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
2450 if rate <= 0.0 {
2451 return false;
2452 }
2453 if rate >= 1.0 {
2454 return true;
2455 }
2456 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
2457 bucket < rate
2458}
2459
/// Returns the adaptive export policy name from `RANVIER_TIMELINE_ADAPTIVE`,
/// lower-cased (ASCII only).
///
/// Falls back to `"fault_branch"` when the variable cannot be read.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(configured) => configured.to_ascii_lowercase(),
        Err(_) => "fault_branch".to_string(),
    }
}
2465
2466fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
2467 match policy {
2468 "off" => false,
2469 "fault_only" => matches!(outcome, Outcome::Fault(_)),
2470 "fault_branch_emit" => {
2471 matches!(
2472 outcome,
2473 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
2474 )
2475 }
2476 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
2477 }
2478}
2479
/// Process-wide counters describing timeline export decisions, as maintained
/// by `record_sampling_stats`.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of decisions recorded so far (one per `record_sampling_stats` call).
    total_decisions: u64,
    /// Decisions where the timeline was exported.
    exported: u64,
    /// Decisions where the timeline was not exported.
    skipped: u64,
    /// Exported decisions that were flagged as sampled.
    sampled_exports: u64,
    /// Exported decisions that were flagged as forced.
    forced_exports: u64,
    /// `mode` string passed to the most recent recording call.
    last_mode: String,
    /// `policy` string passed to the most recent recording call.
    last_policy: String,
    /// `now_ms()` value captured by the most recent recording call.
    last_updated_ms: u64,
}
2491
/// Lazily initialised, mutex-guarded sampling counters shared by the whole process.
static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
2493
2494fn stats_cell() -> &'static Mutex<SamplingStats> {
2495 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
2496}
2497
2498fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
2499 let snapshot = {
2500 let mut stats = match stats_cell().lock() {
2501 Ok(guard) => guard,
2502 Err(_) => return,
2503 };
2504
2505 stats.total_decisions += 1;
2506 if exported {
2507 stats.exported += 1;
2508 } else {
2509 stats.skipped += 1;
2510 }
2511 if sampled && exported {
2512 stats.sampled_exports += 1;
2513 }
2514 if forced && exported {
2515 stats.forced_exports += 1;
2516 }
2517 stats.last_mode = mode.to_string();
2518 stats.last_policy = policy.to_string();
2519 stats.last_updated_ms = now_ms();
2520 stats.clone()
2521 };
2522
2523 tracing::debug!(
2524 ranvier.timeline.total_decisions = snapshot.total_decisions,
2525 ranvier.timeline.exported = snapshot.exported,
2526 ranvier.timeline.skipped = snapshot.skipped,
2527 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
2528 ranvier.timeline.forced_exports = snapshot.forced_exports,
2529 ranvier.timeline.mode = %snapshot.last_mode,
2530 ranvier.timeline.policy = %snapshot.last_policy,
2531 "Timeline sampling stats updated"
2532 );
2533
2534 if let Some(path) = timeline_stats_output_path() {
2535 let payload = serde_json::json!({
2536 "total_decisions": snapshot.total_decisions,
2537 "exported": snapshot.exported,
2538 "skipped": snapshot.skipped,
2539 "sampled_exports": snapshot.sampled_exports,
2540 "forced_exports": snapshot.forced_exports,
2541 "last_mode": snapshot.last_mode,
2542 "last_policy": snapshot.last_policy,
2543 "last_updated_ms": snapshot.last_updated_ms
2544 });
2545 if let Some(parent) = Path::new(&path).parent() {
2546 let _ = fs::create_dir_all(parent);
2547 }
2548 if let Err(err) = fs::write(&path, payload.to_string()) {
2549 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
2550 }
2551 }
2552}
2553
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set and
/// not blank; the value is returned untrimmed.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(configured) if !configured.trim().is_empty() => Some(configured),
        _ => None,
    }
}
2559
2560fn write_timeline_with_policy(
2561 path: &str,
2562 mode: &str,
2563 mut timeline: Timeline,
2564) -> Result<(), String> {
2565 match mode {
2566 "append" => {
2567 if Path::new(path).exists() {
2568 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
2569 match serde_json::from_str::<Timeline>(&content) {
2570 Ok(mut existing) => {
2571 existing.events.append(&mut timeline.events);
2572 existing.sort();
2573 if let Some(max_events) = max_events_limit() {
2574 truncate_timeline_events(&mut existing, max_events);
2575 }
2576 write_timeline_json(path, &existing)
2577 }
2578 Err(_) => {
2579 if let Some(max_events) = max_events_limit() {
2581 truncate_timeline_events(&mut timeline, max_events);
2582 }
2583 write_timeline_json(path, &timeline)
2584 }
2585 }
2586 } else {
2587 if let Some(max_events) = max_events_limit() {
2588 truncate_timeline_events(&mut timeline, max_events);
2589 }
2590 write_timeline_json(path, &timeline)
2591 }
2592 }
2593 "rotate" => {
2594 let rotated_path = rotated_path(path, now_ms());
2595 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
2596 if let Some(keep) = rotate_keep_limit() {
2597 cleanup_rotated_files(path, keep)?;
2598 }
2599 Ok(())
2600 }
2601 _ => write_timeline_json(path, &timeline),
2602 }
2603}
2604
2605fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
2606 if let Some(parent) = Path::new(path).parent()
2607 && !parent.as_os_str().is_empty()
2608 {
2609 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2610 }
2611 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
2612 fs::write(path, json).map_err(|e| e.to_string())
2613}
2614
/// Builds the rotation target for `path`: `<parent>/<stem>_<suffix>.<ext>`.
///
/// A missing (or non-UTF-8) file stem defaults to `timeline` and a missing
/// (or non-UTF-8) extension defaults to `json`.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);

    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    let file_name = format!("{}_{}.{}", stem, suffix, ext);

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => Path::new("").join(file_name),
    }
}
2622
/// Reads the event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` when the variable is unset, does not parse as `usize`, or
/// is zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
2629
/// Reads how many rotated files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` when the variable is unset, does not parse as `usize`, or
/// is zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
2636
2637fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
2638 let len = timeline.events.len();
2639 if len > max_events {
2640 let keep_from = len - max_events;
2641 timeline.events = timeline.events.split_off(keep_from);
2642 }
2643}
2644
/// Deletes old rotated timeline files next to `base_path`, keeping the `keep`
/// most recent ones.
///
/// Only files named `<stem>_<digits>.<ext>` (the shape produced by
/// `rotated_path`) are considered, so unrelated files that merely share the
/// prefix and extension are never removed. Candidates are ordered newest
/// first by modification time, with the numeric suffix breaking ties so the
/// result is deterministic on filesystems with coarse timestamps.
///
/// # Errors
/// Returns the stringified error when the directory cannot be listed.
/// Individual removal failures are ignored (best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name; treat that like a
    // missing parent so the current directory is scanned instead of failing.
    let parent = p
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{stem}_");
    let suffix = format!(".{ext}");

    // Extracts the numeric rotation stamp from a matching file name.
    let rotation_stamp = |name: &str| -> Option<u64> {
        let middle = name
            .strip_prefix(prefix.as_str())?
            .strip_suffix(suffix.as_str())?;
        if middle.is_empty() || !middle.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        middle.parse::<u64>().ok()
    };

    let mut files = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let stamp = rotation_stamp(&entry.file_name().to_string_lossy())?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((entry.path(), modified, stamp))
        })
        .collect::<Vec<_>>();

    // Newest first; the stamp disambiguates identical modification times.
    files.sort_by(|a, b| (b.1, b.2).cmp(&(a.1, a.2)));
    for (path, _, _) in files.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
2677
2678fn bus_capability_schema_from_policy(
2679 policy: Option<ranvier_core::bus::BusAccessPolicy>,
2680) -> Option<BusCapabilitySchema> {
2681 let policy = policy?;
2682
2683 let mut allow = policy
2684 .allow
2685 .unwrap_or_default()
2686 .into_iter()
2687 .map(|entry| entry.type_name.to_string())
2688 .collect::<Vec<_>>();
2689 let mut deny = policy
2690 .deny
2691 .into_iter()
2692 .map(|entry| entry.type_name.to_string())
2693 .collect::<Vec<_>>();
2694 allow.sort();
2695 deny.sort();
2696
2697 if allow.is_empty() && deny.is_empty() {
2698 return None;
2699 }
2700
2701 Some(BusCapabilitySchema { allow, deny })
2702}
2703
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
2710
2711#[cfg(test)]
2712mod tests {
2713 use super::{
2714 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
2715 should_force_export,
2716 };
2717 use crate::persistence::{
2718 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
2719 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
2720 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
2721 PersistenceHandle, PersistenceStore, PersistenceTraceId,
2722 };
2723 use anyhow::Result;
2724 use async_trait::async_trait;
2725 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
2726 use ranvier_core::event::{DlqPolicy, DlqSink};
2727 use ranvier_core::saga::SagaStack;
2728 use ranvier_core::timeline::{Timeline, TimelineEvent};
2729 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
2730 use serde::{Deserialize, Serialize};
2731 use std::sync::Arc;
2732 use tokio::sync::Mutex;
2733 use uuid::Uuid;
2734
2735 struct MockAuditSink {
2736 events: Arc<Mutex<Vec<AuditEvent>>>,
2737 }
2738
2739 #[async_trait]
2740 impl AuditSink for MockAuditSink {
2741 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
2742 self.events.lock().await.push(event.clone());
2743 Ok(())
2744 }
2745 }
2746
2747 #[tokio::test]
2748 async fn execute_logs_audit_events_for_intervention() {
2749 use ranvier_inspector::StateInspector;
2750
2751 let trace_id = "test-audit-trace";
2752 let store_impl = InMemoryPersistenceStore::new();
2753 let events = Arc::new(Mutex::new(Vec::new()));
2754 let sink = MockAuditSink {
2755 events: events.clone(),
2756 };
2757
2758 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
2759 .then(AddOne)
2760 .with_persistence_store(store_impl.clone())
2761 .with_audit_sink(sink);
2762
2763 let mut bus = Bus::new();
2764 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
2765 bus.insert(PersistenceTraceId::new(trace_id));
2766 let target_node_id = axon.schematic.nodes[0].id.clone();
2767
2768 store_impl
2770 .append(crate::persistence::PersistenceEnvelope {
2771 trace_id: trace_id.to_string(),
2772 circuit: "AuditTest".to_string(),
2773 schematic_version: "v1.0".to_string(),
2774 step: 0,
2775 node_id: None,
2776 outcome_kind: "Next".to_string(),
2777 timestamp_ms: 0,
2778 payload_hash: None,
2779 payload: None,
2780 })
2781 .await
2782 .unwrap();
2783
2784 axon.force_resume(trace_id, &target_node_id, None)
2786 .await
2787 .unwrap();
2788
2789 axon.execute(10, &(), &mut bus).await;
2791
2792 let recorded = events.lock().await;
2793 assert_eq!(
2794 recorded.len(),
2795 2,
2796 "Should have 2 audit events: ForceResume and ApplyIntervention"
2797 );
2798 assert_eq!(recorded[0].action, "ForceResume");
2799 assert_eq!(recorded[0].target, trace_id);
2800 assert_eq!(recorded[1].action, "ApplyIntervention");
2801 assert_eq!(recorded[1].target, trace_id);
2802 }
2803
    /// Uninhabited error type used by test transitions that never fault.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    pub enum TestInfallible {}
2806
2807 #[test]
2808 fn inspector_enabled_flag_matrix() {
2809 assert!(inspector_enabled_from_value(None));
2810 assert!(inspector_enabled_from_value(Some("1")));
2811 assert!(inspector_enabled_from_value(Some("true")));
2812 assert!(inspector_enabled_from_value(Some("on")));
2813 assert!(!inspector_enabled_from_value(Some("0")));
2814 assert!(!inspector_enabled_from_value(Some("false")));
2815 }
2816
2817 #[test]
2818 fn inspector_dev_mode_matrix() {
2819 assert!(inspector_dev_mode_from_value(None));
2820 assert!(inspector_dev_mode_from_value(Some("dev")));
2821 assert!(inspector_dev_mode_from_value(Some("staging")));
2822 assert!(!inspector_dev_mode_from_value(Some("prod")));
2823 assert!(!inspector_dev_mode_from_value(Some("production")));
2824 }
2825
2826 #[test]
2827 fn adaptive_policy_force_export_matrix() {
2828 let next = Outcome::<(), &'static str>::Next(());
2829 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
2830 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
2831 let fault = Outcome::<(), &'static str>::Fault("boom");
2832
2833 assert!(!should_force_export(&next, "off"));
2834 assert!(!should_force_export(&fault, "off"));
2835
2836 assert!(!should_force_export(&branch, "fault_only"));
2837 assert!(should_force_export(&fault, "fault_only"));
2838
2839 assert!(should_force_export(&branch, "fault_branch"));
2840 assert!(!should_force_export(&emit, "fault_branch"));
2841 assert!(should_force_export(&fault, "fault_branch"));
2842
2843 assert!(should_force_export(&branch, "fault_branch_emit"));
2844 assert!(should_force_export(&emit, "fault_branch_emit"));
2845 assert!(should_force_export(&fault, "fault_branch_emit"));
2846 }
2847
2848 #[test]
2849 fn sampling_and_adaptive_combination_decisions() {
2850 let bus_id = Uuid::nil();
2851 let next = Outcome::<(), &'static str>::Next(());
2852 let fault = Outcome::<(), &'static str>::Fault("boom");
2853
2854 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
2855 assert!(!sampled_never);
2856 assert!(!(sampled_never || should_force_export(&next, "off")));
2857 assert!(sampled_never || should_force_export(&fault, "fault_only"));
2858
2859 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
2860 assert!(sampled_always);
2861 assert!(sampled_always || should_force_export(&next, "off"));
2862 assert!(sampled_always || should_force_export(&fault, "off"));
2863 }
2864
2865 #[derive(Clone)]
2866 struct AddOne;
2867
2868 #[async_trait]
2869 impl Transition<i32, i32> for AddOne {
2870 type Error = TestInfallible;
2871 type Resources = ();
2872
2873 async fn run(
2874 &self,
2875 state: i32,
2876 _resources: &Self::Resources,
2877 _bus: &mut Bus,
2878 ) -> Outcome<i32, Self::Error> {
2879 Outcome::Next(state + 1)
2880 }
2881 }
2882
2883 #[derive(Clone)]
2884 struct AlwaysFault;
2885
2886 #[async_trait]
2887 impl Transition<i32, i32> for AlwaysFault {
2888 type Error = String;
2889 type Resources = ();
2890
2891 async fn run(
2892 &self,
2893 _state: i32,
2894 _resources: &Self::Resources,
2895 _bus: &mut Bus,
2896 ) -> Outcome<i32, Self::Error> {
2897 Outcome::Fault("boom".to_string())
2898 }
2899 }
2900
2901 #[derive(Clone)]
2902 struct CapabilityGuarded;
2903
2904 #[async_trait]
2905 impl Transition<(), ()> for CapabilityGuarded {
2906 type Error = String;
2907 type Resources = ();
2908
2909 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
2910 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
2911 }
2912
2913 async fn run(
2914 &self,
2915 _state: (),
2916 _resources: &Self::Resources,
2917 bus: &mut Bus,
2918 ) -> Outcome<(), Self::Error> {
2919 match bus.get::<String>() {
2920 Ok(_) => Outcome::Next(()),
2921 Err(err) => Outcome::Fault(err.to_string()),
2922 }
2923 }
2924 }
2925
2926 #[derive(Clone)]
2927 struct RecordingCompensationHook {
2928 calls: Arc<Mutex<Vec<CompensationContext>>>,
2929 should_fail: bool,
2930 }
2931
2932 #[async_trait]
2933 impl CompensationHook for RecordingCompensationHook {
2934 async fn compensate(&self, context: CompensationContext) -> Result<()> {
2935 self.calls.lock().await.push(context);
2936 if self.should_fail {
2937 return Err(anyhow::anyhow!("compensation failed"));
2938 }
2939 Ok(())
2940 }
2941 }
2942
2943 #[derive(Clone)]
2944 struct FlakyCompensationHook {
2945 calls: Arc<Mutex<u32>>,
2946 failures_remaining: Arc<Mutex<u32>>,
2947 }
2948
2949 #[async_trait]
2950 impl CompensationHook for FlakyCompensationHook {
2951 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
2952 {
2953 let mut calls = self.calls.lock().await;
2954 *calls += 1;
2955 }
2956 let mut failures_remaining = self.failures_remaining.lock().await;
2957 if *failures_remaining > 0 {
2958 *failures_remaining -= 1;
2959 return Err(anyhow::anyhow!("transient compensation failure"));
2960 }
2961 Ok(())
2962 }
2963 }
2964
2965 #[derive(Clone)]
2966 struct FailingCompensationIdempotencyStore {
2967 read_calls: Arc<Mutex<u32>>,
2968 write_calls: Arc<Mutex<u32>>,
2969 }
2970
2971 #[async_trait]
2972 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
2973 async fn was_compensated(&self, _key: &str) -> Result<bool> {
2974 let mut read_calls = self.read_calls.lock().await;
2975 *read_calls += 1;
2976 Err(anyhow::anyhow!("forced idempotency read failure"))
2977 }
2978
2979 async fn mark_compensated(&self, _key: &str) -> Result<()> {
2980 let mut write_calls = self.write_calls.lock().await;
2981 *write_calls += 1;
2982 Err(anyhow::anyhow!("forced idempotency write failure"))
2983 }
2984 }
2985
2986 #[tokio::test]
2987 async fn execute_persists_success_trace_when_handle_exists() {
2988 let store_impl = Arc::new(InMemoryPersistenceStore::new());
2989 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2990
2991 let mut bus = Bus::new();
2992 bus.insert(PersistenceHandle::from_arc(store_dyn));
2993 bus.insert(PersistenceTraceId::new("trace-success"));
2994
2995 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
2996 let outcome = axon.execute(41, &(), &mut bus).await;
2997 assert!(matches!(outcome, Outcome::Next(42)));
2998
2999 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3000 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3002 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
3005 }
3006
3007 #[tokio::test]
3008 async fn execute_persists_fault_completion_state() {
3009 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3010 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3011
3012 let mut bus = Bus::new();
3013 bus.insert(PersistenceHandle::from_arc(store_dyn));
3014 bus.insert(PersistenceTraceId::new("trace-fault"));
3015
3016 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3017 let outcome = axon.execute(41, &(), &mut bus).await;
3018 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3019
3020 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3021 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3025 }
3026
3027 #[tokio::test]
3028 async fn execute_respects_persistence_auto_complete_off() {
3029 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3030 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3031
3032 let mut bus = Bus::new();
3033 bus.insert(PersistenceHandle::from_arc(store_dyn));
3034 bus.insert(PersistenceTraceId::new("trace-no-complete"));
3035 bus.insert(PersistenceAutoComplete(false));
3036
3037 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3038 let outcome = axon.execute(1, &(), &mut bus).await;
3039 assert!(matches!(outcome, Outcome::Next(2)));
3040
3041 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3042 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
3044 }
3045
3046 #[tokio::test]
3047 async fn fault_triggers_compensation_and_marks_compensated() {
3048 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3049 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3050 let calls = Arc::new(Mutex::new(Vec::new()));
3051 let compensation = RecordingCompensationHook {
3052 calls: calls.clone(),
3053 should_fail: false,
3054 };
3055
3056 let mut bus = Bus::new();
3057 bus.insert(PersistenceHandle::from_arc(store_dyn));
3058 bus.insert(PersistenceTraceId::new("trace-compensated"));
3059 bus.insert(CompensationHandle::from_hook(compensation));
3060
3061 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3062 let outcome = axon.execute(7, &(), &mut bus).await;
3063 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3064
3065 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3066 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3068 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3071 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3072
3073 let recorded = calls.lock().await;
3074 assert_eq!(recorded.len(), 1);
3075 assert_eq!(recorded[0].trace_id, "trace-compensated");
3076 assert_eq!(recorded[0].fault_kind, "Fault");
3077 }
3078
3079 #[tokio::test]
3080 async fn failed_compensation_keeps_fault_completion() {
3081 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3082 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3083 let calls = Arc::new(Mutex::new(Vec::new()));
3084 let compensation = RecordingCompensationHook {
3085 calls: calls.clone(),
3086 should_fail: true,
3087 };
3088
3089 let mut bus = Bus::new();
3090 bus.insert(PersistenceHandle::from_arc(store_dyn));
3091 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3092 bus.insert(CompensationHandle::from_hook(compensation));
3093
3094 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3095 let outcome = axon.execute(7, &(), &mut bus).await;
3096 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3097
3098 let persisted = store_impl
3099 .load("trace-compensation-failed")
3100 .await
3101 .unwrap()
3102 .unwrap();
3103 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3106
3107 let recorded = calls.lock().await;
3108 assert_eq!(recorded.len(), 1);
3109 }
3110
3111 #[tokio::test]
3112 async fn compensation_retry_policy_succeeds_after_retries() {
3113 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3114 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3115 let calls = Arc::new(Mutex::new(0u32));
3116 let failures_remaining = Arc::new(Mutex::new(2u32));
3117 let compensation = FlakyCompensationHook {
3118 calls: calls.clone(),
3119 failures_remaining,
3120 };
3121
3122 let mut bus = Bus::new();
3123 bus.insert(PersistenceHandle::from_arc(store_dyn));
3124 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3125 bus.insert(CompensationHandle::from_hook(compensation));
3126 bus.insert(CompensationRetryPolicy {
3127 max_attempts: 3,
3128 backoff_ms: 0,
3129 });
3130
3131 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3132 let outcome = axon.execute(7, &(), &mut bus).await;
3133 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3134
3135 let persisted = store_impl
3136 .load("trace-retry-success")
3137 .await
3138 .unwrap()
3139 .unwrap();
3140 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3141 assert_eq!(
3142 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3143 Some("Compensated")
3144 );
3145
3146 let attempt_count = calls.lock().await;
3147 assert_eq!(*attempt_count, 3);
3148 }
3149
3150 #[tokio::test]
3151 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3152 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3153 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3154 let calls = Arc::new(Mutex::new(Vec::new()));
3155 let compensation = RecordingCompensationHook {
3156 calls: calls.clone(),
3157 should_fail: false,
3158 };
3159 let idempotency = InMemoryCompensationIdempotencyStore::new();
3160
3161 let mut bus = Bus::new();
3162 bus.insert(PersistenceHandle::from_arc(store_dyn));
3163 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3164 bus.insert(PersistenceAutoComplete(false));
3165 bus.insert(CompensationHandle::from_hook(compensation));
3166 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3167
3168 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3169
3170 let outcome1 = axon.execute(7, &(), &mut bus).await;
3171 let outcome2 = axon.execute(8, &(), &mut bus).await;
3172 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3173 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3174
3175 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3176 assert_eq!(persisted.completion, None);
3177 let compensated_count = persisted
3179 .events
3180 .iter()
3181 .filter(|e| e.outcome_kind == "Compensated")
3182 .count();
3183 assert_eq!(
3184 compensated_count, 2,
3185 "Should have 2 Compensated events (one per execution)"
3186 );
3187
3188 let recorded = calls.lock().await;
3189 assert_eq!(recorded.len(), 1);
3190 }
3191
3192 #[tokio::test]
3193 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3194 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3195 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3196 let calls = Arc::new(Mutex::new(Vec::new()));
3197 let read_calls = Arc::new(Mutex::new(0u32));
3198 let write_calls = Arc::new(Mutex::new(0u32));
3199 let compensation = RecordingCompensationHook {
3200 calls: calls.clone(),
3201 should_fail: false,
3202 };
3203 let idempotency = FailingCompensationIdempotencyStore {
3204 read_calls: read_calls.clone(),
3205 write_calls: write_calls.clone(),
3206 };
3207
3208 let mut bus = Bus::new();
3209 bus.insert(PersistenceHandle::from_arc(store_dyn));
3210 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3211 bus.insert(CompensationHandle::from_hook(compensation));
3212 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3213
3214 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3215 let outcome = axon.execute(9, &(), &mut bus).await;
3216 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3217
3218 let persisted = store_impl
3219 .load("trace-idempotency-store-failure")
3220 .await
3221 .unwrap()
3222 .unwrap();
3223 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3224 assert_eq!(
3225 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3226 Some("Compensated")
3227 );
3228
3229 let recorded = calls.lock().await;
3230 assert_eq!(recorded.len(), 1);
3231 assert_eq!(*read_calls.lock().await, 1);
3232 assert_eq!(*write_calls.lock().await, 1);
3233 }
3234
3235 #[tokio::test]
3236 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3237 let mut bus = Bus::new();
3238 bus.insert(1_i32);
3239 bus.insert("secret".to_string());
3240
3241 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3242 let outcome = axon.execute((), &(), &mut bus).await;
3243
3244 match outcome {
3245 Outcome::Fault(msg) => {
3246 assert!(msg.contains("Bus access denied"), "{msg}");
3247 assert!(msg.contains("CapabilityGuarded"), "{msg}");
3248 assert!(msg.contains("alloc::string::String"), "{msg}");
3249 }
3250 other => panic!("expected fault, got {other:?}"),
3251 }
3252 }
3253
3254 #[tokio::test]
3255 async fn execute_fails_on_version_mismatch_without_migration() {
3256 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3257 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3258
3259 let trace_id = "v-mismatch";
3260 let old_envelope = crate::persistence::PersistenceEnvelope {
3262 trace_id: trace_id.to_string(),
3263 circuit: "TestCircuit".to_string(),
3264 schematic_version: "0.9".to_string(),
3265 step: 0,
3266 node_id: None,
3267 outcome_kind: "Enter".to_string(),
3268 timestamp_ms: 0,
3269 payload_hash: None,
3270 payload: None,
3271 };
3272 store_impl.append(old_envelope).await.unwrap();
3273
3274 let mut bus = Bus::new();
3275 bus.insert(PersistenceHandle::from_arc(store_dyn));
3276 bus.insert(PersistenceTraceId::new(trace_id));
3277
3278 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3280 let outcome = axon.execute(10, &(), &mut bus).await;
3281
3282 if let Outcome::Emit(kind, _) = outcome {
3283 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3284 } else {
3285 panic!("Expected version mismatch emission, got {:?}", outcome);
3286 }
3287 }
3288
3289 #[tokio::test]
3290 async fn execute_resumes_from_start_on_migration_strategy() {
3291 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3292 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3293
3294 let trace_id = "v-migration";
3295 let old_envelope = crate::persistence::PersistenceEnvelope {
3297 trace_id: trace_id.to_string(),
3298 circuit: "TestCircuit".to_string(),
3299 schematic_version: "0.9".to_string(),
3300 step: 5,
3301 node_id: None,
3302 outcome_kind: "Next".to_string(),
3303 timestamp_ms: 0,
3304 payload_hash: None,
3305 payload: None,
3306 };
3307 store_impl.append(old_envelope).await.unwrap();
3308
3309 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3310 registry.register(ranvier_core::schematic::SnapshotMigration {
3311 name: Some("v0.9 to v1.0".to_string()),
3312 from_version: "0.9".to_string(),
3313 to_version: "1.0".to_string(),
3314 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3315 node_mapping: std::collections::HashMap::new(),
3316 payload_mapper: None,
3317 });
3318
3319 let mut bus = Bus::new();
3320 bus.insert(PersistenceHandle::from_arc(store_dyn));
3321 bus.insert(PersistenceTraceId::new(trace_id));
3322 bus.insert(registry);
3323
3324 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3325 let outcome = axon.execute(10, &(), &mut bus).await;
3326
3327 assert!(matches!(outcome, Outcome::Next(11)));
3329
3330 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3332 assert_eq!(persisted.schematic_version, "1.0");
3333 }
3334
3335 #[tokio::test]
3336 async fn execute_applies_manual_intervention_jump_and_payload() {
3337 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3338 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3339
3340 let trace_id = "intervention-test";
3341 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3343 .then(AddOne)
3344 .then(AddOne);
3345
3346 let mut bus = Bus::new();
3347 bus.insert(PersistenceHandle::from_arc(store_dyn));
3348 bus.insert(PersistenceTraceId::new(trace_id));
3349
3350 let _target_node_label = "AddOne";
3355 let target_node_id = axon.schematic.nodes[2].id.clone();
3357
3358 store_impl
3360 .append(crate::persistence::PersistenceEnvelope {
3361 trace_id: trace_id.to_string(),
3362 circuit: "TestCircuit".to_string(),
3363 schematic_version: "1.0".to_string(),
3364 step: 0,
3365 node_id: None,
3366 outcome_kind: "Enter".to_string(),
3367 timestamp_ms: 0,
3368 payload_hash: None,
3369 payload: None,
3370 })
3371 .await
3372 .unwrap();
3373
3374 store_impl
3375 .save_intervention(
3376 trace_id,
3377 crate::persistence::Intervention {
3378 target_node: target_node_id.clone(),
3379 payload_override: Some(serde_json::json!(100)),
3380 timestamp_ms: 0,
3381 },
3382 )
3383 .await
3384 .unwrap();
3385
3386 let outcome = axon.execute(10, &(), &mut bus).await;
3389
3390 match outcome {
3391 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3392 other => panic!("Expected Outcome::Next(101), got {:?}", other),
3393 }
3394
3395 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3397 assert_eq!(persisted.interventions.len(), 1);
3399 assert_eq!(persisted.interventions[0].target_node, target_node_id);
3400 }
3401
3402 #[derive(Clone)]
3406 struct FailNThenSucceed {
3407 remaining: Arc<tokio::sync::Mutex<u32>>,
3408 }
3409
3410 #[async_trait]
3411 impl Transition<i32, i32> for FailNThenSucceed {
3412 type Error = String;
3413 type Resources = ();
3414
3415 async fn run(
3416 &self,
3417 state: i32,
3418 _resources: &Self::Resources,
3419 _bus: &mut Bus,
3420 ) -> Outcome<i32, Self::Error> {
3421 let mut rem = self.remaining.lock().await;
3422 if *rem > 0 {
3423 *rem -= 1;
3424 Outcome::Fault("transient failure".to_string())
3425 } else {
3426 Outcome::Next(state + 1)
3427 }
3428 }
3429 }
3430
3431 #[derive(Clone)]
3433 struct MockDlqSink {
3434 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3435 }
3436
3437 #[async_trait]
3438 impl DlqSink for MockDlqSink {
3439 async fn store_dead_letter(
3440 &self,
3441 workflow_id: &str,
3442 _circuit_label: &str,
3443 node_id: &str,
3444 error_msg: &str,
3445 _payload: &[u8],
3446 ) -> Result<(), String> {
3447 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
3448 self.letters.lock().await.push(entry);
3449 Ok(())
3450 }
3451 }
3452
3453 #[tokio::test]
3454 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
3455 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
3457 let trans = FailNThenSucceed { remaining };
3458
3459 let dlq_sink = MockDlqSink {
3460 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3461 };
3462
3463 let mut bus = Bus::new();
3464 bus.insert(Timeline::new());
3465
3466 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
3467 .then(trans)
3468 .with_dlq_policy(DlqPolicy::RetryThenDlq {
3469 max_attempts: 5,
3470 backoff_ms: 1,
3471 })
3472 .with_dlq_sink(dlq_sink.clone());
3473 let outcome = axon.execute(10, &(), &mut bus).await;
3474
3475 assert!(
3477 matches!(outcome, Outcome::Next(11)),
3478 "Expected Next(11), got {:?}",
3479 outcome
3480 );
3481
3482 let letters = dlq_sink.letters.lock().await;
3484 assert!(
3485 letters.is_empty(),
3486 "Should have 0 dead letters, got {}",
3487 letters.len()
3488 );
3489
3490 let timeline = bus.read::<Timeline>().unwrap();
3492 let retry_count = timeline
3493 .events
3494 .iter()
3495 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3496 .count();
3497 assert_eq!(retry_count, 2, "Should have 2 retry events");
3498 }
3499
3500 #[tokio::test]
3501 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
3502 let mut bus = Bus::new();
3504 bus.insert(Timeline::new());
3505
3506 let dlq_sink = MockDlqSink {
3507 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3508 };
3509
3510 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
3511 .then(AlwaysFault)
3512 .with_dlq_policy(DlqPolicy::RetryThenDlq {
3513 max_attempts: 3,
3514 backoff_ms: 1,
3515 })
3516 .with_dlq_sink(dlq_sink.clone());
3517 let outcome = axon.execute(42, &(), &mut bus).await;
3518
3519 assert!(
3520 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
3521 "Expected Fault(boom), got {:?}",
3522 outcome
3523 );
3524
3525 let letters = dlq_sink.letters.lock().await;
3527 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
3528
3529 let timeline = bus.read::<Timeline>().unwrap();
3531 let retry_count = timeline
3532 .events
3533 .iter()
3534 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3535 .count();
3536 let dlq_count = timeline
3537 .events
3538 .iter()
3539 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
3540 .count();
3541 assert_eq!(
3542 retry_count, 2,
3543 "Should have 2 retry events (attempts 2 and 3)"
3544 );
3545 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
3546 }
3547
3548 #[tokio::test]
3549 async fn send_to_dlq_policy_sends_immediately_without_retry() {
3550 let mut bus = Bus::new();
3551 bus.insert(Timeline::new());
3552
3553 let dlq_sink = MockDlqSink {
3554 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3555 };
3556
3557 let axon = Axon::<i32, i32, String>::start("SendDlq")
3558 .then(AlwaysFault)
3559 .with_dlq_policy(DlqPolicy::SendToDlq)
3560 .with_dlq_sink(dlq_sink.clone());
3561 let outcome = axon.execute(1, &(), &mut bus).await;
3562
3563 assert!(matches!(outcome, Outcome::Fault(_)));
3564
3565 let letters = dlq_sink.letters.lock().await;
3567 assert_eq!(letters.len(), 1);
3568
3569 let timeline = bus.read::<Timeline>().unwrap();
3571 let retry_count = timeline
3572 .events
3573 .iter()
3574 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3575 .count();
3576 assert_eq!(retry_count, 0);
3577 }
3578
3579 #[tokio::test]
3580 async fn drop_policy_does_not_send_to_dlq() {
3581 let mut bus = Bus::new();
3582
3583 let dlq_sink = MockDlqSink {
3584 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3585 };
3586
3587 let axon = Axon::<i32, i32, String>::start("DropDlq")
3588 .then(AlwaysFault)
3589 .with_dlq_policy(DlqPolicy::Drop)
3590 .with_dlq_sink(dlq_sink.clone());
3591 let outcome = axon.execute(1, &(), &mut bus).await;
3592
3593 assert!(matches!(outcome, Outcome::Fault(_)));
3594
3595 let letters = dlq_sink.letters.lock().await;
3597 assert!(letters.is_empty());
3598 }
3599
3600 #[tokio::test]
3601 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
3602 use ranvier_core::policy::DynamicPolicy;
3603
3604 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
3606 let dlq_sink = MockDlqSink {
3607 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3608 };
3609
3610 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
3611 .then(AlwaysFault)
3612 .with_dynamic_dlq_policy(dynamic)
3613 .with_dlq_sink(dlq_sink.clone());
3614
3615 let mut bus = Bus::new();
3617 let outcome = axon.execute(1, &(), &mut bus).await;
3618 assert!(matches!(outcome, Outcome::Fault(_)));
3619 assert!(
3620 dlq_sink.letters.lock().await.is_empty(),
3621 "Drop policy should produce no DLQ entries"
3622 );
3623
3624 tx.send(DlqPolicy::SendToDlq).unwrap();
3626
3627 let mut bus2 = Bus::new();
3629 let outcome2 = axon.execute(2, &(), &mut bus2).await;
3630 assert!(matches!(outcome2, Outcome::Fault(_)));
3631 assert_eq!(
3632 dlq_sink.letters.lock().await.len(),
3633 1,
3634 "SendToDlq policy should produce 1 DLQ entry"
3635 );
3636 }
3637
3638 #[tokio::test]
3639 async fn dynamic_saga_policy_hot_reload() {
3640 use ranvier_core::policy::DynamicPolicy;
3641 use ranvier_core::saga::SagaPolicy;
3642
3643 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
3645
3646 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
3647 .then(AddOne)
3648 .with_dynamic_saga_policy(dynamic);
3649
3650 let mut bus = Bus::new();
3652 let _outcome = axon.execute(1, &(), &mut bus).await;
3653 assert!(
3654 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
3655 "SagaStack should be absent or empty when disabled"
3656 );
3657
3658 tx.send(SagaPolicy::Enabled).unwrap();
3660
3661 let mut bus2 = Bus::new();
3663 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
3664 assert!(
3665 bus2.read::<SagaStack>().is_some(),
3666 "SagaStack should exist when saga is enabled"
3667 );
3668 }
3669
3670 mod iam_tests {
3673 use super::*;
3674 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
3675
3676 #[derive(Clone)]
3678 struct MockVerifier {
3679 identity: IamIdentity,
3680 should_fail: bool,
3681 }
3682
3683 #[async_trait]
3684 impl IamVerifier for MockVerifier {
3685 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
3686 if self.should_fail {
3687 Err(IamError::InvalidToken("mock verification failure".into()))
3688 } else {
3689 Ok(self.identity.clone())
3690 }
3691 }
3692 }
3693
3694 #[tokio::test]
3695 async fn iam_require_identity_passes_with_valid_token() {
3696 let verifier = MockVerifier {
3697 identity: IamIdentity::new("alice").with_role("user"),
3698 should_fail: false,
3699 };
3700
3701 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
3702 .with_iam(IamPolicy::RequireIdentity, verifier)
3703 .then(AddOne);
3704
3705 let mut bus = Bus::new();
3706 bus.insert(IamToken("valid-token".to_string()));
3707 let outcome = axon.execute(10, &(), &mut bus).await;
3708
3709 assert!(matches!(outcome, Outcome::Next(11)));
3710 let identity = bus
3712 .read::<IamIdentity>()
3713 .expect("IamIdentity should be in Bus");
3714 assert_eq!(identity.subject, "alice");
3715 }
3716
3717 #[tokio::test]
3718 async fn iam_require_identity_rejects_missing_token() {
3719 let verifier = MockVerifier {
3720 identity: IamIdentity::new("ignored"),
3721 should_fail: false,
3722 };
3723
3724 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
3725 .with_iam(IamPolicy::RequireIdentity, verifier)
3726 .then(AddOne);
3727
3728 let mut bus = Bus::new();
3729 let outcome = axon.execute(10, &(), &mut bus).await;
3731
3732 match &outcome {
3734 Outcome::Emit(label, _) => {
3735 assert_eq!(label, "iam.missing_token");
3736 }
3737 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
3738 }
3739 }
3740
3741 #[tokio::test]
3742 async fn iam_rejects_failed_verification() {
3743 let verifier = MockVerifier {
3744 identity: IamIdentity::new("ignored"),
3745 should_fail: true,
3746 };
3747
3748 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
3749 .with_iam(IamPolicy::RequireIdentity, verifier)
3750 .then(AddOne);
3751
3752 let mut bus = Bus::new();
3753 bus.insert(IamToken("bad-token".to_string()));
3754 let outcome = axon.execute(10, &(), &mut bus).await;
3755
3756 match &outcome {
3757 Outcome::Emit(label, _) => {
3758 assert_eq!(label, "iam.verification_failed");
3759 }
3760 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
3761 }
3762 }
3763
3764 #[tokio::test]
3765 async fn iam_require_role_passes_with_matching_role() {
3766 let verifier = MockVerifier {
3767 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
3768 should_fail: false,
3769 };
3770
3771 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
3772 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3773 .then(AddOne);
3774
3775 let mut bus = Bus::new();
3776 bus.insert(IamToken("token".to_string()));
3777 let outcome = axon.execute(5, &(), &mut bus).await;
3778
3779 assert!(matches!(outcome, Outcome::Next(6)));
3780 }
3781
3782 #[tokio::test]
3783 async fn iam_require_role_denies_without_role() {
3784 let verifier = MockVerifier {
3785 identity: IamIdentity::new("carol").with_role("user"),
3786 should_fail: false,
3787 };
3788
3789 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
3790 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3791 .then(AddOne);
3792
3793 let mut bus = Bus::new();
3794 bus.insert(IamToken("token".to_string()));
3795 let outcome = axon.execute(5, &(), &mut bus).await;
3796
3797 match &outcome {
3798 Outcome::Emit(label, _) => {
3799 assert_eq!(label, "iam.policy_denied");
3800 }
3801 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
3802 }
3803 }
3804
3805 #[tokio::test]
3806 async fn iam_policy_none_skips_verification() {
3807 let verifier = MockVerifier {
3808 identity: IamIdentity::new("ignored"),
3809 should_fail: true, };
3811
3812 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
3813 .with_iam(IamPolicy::None, verifier)
3814 .then(AddOne);
3815
3816 let mut bus = Bus::new();
3817 let outcome = axon.execute(10, &(), &mut bus).await;
3819
3820 assert!(matches!(outcome, Outcome::Next(11)));
3821 }
3822 }
3823
3824 #[derive(Clone)]
3827 struct SchemaTransition;
3828
3829 #[async_trait]
3830 impl Transition<String, String> for SchemaTransition {
3831 type Error = String;
3832 type Resources = ();
3833
3834 fn input_schema(&self) -> Option<serde_json::Value> {
3835 Some(serde_json::json!({
3836 "type": "object",
3837 "required": ["name"],
3838 "properties": {
3839 "name": { "type": "string" }
3840 }
3841 }))
3842 }
3843
3844 async fn run(
3845 &self,
3846 state: String,
3847 _resources: &Self::Resources,
3848 _bus: &mut Bus,
3849 ) -> Outcome<String, Self::Error> {
3850 Outcome::Next(state)
3851 }
3852 }
3853
/// `then` should copy the schema returned by the transition's
/// `input_schema()` onto the node it appends.
#[test]
fn then_auto_populates_input_schema_from_transition() {
    let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);

    let last_node = axon
        .schematic
        .nodes
        .last()
        .expect("schematic should contain at least one node");
    // A single `expect` replaces the `is_some()` assertion plus `unwrap()` and
    // says what went wrong if the schema is missing.
    let schema = last_node
        .input_schema
        .as_ref()
        .expect("node added by `then` should carry the transition's input schema");
    assert_eq!(schema["type"], "object");
    assert_eq!(schema["required"][0], "name");
}
3865
/// A node appended for `AddOne` is expected to have no input schema.
#[test]
fn then_leaves_input_schema_none_when_not_provided() {
    let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);

    let appended = axon.schematic.nodes.last().unwrap();
    assert!(appended.input_schema.is_none());
}
3873
/// `with_input_schema_value` should store the given value as the input
/// schema of the last node in the schematic.
#[test]
fn with_input_schema_value_sets_on_last_node() {
    let expected = serde_json::json!({"type": "integer"});

    let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
        .then(AddOne)
        .with_input_schema_value(expected.clone());

    let tail = axon.schematic.nodes.last().unwrap();
    let attached = tail.input_schema.as_ref().unwrap();
    assert_eq!(attached, &expected);
}
3884
/// `with_output_schema_value` should store the given value as the output
/// schema of the last node in the schematic.
#[test]
fn with_output_schema_value_sets_on_last_node() {
    let expected = serde_json::json!({"type": "integer"});

    let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
        .then(AddOne)
        .with_output_schema_value(expected.clone());

    let tail = axon.schematic.nodes.last().unwrap();
    let attached = tail.output_schema.as_ref().unwrap();
    assert_eq!(attached, &expected);
}
3895
/// Serializing the schematic to JSON should expose both `input_schema` and
/// `output_schema` on the last node when they are set.
#[test]
fn schematic_export_includes_schema_fields() {
    let output_schema = serde_json::json!({"type": "string"});
    let axon = Axon::<String, String, String>::new("ExportTest")
        .then(SchemaTransition)
        .with_output_schema_value(output_schema);

    let exported = serde_json::to_value(&axon.schematic).unwrap();
    let node_list = exported["nodes"].as_array().unwrap();
    let tail = node_list.last().unwrap();

    // The key must be present, not merely indexable.
    assert!(tail.get("input_schema").is_some());
    assert_eq!(tail["input_schema"]["type"], "object");
    assert_eq!(tail["output_schema"]["type"], "string");
}
3910
/// When no schema is set, the serialized node should omit the schema keys
/// entirely.
#[test]
fn schematic_export_omits_schema_fields_when_none() {
    let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);

    let exported = serde_json::to_value(&axon.schematic).unwrap();
    let node_list = exported["nodes"].as_array().unwrap();
    let fields = node_list.last().unwrap().as_object().unwrap();

    for key in ["input_schema", "output_schema"] {
        assert!(!fields.contains_key(key));
    }
}
3922
/// A schematic serialized to a JSON string and parsed back should keep both
/// the input and output schema of its last node.
#[test]
fn schematic_json_roundtrip_preserves_schemas() {
    let axon = Axon::<String, String, String>::new("Roundtrip")
        .then(SchemaTransition)
        .with_output_schema_value(serde_json::json!({"type": "string"}));

    let json_str = serde_json::to_string(&axon.schematic).unwrap();
    let deserialized: ranvier_core::schematic::Schematic =
        serde_json::from_str(&json_str).unwrap();

    let last = deserialized
        .nodes
        .last()
        .expect("deserialized schematic should contain at least one node");
    // `expect` replaces the separate `is_some()` assertions and later
    // `unwrap()` calls, naming the schema that was lost.
    let input_schema = last
        .input_schema
        .as_ref()
        .expect("input_schema should survive the JSON roundtrip");
    let output_schema = last
        .output_schema
        .as_ref()
        .expect("output_schema should survive the JSON roundtrip");

    assert_eq!(input_schema["required"][0], "name");
    assert_eq!(output_schema["type"], "string");
}
3939}