1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex};
3
4use futures::stream::BoxStream;
5
6use super::util::{prepare_event_after, stream_from_broadcast, BroadcastMap};
7use super::{
8 AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
9 EventLogDescription, LogError, LogEvent, Topic,
10};
11
/// Mutable contents of the in-memory event log; `MemoryEventLog` keeps one
/// instance behind a mutex.
#[derive(Default)]
struct MemoryState {
    /// Retained events per topic name, in append order, each paired with its id.
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    /// Most recently assigned event id per topic name. Written on every append.
    latest: HashMap<String, EventId>,
    /// Last acknowledged event id, keyed by `(topic name, consumer id)`.
    consumers: HashMap<(String, String), EventId>,
}
18
/// Event log backend that keeps topics, ids, and consumer cursors in process memory.
pub struct MemoryEventLog {
    /// Topic, latest-id, and consumer-cursor bookkeeping, guarded by a mutex.
    state: Mutex<MemoryState>,
    /// Broadcast handle that appended events are published to and that
    /// subscribers obtain their receivers from.
    pub(super) broadcasts: BroadcastMap,
    /// Depth handed to the broadcast map on publish and subscribe; `new`
    /// raises a value of zero to one.
    pub(super) queue_depth: usize,
}
24
25impl MemoryEventLog {
26 pub fn new(queue_depth: usize) -> Self {
27 Self {
28 state: Mutex::new(MemoryState::default()),
29 broadcasts: BroadcastMap::default(),
30 queue_depth: queue_depth.max(1),
31 }
32 }
33
34 fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
35 self.state
36 .lock()
37 .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
38 }
39
40 pub(super) async fn topics(&self) -> Result<Vec<Topic>, LogError> {
41 let state = self.state()?;
42 let mut topics = state
43 .topics
44 .keys()
45 .map(|topic| Topic::new(topic.clone()))
46 .collect::<Result<Vec<_>, _>>()?;
47 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
48 Ok(topics)
49 }
50
51 pub(super) async fn append_idempotent_by_header(
52 &self,
53 topic: &Topic,
54 header: &str,
55 value: &str,
56 event: LogEvent,
57 ) -> Result<AppendOutcome, LogError> {
58 let mut state = self.state()?;
59 if let Some((event_id, existing)) = state
60 .topics
61 .get(topic.as_str())
62 .into_iter()
63 .flat_map(|events| events.iter())
64 .find(|(_, event)| {
65 event
66 .headers
67 .get(header)
68 .is_some_and(|found| found == value)
69 })
70 {
71 return Ok(AppendOutcome {
72 event_id: *event_id,
73 event: existing.clone(),
74 inserted: false,
75 });
76 }
77
78 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
79 let previous = state
80 .topics
81 .get(topic.as_str())
82 .and_then(|events| events.back())
83 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
84 let event = prepare_event_after(topic, event_id, previous, event)?;
85 state.latest.insert(topic.as_str().to_string(), event_id);
86 state
87 .topics
88 .entry(topic.as_str().to_string())
89 .or_default()
90 .push_back((event_id, event.clone()));
91 drop(state);
92 self.broadcasts
93 .publish(topic, self.queue_depth, (event_id, event.clone()));
94 Ok(AppendOutcome {
95 event_id,
96 event,
97 inserted: true,
98 })
99 }
100}
101
102impl EventLog for MemoryEventLog {
103 fn describe(&self) -> EventLogDescription {
104 EventLogDescription {
105 backend: EventLogBackendKind::Memory,
106 location: None,
107 size_bytes: None,
108 queue_depth: self.queue_depth,
109 }
110 }
111
112 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
113 let mut state = self.state()?;
114 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
115 let previous = state
116 .topics
117 .get(topic.as_str())
118 .and_then(|events| events.back())
119 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
120 let event = prepare_event_after(topic, event_id, previous, event)?;
121 state.latest.insert(topic.as_str().to_string(), event_id);
122 state
123 .topics
124 .entry(topic.as_str().to_string())
125 .or_default()
126 .push_back((event_id, event.clone()));
127 drop(state);
128 self.broadcasts
129 .publish(topic, self.queue_depth, (event_id, event));
130 Ok(event_id)
131 }
132
133 async fn flush(&self) -> Result<(), LogError> {
134 Ok(())
135 }
136
137 async fn read_range(
138 &self,
139 topic: &Topic,
140 from: Option<EventId>,
141 limit: usize,
142 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
143 let from = from.unwrap_or(0);
144 let state = self.state()?;
145 let events = state
146 .topics
147 .get(topic.as_str())
148 .into_iter()
149 .flat_map(|events| events.iter())
150 .filter(|(event_id, _)| *event_id > from)
151 .take(limit)
152 .map(|(event_id, event)| (*event_id, event.clone()))
153 .collect();
154 Ok(events)
155 }
156
157 async fn subscribe(
158 self: Arc<Self>,
159 topic: &Topic,
160 from: Option<EventId>,
161 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
162 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
163 let history = self.read_range(topic, from, usize::MAX).await?;
164 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
165 }
166
167 async fn ack(
168 &self,
169 topic: &Topic,
170 consumer: &ConsumerId,
171 up_to: EventId,
172 ) -> Result<(), LogError> {
173 let mut state = self.state()?;
174 state.consumers.insert(
175 (topic.as_str().to_string(), consumer.as_str().to_string()),
176 up_to,
177 );
178 Ok(())
179 }
180
181 async fn consumer_cursor(
182 &self,
183 topic: &Topic,
184 consumer: &ConsumerId,
185 ) -> Result<Option<EventId>, LogError> {
186 let state = self.state()?;
187 Ok(state
188 .consumers
189 .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
190 .copied())
191 }
192
193 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
194 let state = self.state()?;
195 Ok(state.latest.get(topic.as_str()).copied())
196 }
197
198 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
199 let mut state = self.state()?;
200 let Some(events) = state.topics.get_mut(topic.as_str()) else {
201 return Ok(CompactReport::default());
202 };
203 let removed = events
204 .iter()
205 .take_while(|(event_id, _)| *event_id <= before)
206 .count();
207 for _ in 0..removed {
208 events.pop_front();
209 }
210 Ok(CompactReport {
211 removed,
212 remaining: events.len(),
213 latest: state.latest.get(topic.as_str()).copied(),
214 checkpointed: false,
215 })
216 }
217}