Skip to main content

harn_vm/event_log/
memory.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex};
3
4use futures::stream::BoxStream;
5
6use super::util::{prepare_event_after, stream_from_broadcast, BroadcastMap};
7use super::{
8    AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
9    EventLogDescription, LogError, LogEvent, Topic,
10};
11
12#[derive(Default)]
13struct MemoryState {
14    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
15    latest: HashMap<String, EventId>,
16    consumers: HashMap<(String, String), EventId>,
17}
18
19pub struct MemoryEventLog {
20    state: Mutex<MemoryState>,
21    pub(super) broadcasts: BroadcastMap,
22    pub(super) queue_depth: usize,
23}
24
25impl MemoryEventLog {
26    pub fn new(queue_depth: usize) -> Self {
27        Self {
28            state: Mutex::new(MemoryState::default()),
29            broadcasts: BroadcastMap::default(),
30            queue_depth: queue_depth.max(1),
31        }
32    }
33
34    fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
35        self.state
36            .lock()
37            .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
38    }
39
40    pub(super) async fn topics(&self) -> Result<Vec<Topic>, LogError> {
41        let state = self.state()?;
42        let mut topics = state
43            .topics
44            .keys()
45            .map(|topic| Topic::new(topic.clone()))
46            .collect::<Result<Vec<_>, _>>()?;
47        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
48        Ok(topics)
49    }
50
51    pub(super) async fn append_idempotent_by_header(
52        &self,
53        topic: &Topic,
54        header: &str,
55        value: &str,
56        event: LogEvent,
57    ) -> Result<AppendOutcome, LogError> {
58        let mut state = self.state()?;
59        if let Some((event_id, existing)) = state
60            .topics
61            .get(topic.as_str())
62            .into_iter()
63            .flat_map(|events| events.iter())
64            .find(|(_, event)| {
65                event
66                    .headers
67                    .get(header)
68                    .is_some_and(|found| found == value)
69            })
70        {
71            return Ok(AppendOutcome {
72                event_id: *event_id,
73                event: existing.clone(),
74                inserted: false,
75            });
76        }
77
78        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
79        let previous = state
80            .topics
81            .get(topic.as_str())
82            .and_then(|events| events.back())
83            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
84        let event = prepare_event_after(topic, event_id, previous, event)?;
85        state.latest.insert(topic.as_str().to_string(), event_id);
86        state
87            .topics
88            .entry(topic.as_str().to_string())
89            .or_default()
90            .push_back((event_id, event.clone()));
91        drop(state);
92        self.broadcasts
93            .publish(topic, self.queue_depth, (event_id, event.clone()));
94        Ok(AppendOutcome {
95            event_id,
96            event,
97            inserted: true,
98        })
99    }
100
101    /// Read counterpart of [`Self::append_idempotent_by_header`]. The in-memory
102    /// backend has no header index, so this scans the topic — acceptable for a
103    /// dev/test backend (SQLite is the durable default).
104    pub(super) async fn read_idempotent_by_header(
105        &self,
106        topic: &Topic,
107        header: &str,
108        value: &str,
109    ) -> Result<Option<(EventId, LogEvent)>, LogError> {
110        let state = self.state()?;
111        Ok(state
112            .topics
113            .get(topic.as_str())
114            .into_iter()
115            .flat_map(|events| events.iter())
116            .find(|(_, event)| {
117                event
118                    .headers
119                    .get(header)
120                    .is_some_and(|found| found == value)
121            })
122            .map(|(event_id, event)| (*event_id, event.clone())))
123    }
124}
125
126impl EventLog for MemoryEventLog {
127    fn describe(&self) -> EventLogDescription {
128        EventLogDescription {
129            backend: EventLogBackendKind::Memory,
130            location: None,
131            size_bytes: None,
132            queue_depth: self.queue_depth,
133        }
134    }
135
136    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
137        let mut state = self.state()?;
138        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
139        let previous = state
140            .topics
141            .get(topic.as_str())
142            .and_then(|events| events.back())
143            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
144        let event = prepare_event_after(topic, event_id, previous, event)?;
145        state.latest.insert(topic.as_str().to_string(), event_id);
146        state
147            .topics
148            .entry(topic.as_str().to_string())
149            .or_default()
150            .push_back((event_id, event.clone()));
151        drop(state);
152        self.broadcasts
153            .publish(topic, self.queue_depth, (event_id, event));
154        Ok(event_id)
155    }
156
157    async fn flush(&self) -> Result<(), LogError> {
158        Ok(())
159    }
160
161    async fn read_range(
162        &self,
163        topic: &Topic,
164        from: Option<EventId>,
165        limit: usize,
166    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
167        let from = from.unwrap_or(0);
168        let state = self.state()?;
169        let events = state
170            .topics
171            .get(topic.as_str())
172            .into_iter()
173            .flat_map(|events| events.iter())
174            .filter(|(event_id, _)| *event_id > from)
175            .take(limit)
176            .map(|(event_id, event)| (*event_id, event.clone()))
177            .collect();
178        Ok(events)
179    }
180
181    async fn subscribe(
182        self: Arc<Self>,
183        topic: &Topic,
184        from: Option<EventId>,
185    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
186        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
187        let history = self.read_range(topic, from, usize::MAX).await?;
188        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
189    }
190
191    async fn ack(
192        &self,
193        topic: &Topic,
194        consumer: &ConsumerId,
195        up_to: EventId,
196    ) -> Result<(), LogError> {
197        let mut state = self.state()?;
198        state.consumers.insert(
199            (topic.as_str().to_string(), consumer.as_str().to_string()),
200            up_to,
201        );
202        Ok(())
203    }
204
205    async fn consumer_cursor(
206        &self,
207        topic: &Topic,
208        consumer: &ConsumerId,
209    ) -> Result<Option<EventId>, LogError> {
210        let state = self.state()?;
211        Ok(state
212            .consumers
213            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
214            .copied())
215    }
216
217    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
218        let state = self.state()?;
219        Ok(state.latest.get(topic.as_str()).copied())
220    }
221
222    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
223        let mut state = self.state()?;
224        let Some(events) = state.topics.get_mut(topic.as_str()) else {
225            return Ok(CompactReport::default());
226        };
227        let removed = events
228            .iter()
229            .take_while(|(event_id, _)| *event_id <= before)
230            .count();
231        for _ in 0..removed {
232            events.pop_front();
233        }
234        Ok(CompactReport {
235            removed,
236            remaining: events.len(),
237            latest: state.latest.get(topic.as_str()).copied(),
238            checkpointed: false,
239        })
240    }
241}