oxirs_stream/event_sourcing/
simple.rs1use std::collections::HashMap;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct SimpleEvent {
12 pub id: u64,
14 pub aggregate_id: String,
16 pub event_type: String,
18 pub payload: String,
20 pub version: u64,
22 pub timestamp: u64,
24}
25
26pub struct SimpleEventStore {
28 events: Vec<SimpleEvent>,
29 next_id: u64,
30 versions: HashMap<String, u64>,
31}
32
33impl SimpleEventStore {
34 pub fn new() -> Self {
36 Self {
37 events: Vec::new(),
38 next_id: 1,
39 versions: HashMap::new(),
40 }
41 }
42
43 pub fn append(
45 &mut self,
46 aggregate_id: impl Into<String>,
47 event_type: impl Into<String>,
48 payload: impl Into<String>,
49 ) -> SimpleEvent {
50 use std::time::{SystemTime, UNIX_EPOCH};
51 let aggregate_id = aggregate_id.into();
52 let version = self.versions.entry(aggregate_id.clone()).or_insert(0);
53 *version += 1;
54 let event = SimpleEvent {
55 id: self.next_id,
56 aggregate_id: aggregate_id.clone(),
57 event_type: event_type.into(),
58 payload: payload.into(),
59 version: *version,
60 timestamp: SystemTime::now()
61 .duration_since(UNIX_EPOCH)
62 .map(|d| d.as_secs())
63 .unwrap_or(0),
64 };
65 self.next_id += 1;
66 self.events.push(event.clone());
67 event
68 }
69
70 pub fn load_aggregate(&self, aggregate_id: &str) -> Vec<SimpleEvent> {
72 self.events
73 .iter()
74 .filter(|e| e.aggregate_id == aggregate_id)
75 .cloned()
76 .collect()
77 }
78
79 pub fn load_from_version(&self, aggregate_id: &str, from_version: u64) -> Vec<SimpleEvent> {
81 self.events
82 .iter()
83 .filter(|e| e.aggregate_id == aggregate_id && e.version >= from_version)
84 .cloned()
85 .collect()
86 }
87
88 pub fn load_all_events(&self) -> Vec<SimpleEvent> {
90 self.events.clone()
91 }
92
93 pub fn len(&self) -> usize {
95 self.events.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
100 self.events.is_empty()
101 }
102
103 pub fn current_version(&self, aggregate_id: &str) -> u64 {
105 self.versions.get(aggregate_id).copied().unwrap_or(0)
106 }
107}
108
109impl Default for SimpleEventStore {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115pub struct EventStreamIter {
117 events: Vec<SimpleEvent>,
118 position: usize,
119 filter_aggregate: Option<String>,
120 filter_type: Option<String>,
121}
122
123impl EventStreamIter {
124 pub fn new(events: Vec<SimpleEvent>) -> Self {
126 Self {
127 events,
128 position: 0,
129 filter_aggregate: None,
130 filter_type: None,
131 }
132 }
133
134 pub fn for_aggregate(mut self, aggregate_id: impl Into<String>) -> Self {
136 self.filter_aggregate = Some(aggregate_id.into());
137 self
138 }
139
140 pub fn for_type(mut self, event_type: impl Into<String>) -> Self {
142 self.filter_type = Some(event_type.into());
143 self
144 }
145
146 fn matches(&self, event: &SimpleEvent) -> bool {
147 if let Some(ref agg) = self.filter_aggregate {
148 if &event.aggregate_id != agg {
149 return false;
150 }
151 }
152 if let Some(ref et) = self.filter_type {
153 if &event.event_type != et {
154 return false;
155 }
156 }
157 true
158 }
159}
160
161impl Iterator for EventStreamIter {
162 type Item = SimpleEvent;
163
164 fn next(&mut self) -> Option<Self::Item> {
165 while self.position < self.events.len() {
166 let ev = &self.events[self.position];
167 self.position += 1;
168 if self.matches(ev) {
169 return Some(ev.clone());
170 }
171 }
172 None
173 }
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct SimpleSnapshot {
179 pub aggregate_id: String,
181 pub state: String,
183 pub version: u64,
185}
186
187pub struct SimpleSnapshotStore {
189 snapshots: HashMap<String, SimpleSnapshot>,
190}
191
192impl SimpleSnapshotStore {
193 pub fn new() -> Self {
195 Self {
196 snapshots: HashMap::new(),
197 }
198 }
199
200 pub fn save(&mut self, snapshot: SimpleSnapshot) {
202 self.snapshots
203 .insert(snapshot.aggregate_id.clone(), snapshot);
204 }
205
206 pub fn load_snapshot(&self, aggregate_id: &str) -> Option<&SimpleSnapshot> {
208 self.snapshots.get(aggregate_id)
209 }
210
211 pub fn delete(&mut self, aggregate_id: &str) -> bool {
213 self.snapshots.remove(aggregate_id).is_some()
214 }
215
216 pub fn len(&self) -> usize {
218 self.snapshots.len()
219 }
220
221 pub fn is_empty(&self) -> bool {
223 self.snapshots.is_empty()
224 }
225}
226
227impl Default for SimpleSnapshotStore {
228 fn default() -> Self {
229 Self::new()
230 }
231}
232
233pub type SimpleEventHandler = std::sync::Arc<dyn Fn(&SimpleEvent) + Send + Sync>;
235
236pub struct SimpleEventBus {
238 subscriptions: HashMap<String, Vec<SimpleEventHandler>>,
239 wildcard: Vec<SimpleEventHandler>,
240}
241
242impl SimpleEventBus {
243 pub fn new() -> Self {
245 Self {
246 subscriptions: HashMap::new(),
247 wildcard: Vec::new(),
248 }
249 }
250
251 pub fn subscribe(&mut self, event_type: impl Into<String>, handler: SimpleEventHandler) {
253 let key = event_type.into();
254 if key == "*" {
255 self.wildcard.push(handler);
256 } else {
257 self.subscriptions.entry(key).or_default().push(handler);
258 }
259 }
260
261 pub fn publish(&self, event: &SimpleEvent) {
263 if let Some(handlers) = self.subscriptions.get(&event.event_type) {
264 for handler in handlers {
265 handler(event);
266 }
267 }
268 for handler in &self.wildcard {
269 handler(event);
270 }
271 }
272
273 pub fn subscription_count(&self) -> usize {
275 self.subscriptions.values().map(|v| v.len()).sum()
276 }
277
278 pub fn wildcard_count(&self) -> usize {
280 self.wildcard.len()
281 }
282}
283
284impl Default for SimpleEventBus {
285 fn default() -> Self {
286 Self::new()
287 }
288}
289
290pub struct ProjectionRunner {
292 pub name: String,
294 processed: u64,
296}
297
298impl ProjectionRunner {
299 pub fn new(name: impl Into<String>) -> Self {
301 Self {
302 name: name.into(),
303 processed: 0,
304 }
305 }
306
307 pub fn run<S, F>(&mut self, store: &SimpleEventStore, initial: S, mut handler: F) -> S
309 where
310 F: FnMut(S, &SimpleEvent) -> S,
311 {
312 let mut state = initial;
313 for event in store.load_all_events() {
314 state = handler(state, &event);
315 self.processed += 1;
316 }
317 state
318 }
319
320 pub fn run_for_aggregate<S, F>(
322 &mut self,
323 store: &SimpleEventStore,
324 aggregate_id: &str,
325 initial: S,
326 mut handler: F,
327 ) -> S
328 where
329 F: FnMut(S, &SimpleEvent) -> S,
330 {
331 let mut state = initial;
332 for event in store.load_aggregate(aggregate_id) {
333 state = handler(state, &event);
334 self.processed += 1;
335 }
336 state
337 }
338
339 pub fn processed_count(&self) -> u64 {
341 self.processed
342 }
343}