oris_kernel/kernel/
event_store.rs1use std::collections::HashMap;
6use std::sync::RwLock;
7
8use crate::kernel::event::{Event, EventStore, KernelError, SequencedEvent};
9use crate::kernel::identity::{RunId, Seq};
10
11pub struct InMemoryEventStore {
13 logs: RwLock<HashMap<RunId, Vec<SequencedEvent>>>,
15}
16
17impl InMemoryEventStore {
18 pub fn new() -> Self {
19 Self {
20 logs: RwLock::new(HashMap::new()),
21 }
22 }
23
24 fn next_seq(log: &[SequencedEvent]) -> Seq {
25 log.last().map(|e| e.seq + 1).unwrap_or(1)
26 }
27}
28
29impl Default for InMemoryEventStore {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl EventStore for InMemoryEventStore {
36 fn append(&self, run_id: &RunId, events: &[Event]) -> Result<Seq, KernelError> {
37 if events.is_empty() {
38 let logs = self
39 .logs
40 .read()
41 .map_err(|e| KernelError::EventStore(e.to_string()))?;
42 let last = logs
43 .get(run_id)
44 .and_then(|l| l.last())
45 .map(|e| e.seq)
46 .unwrap_or(0);
47 return Ok(last);
48 }
49 let mut logs = self
50 .logs
51 .write()
52 .map_err(|e| KernelError::EventStore(e.to_string()))?;
53 let log = logs.entry(run_id.clone()).or_default();
54 let start_seq = Self::next_seq(log);
55 for (i, event) in events.iter().cloned().enumerate() {
56 log.push(SequencedEvent {
57 seq: start_seq + i as Seq,
58 event,
59 });
60 }
61 Ok(*log.last().map(|e| &e.seq).unwrap())
62 }
63
64 fn scan(&self, run_id: &RunId, from: Seq) -> Result<Vec<SequencedEvent>, KernelError> {
65 let logs = self
66 .logs
67 .read()
68 .map_err(|e| KernelError::EventStore(e.to_string()))?;
69 let log = match logs.get(run_id) {
70 Some(l) => l,
71 None => return Ok(Vec::new()),
72 };
73 Ok(log.iter().filter(|e| e.seq >= from).cloned().collect())
74 }
75
76 fn head(&self, run_id: &RunId) -> Result<Seq, KernelError> {
77 let logs = self
78 .logs
79 .read()
80 .map_err(|e| KernelError::EventStore(e.to_string()))?;
81 Ok(logs
82 .get(run_id)
83 .and_then(|l| l.last())
84 .map(|e| e.seq)
85 .unwrap_or(0))
86 }
87}
88
89pub struct SharedEventStore(pub std::sync::Arc<InMemoryEventStore>);
91
92impl SharedEventStore {
93 pub fn new() -> Self {
94 Self(std::sync::Arc::new(InMemoryEventStore::new()))
95 }
96}
97
98impl Default for SharedEventStore {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl EventStore for SharedEventStore {
105 fn append(&self, run_id: &RunId, events: &[Event]) -> Result<Seq, KernelError> {
106 self.0.append(run_id, events)
107 }
108
109 fn scan(&self, run_id: &RunId, from: Seq) -> Result<Vec<SequencedEvent>, KernelError> {
110 self.0.scan(run_id, from)
111 }
112
113 fn head(&self, run_id: &RunId) -> Result<Seq, KernelError> {
114 self.0.head(run_id)
115 }
116}