Skip to main content

tramli_plugins/eventstore/
mod.rs

1use std::time::Instant;
2use tramli::{FlowState, InMemoryFlowStore, FlowInstance};
3
4/// Event type for the append-only log.
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub enum EventType {
7    Transition,
8    Compensation,
9}
10
11/// Versioned transition event.
12#[derive(Debug, Clone)]
13pub struct VersionedTransitionEvent {
14    pub flow_id: String,
15    pub version: u32,
16    pub event_type: EventType,
17    pub from: Option<String>,
18    pub to: String,
19    pub trigger: String,
20    pub timestamp: Instant,
21    pub state_snapshot: String,
22}
23
24/// Compensation plan returned by resolver.
25#[derive(Debug, Clone)]
26pub struct CompensationPlan {
27    pub action: String,
28    pub metadata: String,
29}
30
31/// Compensation resolver function type.
32pub type CompensationResolver =
33    Box<dyn Fn(&VersionedTransitionEvent, &str) -> Option<CompensationPlan> + Send + Sync>;
34
35/// Projection reducer for materialized views.
36pub trait ProjectionReducer<T> {
37    fn initial_state(&self) -> T;
38    fn apply(&self, state: T, event: &VersionedTransitionEvent) -> T;
39}
40
41/// Event log store decorator — append-only transition log with replay.
42pub struct EventLogStore<S: FlowState> {
43    pub delegate: InMemoryFlowStore<S>,
44    event_log: Vec<VersionedTransitionEvent>,
45    version_counters: std::collections::HashMap<String, u32>,
46}
47
48impl<S: FlowState> EventLogStore<S> {
49    pub fn new(delegate: InMemoryFlowStore<S>) -> Self {
50        Self {
51            delegate,
52            event_log: Vec::new(),
53            version_counters: std::collections::HashMap::new(),
54        }
55    }
56
57    pub fn create(&mut self, flow: FlowInstance<S>) {
58        self.delegate.create(flow);
59    }
60
61    pub fn get(&self, flow_id: &str) -> Option<&FlowInstance<S>> {
62        self.delegate.get(flow_id)
63    }
64
65    pub fn get_mut(&mut self, flow_id: &str) -> Option<&mut FlowInstance<S>> {
66        self.delegate.get_mut(flow_id)
67    }
68
69    pub fn record_transition(
70        &mut self, flow_id: &str, from: &str, to: &str, trigger: &str, snapshot: &str,
71    ) {
72        self.delegate.record_transition(flow_id, from, to, trigger);
73        let version = self.version_counters.entry(flow_id.to_string()).or_insert(0);
74        *version += 1;
75        self.event_log.push(VersionedTransitionEvent {
76            flow_id: flow_id.to_string(),
77            version: *version,
78            event_type: EventType::Transition,
79            from: Some(from.to_string()),
80            to: to.to_string(),
81            trigger: trigger.to_string(),
82            timestamp: Instant::now(),
83            state_snapshot: snapshot.to_string(),
84        });
85    }
86
87    pub fn append_compensation(&mut self, flow_id: &str, trigger: &str, metadata: &str) {
88        let version = self.version_counters.entry(flow_id.to_string()).or_insert(0);
89        *version += 1;
90        self.event_log.push(VersionedTransitionEvent {
91            flow_id: flow_id.to_string(),
92            version: *version,
93            event_type: EventType::Compensation,
94            from: None,
95            to: "COMPENSATED".to_string(),
96            trigger: trigger.to_string(),
97            timestamp: Instant::now(),
98            state_snapshot: metadata.to_string(),
99        });
100    }
101
102    pub fn events(&self) -> &[VersionedTransitionEvent] {
103        &self.event_log
104    }
105
106    pub fn events_for_flow(&self, flow_id: &str) -> Vec<&VersionedTransitionEvent> {
107        self.event_log.iter().filter(|e| e.flow_id == flow_id).collect()
108    }
109
110    pub fn transition_log(&self) -> &[tramli::TransitionRecord] {
111        self.delegate.transition_log()
112    }
113
114    pub fn clear(&mut self) {
115        self.delegate.clear();
116        self.event_log.clear();
117        self.version_counters.clear();
118    }
119}
120
121/// Replay service — reconstructs flow state at any version.
122///
123/// Assumes each TRANSITION event stores a full snapshot of the state.
124/// Returns the latest matching state at or before the requested version.
125///
126/// If the event log is later changed to store diffs instead of full snapshots,
127/// use [`ProjectionReplayService`] with a fold/reducer instead.
128pub struct ReplayService;
129
130impl ReplayService {
131    pub fn state_at_version(events: &[VersionedTransitionEvent], flow_id: &str, target_version: u32) -> Option<String> {
132        let mut flow_events: Vec<&VersionedTransitionEvent> = events
133            .iter()
134            .filter(|e| e.flow_id == flow_id && e.event_type == EventType::Transition && e.version <= target_version)
135            .collect();
136        flow_events.sort_by_key(|e| e.version);
137        flow_events.last().map(|e| e.to.clone())
138    }
139}
140
141/// Projection replay service — fold/reducer model for custom aggregations.
142///
143/// Unlike [`ReplayService`] which assumes full snapshots,
144/// this service supports both full-snapshot and diff-based event logs.
145/// `reducer.initial_state()` returns the empty starting state,
146/// `reducer.apply(state, event)` accumulates each event.
147///
148/// Use for custom aggregations (transition count, cumulative metrics)
149/// or when the event log stores diffs rather than full snapshots.
150pub struct ProjectionReplayService;
151
152impl ProjectionReplayService {
153    pub fn state_at_version<T>(
154        events: &[VersionedTransitionEvent],
155        flow_id: &str,
156        target_version: u32,
157        reducer: &dyn ProjectionReducer<T>,
158    ) -> T {
159        let mut state = reducer.initial_state();
160        for event in events {
161            if event.flow_id == flow_id && event.version <= target_version {
162                state = reducer.apply(state, event);
163            }
164        }
165        state
166    }
167}
168
169/// Compensation service — records compensation events for failed transitions.
170pub struct CompensationService {
171    resolver: CompensationResolver,
172}
173
174impl CompensationService {
175    pub fn new(resolver: CompensationResolver) -> Self {
176        Self { resolver }
177    }
178
179    pub fn compensate<S: FlowState>(
180        &self, event: &VersionedTransitionEvent, cause: &str, store: &mut EventLogStore<S>,
181    ) -> bool {
182        if let Some(plan) = (self.resolver)(event, cause) {
183            store.append_compensation(&event.flow_id, &plan.action, &plan.metadata);
184            true
185        } else {
186            false
187        }
188    }
189}