Skip to main content

tramli_plugins/eventstore/
mod.rs

1use std::time::Instant;
2use tramli::{FlowState, InMemoryFlowStore, FlowInstance};
3
/// Event type for the append-only log.
///
/// Distinguishes ordinary state transitions from compensation entries that
/// are appended after the fact. The enum is a plain tag with no payload, so
/// it is `Copy` and `Hash` in addition to being comparable; this lets callers
/// pass it by value and use it as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// A state transition recorded through the store.
    Transition,
    /// A compensation entry appended for an earlier event.
    Compensation,
}
10
/// Versioned transition event.
///
/// One entry of the append-only event log. Entries are produced both for
/// regular transitions and for compensations; `event_type` tells them apart.
#[derive(Debug, Clone)]
pub struct VersionedTransitionEvent {
    /// Identifier of the flow this event belongs to.
    pub flow_id: String,
    /// Per-flow sequence number. The store starts at 1 for the first event of
    /// a flow and increments by one for every further event of that flow,
    /// regardless of event type.
    pub version: u32,
    /// Whether this entry is a transition or a compensation.
    pub event_type: EventType,
    /// Source state name. `None` for compensation entries.
    pub from: Option<String>,
    /// Target state name. Compensation entries use the literal `"COMPENSATED"`.
    pub to: String,
    /// Trigger name for transitions; for compensation entries the store puts
    /// the compensation action here.
    pub trigger: String,
    /// Monotonic instant captured when the event was appended to the log.
    pub timestamp: Instant,
    /// Snapshot string supplied by the caller for transitions; for
    /// compensation entries the store puts the compensation metadata here.
    pub state_snapshot: String,
}
23
/// Compensation plan returned by resolver.
///
/// Describes what should be recorded when an event is compensated.
#[derive(Debug, Clone)]
pub struct CompensationPlan {
    /// Name of the compensating action; `CompensationService::compensate`
    /// records it as the trigger of the compensation event.
    pub action: String,
    /// Free-form details; `CompensationService::compensate` records it in the
    /// compensation event's `state_snapshot` field.
    pub metadata: String,
}
30
/// Compensation resolver function type.
///
/// Called with the event to compensate and a cause string. Returning
/// `Some(plan)` asks for a compensation entry to be recorded; returning
/// `None` means nothing is recorded for this event. The closure is boxed and
/// must be `Send + Sync`.
pub type CompensationResolver =
    Box<dyn Fn(&VersionedTransitionEvent, &str) -> Option<CompensationPlan> + Send + Sync>;
