1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex};
3
4use futures::stream::BoxStream;
5
6use super::util::{prepare_event_after, stream_from_broadcast, BroadcastMap};
7use super::{
8 AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
9 EventLogDescription, LogError, LogEvent, Topic,
10};
11
12#[derive(Default)]
13struct MemoryState {
14 topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
15 latest: HashMap<String, EventId>,
16 consumers: HashMap<(String, String), EventId>,
17}
18
19pub struct MemoryEventLog {
20 state: Mutex<MemoryState>,
21 pub(super) broadcasts: BroadcastMap,
22 pub(super) queue_depth: usize,
23}
24
25impl MemoryEventLog {
26 pub fn new(queue_depth: usize) -> Self {
27 Self {
28 state: Mutex::new(MemoryState::default()),
29 broadcasts: BroadcastMap::default(),
30 queue_depth: queue_depth.max(1),
31 }
32 }
33
34 fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
35 self.state
36 .lock()
37 .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
38 }
39
40 pub(super) async fn topics(&self) -> Result<Vec<Topic>, LogError> {
41 let state = self.state()?;
42 let mut topics = state
43 .topics
44 .keys()
45 .map(|topic| Topic::new(topic.clone()))
46 .collect::<Result<Vec<_>, _>>()?;
47 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
48 Ok(topics)
49 }
50
51 pub(super) async fn append_idempotent_by_header(
52 &self,
53 topic: &Topic,
54 header: &str,
55 value: &str,
56 event: LogEvent,
57 ) -> Result<AppendOutcome, LogError> {
58 let mut state = self.state()?;
59 if let Some((event_id, existing)) = state
60 .topics
61 .get(topic.as_str())
62 .into_iter()
63 .flat_map(|events| events.iter())
64 .find(|(_, event)| {
65 event
66 .headers
67 .get(header)
68 .is_some_and(|found| found == value)
69 })
70 {
71 return Ok(AppendOutcome {
72 event_id: *event_id,
73 event: existing.clone(),
74 inserted: false,
75 });
76 }
77
78 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
79 let previous = state
80 .topics
81 .get(topic.as_str())
82 .and_then(|events| events.back())
83 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
84 let event = prepare_event_after(topic, event_id, previous, event)?;
85 state.latest.insert(topic.as_str().to_string(), event_id);
86 state
87 .topics
88 .entry(topic.as_str().to_string())
89 .or_default()
90 .push_back((event_id, event.clone()));
91 drop(state);
92 self.broadcasts
93 .publish(topic, self.queue_depth, (event_id, event.clone()));
94 Ok(AppendOutcome {
95 event_id,
96 event,
97 inserted: true,
98 })
99 }
100
101 pub(super) async fn read_idempotent_by_header(
105 &self,
106 topic: &Topic,
107 header: &str,
108 value: &str,
109 ) -> Result<Option<(EventId, LogEvent)>, LogError> {
110 let state = self.state()?;
111 Ok(state
112 .topics
113 .get(topic.as_str())
114 .into_iter()
115 .flat_map(|events| events.iter())
116 .find(|(_, event)| {
117 event
118 .headers
119 .get(header)
120 .is_some_and(|found| found == value)
121 })
122 .map(|(event_id, event)| (*event_id, event.clone())))
123 }
124}
125
126impl EventLog for MemoryEventLog {
127 fn describe(&self) -> EventLogDescription {
128 EventLogDescription {
129 backend: EventLogBackendKind::Memory,
130 location: None,
131 size_bytes: None,
132 queue_depth: self.queue_depth,
133 }
134 }
135
136 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
137 let mut state = self.state()?;
138 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
139 let previous = state
140 .topics
141 .get(topic.as_str())
142 .and_then(|events| events.back())
143 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
144 let event = prepare_event_after(topic, event_id, previous, event)?;
145 state.latest.insert(topic.as_str().to_string(), event_id);
146 state
147 .topics
148 .entry(topic.as_str().to_string())
149 .or_default()
150 .push_back((event_id, event.clone()));
151 drop(state);
152 self.broadcasts
153 .publish(topic, self.queue_depth, (event_id, event));
154 Ok(event_id)
155 }
156
157 async fn flush(&self) -> Result<(), LogError> {
158 Ok(())
159 }
160
161 async fn read_range(
162 &self,
163 topic: &Topic,
164 from: Option<EventId>,
165 limit: usize,
166 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
167 let from = from.unwrap_or(0);
168 let state = self.state()?;
169 let events = state
170 .topics
171 .get(topic.as_str())
172 .into_iter()
173 .flat_map(|events| events.iter())
174 .filter(|(event_id, _)| *event_id > from)
175 .take(limit)
176 .map(|(event_id, event)| (*event_id, event.clone()))
177 .collect();
178 Ok(events)
179 }
180
181 async fn subscribe(
182 self: Arc<Self>,
183 topic: &Topic,
184 from: Option<EventId>,
185 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
186 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
187 let history = self.read_range(topic, from, usize::MAX).await?;
188 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
189 }
190
191 async fn ack(
192 &self,
193 topic: &Topic,
194 consumer: &ConsumerId,
195 up_to: EventId,
196 ) -> Result<(), LogError> {
197 let mut state = self.state()?;
198 state.consumers.insert(
199 (topic.as_str().to_string(), consumer.as_str().to_string()),
200 up_to,
201 );
202 Ok(())
203 }
204
205 async fn consumer_cursor(
206 &self,
207 topic: &Topic,
208 consumer: &ConsumerId,
209 ) -> Result<Option<EventId>, LogError> {
210 let state = self.state()?;
211 Ok(state
212 .consumers
213 .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
214 .copied())
215 }
216
217 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
218 let state = self.state()?;
219 Ok(state.latest.get(topic.as_str()).copied())
220 }
221
222 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
223 let mut state = self.state()?;
224 let Some(events) = state.topics.get_mut(topic.as_str()) else {
225 return Ok(CompactReport::default());
226 };
227 let removed = events
228 .iter()
229 .take_while(|(event_id, _)| *event_id <= before)
230 .count();
231 for _ in 0..removed {
232 events.pop_front();
233 }
234 Ok(CompactReport {
235 removed,
236 remaining: events.len(),
237 latest: state.latest.get(topic.as_str()).copied(),
238 checkpointed: false,
239 })
240 }
241}