// oris_kernel/kernel/event_store.rs

//! In-memory EventStore implementation for the kernel.
//!
//! Append is atomic (all or nothing); scan returns events in ascending seq order.

use std::collections::HashMap;
use std::sync::RwLock;

use crate::kernel::event::{Event, EventStore, KernelError, SequencedEvent};
use crate::kernel::identity::{RunId, Seq};

11/// In-memory event store: one log per run, seq assigned on append.
12pub struct InMemoryEventStore {
13    /// run_id -> ordered events (seq 1, 2, 3, ...)
14    logs: RwLock<HashMap<RunId, Vec<SequencedEvent>>>,
15}
16
17impl InMemoryEventStore {
18    pub fn new() -> Self {
19        Self {
20            logs: RwLock::new(HashMap::new()),
21        }
22    }
23
24    fn next_seq(log: &[SequencedEvent]) -> Seq {
25        log.last().map(|e| e.seq + 1).unwrap_or(1)
26    }
27}
28
29impl Default for InMemoryEventStore {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl EventStore for InMemoryEventStore {
36    fn append(&self, run_id: &RunId, events: &[Event]) -> Result<Seq, KernelError> {
37        if events.is_empty() {
38            let logs = self
39                .logs
40                .read()
41                .map_err(|e| KernelError::EventStore(e.to_string()))?;
42            let last = logs
43                .get(run_id)
44                .and_then(|l| l.last())
45                .map(|e| e.seq)
46                .unwrap_or(0);
47            return Ok(last);
48        }
49        let mut logs = self
50            .logs
51            .write()
52            .map_err(|e| KernelError::EventStore(e.to_string()))?;
53        let log = logs.entry(run_id.clone()).or_default();
54        let start_seq = Self::next_seq(log);
55        for (i, event) in events.iter().cloned().enumerate() {
56            log.push(SequencedEvent {
57                seq: start_seq + i as Seq,
58                event,
59            });
60        }
61        Ok(*log.last().map(|e| &e.seq).unwrap())
62    }
63
64    fn scan(&self, run_id: &RunId, from: Seq) -> Result<Vec<SequencedEvent>, KernelError> {
65        let logs = self
66            .logs
67            .read()
68            .map_err(|e| KernelError::EventStore(e.to_string()))?;
69        let log = match logs.get(run_id) {
70            Some(l) => l,
71            None => return Ok(Vec::new()),
72        };
73        Ok(log.iter().filter(|e| e.seq >= from).cloned().collect())
74    }
75
76    fn head(&self, run_id: &RunId) -> Result<Seq, KernelError> {
77        let logs = self
78            .logs
79            .read()
80            .map_err(|e| KernelError::EventStore(e.to_string()))?;
81        Ok(logs
82            .get(run_id)
83            .and_then(|l| l.last())
84            .map(|e| e.seq)
85            .unwrap_or(0))
86    }
87}
88
89/// Shared event store: wraps `Arc<InMemoryEventStore>` so graph and Kernel can share the same log.
90pub struct SharedEventStore(pub std::sync::Arc<InMemoryEventStore>);
91
92impl SharedEventStore {
93    pub fn new() -> Self {
94        Self(std::sync::Arc::new(InMemoryEventStore::new()))
95    }
96}
97
98impl Default for SharedEventStore {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104impl EventStore for SharedEventStore {
105    fn append(&self, run_id: &RunId, events: &[Event]) -> Result<Seq, KernelError> {
106        self.0.append(run_id, events)
107    }
108
109    fn scan(&self, run_id: &RunId, from: Seq) -> Result<Vec<SequencedEvent>, KernelError> {
110        self.0.scan(run_id, from)
111    }
112
113    fn head(&self, run_id: &RunId) -> Result<Seq, KernelError> {
114        self.0.head(run_id)
115    }
116}