tramli_plugins/eventstore/
mod.rs1use std::time::Instant;
2use tramli::{FlowState, InMemoryFlowStore, FlowInstance};
3
/// Kind of entry stored in the versioned event log.
///
/// The enum is fieldless, so it is cheap to copy and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// A state transition, as appended by `EventLogStore::record_transition`.
    Transition,
    /// A compensation entry, as appended by `EventLogStore::append_compensation`.
    Compensation,
}
10
11#[derive(Debug, Clone)]
13pub struct VersionedTransitionEvent {
14 pub flow_id: String,
15 pub version: u32,
16 pub event_type: EventType,
17 pub from: Option<String>,
18 pub to: String,
19 pub trigger: String,
20 pub timestamp: Instant,
21 pub state_snapshot: String,
22}
23
/// What a `CompensationResolver` decided to do about a failed or undone event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompensationPlan {
    /// Name of the compensating action; `CompensationService::compensate` records it as the
    /// trigger of the appended compensation event.
    pub action: String,
    /// Free-form text stored alongside the compensation event.
    pub metadata: String,
}
30
31pub type CompensationResolver =
33 Box<dyn Fn(&VersionedTransitionEvent, &str) -> Option<CompensationPlan> + Send + Sync>;
34
35pub trait ProjectionReducer<T> {
37 fn initial_state(&self) -> T;
38 fn apply(&self, state: T, event: &VersionedTransitionEvent) -> T;
39}
40
41pub struct EventLogStore<S: FlowState> {
43 pub delegate: InMemoryFlowStore<S>,
44 event_log: Vec<VersionedTransitionEvent>,
45 version_counters: std::collections::HashMap<String, u32>,
46}
47
48impl<S: FlowState> EventLogStore<S> {
49 pub fn new(delegate: InMemoryFlowStore<S>) -> Self {
50 Self {
51 delegate,
52 event_log: Vec::new(),
53 version_counters: std::collections::HashMap::new(),
54 }
55 }
56
57 pub fn create(&mut self, flow: FlowInstance<S>) {
58 self.delegate.create(flow);
59 }
60
61 pub fn get(&self, flow_id: &str) -> Option<&FlowInstance<S>> {
62 self.delegate.get(flow_id)
63 }
64
65 pub fn get_mut(&mut self, flow_id: &str) -> Option<&mut FlowInstance<S>> {
66 self.delegate.get_mut(flow_id)
67 }
68
69 pub fn record_transition(
70 &mut self, flow_id: &str, from: &str, to: &str, trigger: &str, snapshot: &str,
71 ) {
72 self.delegate.record_transition(flow_id, from, to, trigger);
73 let version = self.version_counters.entry(flow_id.to_string()).or_insert(0);
74 *version += 1;
75 self.event_log.push(VersionedTransitionEvent {
76 flow_id: flow_id.to_string(),
77 version: *version,
78 event_type: EventType::Transition,
79 from: Some(from.to_string()),
80 to: to.to_string(),
81 trigger: trigger.to_string(),
82 timestamp: Instant::now(),
83 state_snapshot: snapshot.to_string(),
84 });
85 }
86
87 pub fn append_compensation(&mut self, flow_id: &str, trigger: &str, metadata: &str) {
88 let version = self.version_counters.entry(flow_id.to_string()).or_insert(0);
89 *version += 1;
90 self.event_log.push(VersionedTransitionEvent {
91 flow_id: flow_id.to_string(),
92 version: *version,
93 event_type: EventType::Compensation,
94 from: None,
95 to: "COMPENSATED".to_string(),
96 trigger: trigger.to_string(),
97 timestamp: Instant::now(),
98 state_snapshot: metadata.to_string(),
99 });
100 }
101
102 pub fn events(&self) -> &[VersionedTransitionEvent] {
103 &self.event_log
104 }
105
106 pub fn events_for_flow(&self, flow_id: &str) -> Vec<&VersionedTransitionEvent> {
107 self.event_log.iter().filter(|e| e.flow_id == flow_id).collect()
108 }
109
110 pub fn transition_log(&self) -> &[tramli::TransitionRecord] {
111 self.delegate.transition_log()
112 }
113
114 pub fn clear(&mut self) {
115 self.delegate.clear();
116 self.event_log.clear();
117 self.version_counters.clear();
118 }
119}
120
121pub struct ReplayService;
123
124impl ReplayService {
125 pub fn state_at_version(events: &[VersionedTransitionEvent], flow_id: &str, target_version: u32) -> Option<String> {
126 let mut flow_events: Vec<&VersionedTransitionEvent> = events
127 .iter()
128 .filter(|e| e.flow_id == flow_id && e.event_type == EventType::Transition && e.version <= target_version)
129 .collect();
130 flow_events.sort_by_key(|e| e.version);
131 flow_events.last().map(|e| e.to.clone())
132 }
133}
134
135pub struct ProjectionReplayService;
137
138impl ProjectionReplayService {
139 pub fn state_at_version<T>(
140 events: &[VersionedTransitionEvent],
141 flow_id: &str,
142 target_version: u32,
143 reducer: &dyn ProjectionReducer<T>,
144 ) -> T {
145 let mut state = reducer.initial_state();
146 for event in events {
147 if event.flow_id == flow_id && event.version <= target_version {
148 state = reducer.apply(state, event);
149 }
150 }
151 state
152 }
153}
154
155pub struct CompensationService {
157 resolver: CompensationResolver,
158}
159
160impl CompensationService {
161 pub fn new(resolver: CompensationResolver) -> Self {
162 Self { resolver }
163 }
164
165 pub fn compensate<S: FlowState>(
166 &self, event: &VersionedTransitionEvent, cause: &str, store: &mut EventLogStore<S>,
167 ) -> bool {
168 if let Some(plan) = (self.resolver)(event, cause) {
169 store.append_compensation(&event.flow_id, &plan.action, &plan.metadata);
170 true
171 } else {
172 false
173 }
174 }
175}