Skip to main content

harn_vm/event_log/
memory.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex};
3
4use futures::stream::BoxStream;
5
6use super::util::{prepare_event_after, stream_from_broadcast, BroadcastMap};
7use super::{
8    AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
9    EventLogDescription, LogError, LogEvent, Topic,
10};
11
12#[derive(Default)]
13struct MemoryState {
14    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
15    latest: HashMap<String, EventId>,
16    consumers: HashMap<(String, String), EventId>,
17}
18
19pub struct MemoryEventLog {
20    state: Mutex<MemoryState>,
21    pub(super) broadcasts: BroadcastMap,
22    pub(super) queue_depth: usize,
23}
24
25impl MemoryEventLog {
26    pub fn new(queue_depth: usize) -> Self {
27        Self {
28            state: Mutex::new(MemoryState::default()),
29            broadcasts: BroadcastMap::default(),
30            queue_depth: queue_depth.max(1),
31        }
32    }
33
34    fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
35        self.state
36            .lock()
37            .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
38    }
39
40    pub(super) async fn topics(&self) -> Result<Vec<Topic>, LogError> {
41        let state = self.state()?;
42        let mut topics = state
43            .topics
44            .keys()
45            .map(|topic| Topic::new(topic.clone()))
46            .collect::<Result<Vec<_>, _>>()?;
47        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
48        Ok(topics)
49    }
50
51    pub(super) async fn append_idempotent_by_header(
52        &self,
53        topic: &Topic,
54        header: &str,
55        value: &str,
56        event: LogEvent,
57    ) -> Result<AppendOutcome, LogError> {
58        let mut state = self.state()?;
59        if let Some((event_id, existing)) = state
60            .topics
61            .get(topic.as_str())
62            .into_iter()
63            .flat_map(|events| events.iter())
64            .find(|(_, event)| {
65                event
66                    .headers
67                    .get(header)
68                    .is_some_and(|found| found == value)
69            })
70        {
71            return Ok(AppendOutcome {
72                event_id: *event_id,
73                event: existing.clone(),
74                inserted: false,
75            });
76        }
77
78        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
79        let previous = state
80            .topics
81            .get(topic.as_str())
82            .and_then(|events| events.back())
83            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
84        let event = prepare_event_after(topic, event_id, previous, event)?;
85        state.latest.insert(topic.as_str().to_string(), event_id);
86        state
87            .topics
88            .entry(topic.as_str().to_string())
89            .or_default()
90            .push_back((event_id, event.clone()));
91        drop(state);
92        self.broadcasts
93            .publish(topic, self.queue_depth, (event_id, event.clone()));
94        Ok(AppendOutcome {
95            event_id,
96            event,
97            inserted: true,
98        })
99    }
100}
101
102impl EventLog for MemoryEventLog {
103    fn describe(&self) -> EventLogDescription {
104        EventLogDescription {
105            backend: EventLogBackendKind::Memory,
106            location: None,
107            size_bytes: None,
108            queue_depth: self.queue_depth,
109        }
110    }
111
112    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
113        let mut state = self.state()?;
114        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
115        let previous = state
116            .topics
117            .get(topic.as_str())
118            .and_then(|events| events.back())
119            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
120        let event = prepare_event_after(topic, event_id, previous, event)?;
121        state.latest.insert(topic.as_str().to_string(), event_id);
122        state
123            .topics
124            .entry(topic.as_str().to_string())
125            .or_default()
126            .push_back((event_id, event.clone()));
127        drop(state);
128        self.broadcasts
129            .publish(topic, self.queue_depth, (event_id, event));
130        Ok(event_id)
131    }
132
133    async fn flush(&self) -> Result<(), LogError> {
134        Ok(())
135    }
136
137    async fn read_range(
138        &self,
139        topic: &Topic,
140        from: Option<EventId>,
141        limit: usize,
142    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
143        let from = from.unwrap_or(0);
144        let state = self.state()?;
145        let events = state
146            .topics
147            .get(topic.as_str())
148            .into_iter()
149            .flat_map(|events| events.iter())
150            .filter(|(event_id, _)| *event_id > from)
151            .take(limit)
152            .map(|(event_id, event)| (*event_id, event.clone()))
153            .collect();
154        Ok(events)
155    }
156
157    async fn subscribe(
158        self: Arc<Self>,
159        topic: &Topic,
160        from: Option<EventId>,
161    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
162        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
163        let history = self.read_range(topic, from, usize::MAX).await?;
164        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
165    }
166
167    async fn ack(
168        &self,
169        topic: &Topic,
170        consumer: &ConsumerId,
171        up_to: EventId,
172    ) -> Result<(), LogError> {
173        let mut state = self.state()?;
174        state.consumers.insert(
175            (topic.as_str().to_string(), consumer.as_str().to_string()),
176            up_to,
177        );
178        Ok(())
179    }
180
181    async fn consumer_cursor(
182        &self,
183        topic: &Topic,
184        consumer: &ConsumerId,
185    ) -> Result<Option<EventId>, LogError> {
186        let state = self.state()?;
187        Ok(state
188            .consumers
189            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
190            .copied())
191    }
192
193    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
194        let state = self.state()?;
195        Ok(state.latest.get(topic.as_str()).copied())
196    }
197
198    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
199        let mut state = self.state()?;
200        let Some(events) = state.topics.get_mut(topic.as_str()) else {
201            return Ok(CompactReport::default());
202        };
203        let removed = events
204            .iter()
205            .take_while(|(event_id, _)| *event_id <= before)
206            .count();
207        for _ in 0..removed {
208            events.pop_front();
209        }
210        Ok(CompactReport {
211            removed,
212            remaining: events.len(),
213            latest: state.latest.get(topic.as_str()).copied(),
214            checkpointed: false,
215        })
216    }
217}