34
/// Projection reducer for materialized views.
///
/// A reducer folds events into a caller-defined state `T`. It is driven by
/// `ProjectionReplayService::state_at_version`, which starts from
/// `initial_state` and feeds matching events through `apply` one at a time.
pub trait ProjectionReducer<T> {
    /// Returns the state to start from before any event has been applied.
    fn initial_state(&self) -> T;
    /// Consumes the current `state`, applies `event`, and returns the new state.
    fn apply(&self, state: T, event: &VersionedTransitionEvent) -> T;
}
40
/// Event log store decorator — append-only transition log with replay.
///
/// Wraps an `InMemoryFlowStore` and, next to forwarding calls to it, keeps
/// its own log of `VersionedTransitionEvent`s with a per-flow version number.
pub struct EventLogStore<S: FlowState> {
    /// The wrapped store; flow lookups and the plain transition log are
    /// forwarded to it.
    pub delegate: InMemoryFlowStore<S>,
    /// All recorded events, in the order they were appended.
    event_log: Vec<VersionedTransitionEvent>,
    /// Last version number handed out for each flow id.
    version_counters: std::collections::HashMap<String, u32>,
}
47
48impl<S: FlowState> EventLogStore<S> {
49    pub fn new(delegate: InMemoryFlowStore<S>) -> Self {
50        Self {
51            delegate,
52            event_log: Vec::new(),
53            version_counters: std::collections::HashMap::new(),
54        }
55    }
56
57    pub fn create(&mut self, flow: FlowInstance<S>) {
58        self.delegate.create(flow);
59    }
60
61    pub fn get(&self, flow_id: &str) -> Option<&FlowInstance<S>> {
62        self.delegate.get(flow_id)
63    }
64
65    pub fn get_mut(&mut self, flow_id: &str) -> Option<&mut FlowInstance<S>> {
66        self.delegate.get_mut(flow_id)
67    }
68
69    pub fn record_transition(
70        &mut self, flow_id: &str, from: &str, to: &str, trigger: &str, snapshot: &str,
71    ) {
72        self.delegate.record_transition(flow_id, from, to, trigger);
73        let version = self.version_counters.entry(flow_id.to_string()).or_insert(0);
74        *version += 1;
75        self.event_log.push(VersionedTransitionEvent {
76            flow_id: flow_id.to_string(),
77            version: *version,
78            event_type: EventType::Transition,
79            from: Some(from.to_string()),
80            to: to.to_string(),
81            trigger: trigger.to_string(),
82            timestamp: Instant::now(),
83            state_snapshot: snapshot.to_string(),
84        });
85    }
86
87    pub fn append_compensation(&mut self, flow_id: &str, trigger: &str, metadata: &str) {
88        let version = self.version_counters.entry(flow_id.to_string()).or_insert(0);
89        *version += 1;
90        self.event_log.push(VersionedTransitionEvent {
91            flow_id: flow_id.to_string(),
92            version: *version,
93            event_type: EventType::Compensation,
94            from: None,
95            to: "COMPENSATED".to_string(),
96            trigger: trigger.to_string(),
97            timestamp: Instant::now(),
98            state_snapshot: metadata.to_string(),
99        });
100    }
101
102    pub fn events(&self) -> &[VersionedTransitionEvent] {
103        &self.event_log
104    }
105
106    pub fn events_for_flow(&self, flow_id: &str) -> Vec<&VersionedTransitionEvent> {
107        self.event_log.iter().filter(|e| e.flow_id == flow_id).collect()
108    }
109
110    pub fn transition_log(&self) -> &[tramli::TransitionRecord] {
111        self.delegate.transition_log()
112    }
113
114    pub fn clear(&mut self) {
115        self.delegate.clear();
116        self.event_log.clear();
117        self.version_counters.clear();
118    }
119}
120
121/// Replay service — reconstruct state at a given version.
122pub struct ReplayService;
123
124impl ReplayService {
125    pub fn state_at_version(events: &[VersionedTransitionEvent], flow_id: &str, target_version: u32) -> Option<String> {
126        let mut flow_events: Vec<&VersionedTransitionEvent> = events
127            .iter()
128            .filter(|e| e.flow_id == flow_id && e.event_type == EventType::Transition && e.version <= target_version)
129            .collect();
130        flow_events.sort_by_key(|e| e.version);
131        flow_events.last().map(|e| e.to.clone())
132    }
133}
134
135/// Projection replay service — custom reducers for materialized views.
136pub struct ProjectionReplayService;
137
138impl ProjectionReplayService {
139    pub fn state_at_version<T>(
140        events: &[VersionedTransitionEvent],
141        flow_id: &str,
142        target_version: u32,
143        reducer: &dyn ProjectionReducer<T>,
144    ) -> T {
145        let mut state = reducer.initial_state();
146        for event in events {
147            if event.flow_id == flow_id && event.version <= target_version {
148                state = reducer.apply(state, event);
149            }
150        }
151        state
152    }
153}
154
155/// Compensation service — records compensation events for failed transitions.
156pub struct CompensationService {
157    resolver: CompensationResolver,
158}
159
160impl CompensationService {
161    pub fn new(resolver: CompensationResolver) -> Self {
162        Self { resolver }
163    }
164
165    pub fn compensate<S: FlowState>(
166        &self, event: &VersionedTransitionEvent, cause: &str, store: &mut EventLogStore<S>,
167    ) -> bool {
168        if let Some(plan) = (self.resolver)(event, cause) {
169            store.append_compensation(&event.flow_id, &plan.action, &plan.metadata);
170            true
171        } else {
172            false
173        }
174    }
175}