1use async_trait::async_trait;
6use dashmap::DashMap;
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11use crate::backend::StreamBackend;
12use crate::consumer::ConsumerGroup;
13use crate::error::{StreamError, StreamResult};
14use crate::event::StreamEvent;
15use crate::types::{Offset, PartitionId, StreamPosition, TopicName};
16
17static MEMORY_STORAGE: std::sync::OnceLock<Arc<DashMap<TopicName, Arc<RwLock<TopicData>>>>> =
19 std::sync::OnceLock::new();
20
21fn get_memory_storage() -> Arc<DashMap<TopicName, Arc<RwLock<TopicData>>>> {
22 MEMORY_STORAGE
23 .get_or_init(|| Arc::new(DashMap::new()))
24 .clone()
25}
26
27pub async fn clear_memory_storage() {
29 let storage = get_memory_storage();
30 storage.clear();
31}
32
33#[derive(Clone)]
34struct TopicData {
35 events: VecDeque<(StreamEvent, Offset)>,
36 next_offset: u64,
37 consumer_offsets: HashMap<String, u64>,
38}
39
40pub struct MemoryBackend {
42 connected: bool,
43}
44
45impl MemoryBackend {
46 pub fn new() -> Self {
47 Self { connected: false }
48 }
49}
50
51impl Default for MemoryBackend {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57#[async_trait]
58impl StreamBackend for MemoryBackend {
59 fn name(&self) -> &'static str {
60 "memory"
61 }
62
63 async fn connect(&mut self) -> StreamResult<()> {
64 self.connected = true;
65 Ok(())
66 }
67
68 async fn disconnect(&mut self) -> StreamResult<()> {
69 self.connected = false;
70 Ok(())
71 }
72
73 async fn create_topic(&self, topic: &TopicName, _partitions: u32) -> StreamResult<()> {
74 let storage = get_memory_storage();
75 storage.entry(topic.clone()).or_insert_with(|| {
76 Arc::new(RwLock::new(TopicData {
77 events: VecDeque::new(),
78 next_offset: 0,
79 consumer_offsets: HashMap::new(),
80 }))
81 });
82 Ok(())
83 }
84
85 async fn delete_topic(&self, topic: &TopicName) -> StreamResult<()> {
86 get_memory_storage().remove(topic);
87 Ok(())
88 }
89
90 async fn list_topics(&self) -> StreamResult<Vec<TopicName>> {
91 Ok(get_memory_storage()
92 .iter()
93 .map(|entry| entry.key().clone())
94 .collect())
95 }
96
97 async fn send_event(&self, topic: &TopicName, event: StreamEvent) -> StreamResult<Offset> {
98 let storage = get_memory_storage();
99 let topic_data = storage
100 .get(topic)
101 .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
102
103 let mut data = topic_data.write().await;
104 let offset = Offset::new(data.next_offset);
105 data.next_offset += 1;
106 data.events.push_back((event, offset));
107
108 if data.events.len() > 10000 {
110 data.events.pop_front();
111 }
112
113 Ok(offset)
114 }
115
116 async fn send_batch(
117 &self,
118 topic: &TopicName,
119 events: Vec<StreamEvent>,
120 ) -> StreamResult<Vec<Offset>> {
121 let mut offsets = Vec::new();
122 for event in events {
123 let offset = self.send_event(topic, event).await?;
124 offsets.push(offset);
125 }
126 Ok(offsets)
127 }
128
129 async fn receive_events(
130 &self,
131 topic: &TopicName,
132 consumer_group: Option<&ConsumerGroup>,
133 position: StreamPosition,
134 max_events: usize,
135 ) -> StreamResult<Vec<(StreamEvent, Offset)>> {
136 let storage = get_memory_storage();
137 let topic_data = storage
138 .get(topic)
139 .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
140
141 let mut data = topic_data.write().await;
142
143 let start_offset = if let Some(group) = consumer_group {
144 let group_name = group.name();
145 let current_offset = data.consumer_offsets.get(group_name).copied().unwrap_or(0);
146
147 match position {
148 StreamPosition::Beginning => current_offset, StreamPosition::End => data.next_offset,
150 StreamPosition::Offset(offset) => offset,
151 }
152 } else {
153 match position {
154 StreamPosition::Beginning => 0,
155 StreamPosition::End => data.next_offset,
156 StreamPosition::Offset(offset) => offset,
157 }
158 };
159
160 let mut events = Vec::new();
161 for (event, offset) in &data.events {
162 if offset.value() >= start_offset && events.len() < max_events {
163 events.push((event.clone(), *offset));
164 }
165 }
166
167 if let Some(group) = consumer_group {
169 if let Some((_, last_offset)) = events.last() {
170 data.consumer_offsets
171 .insert(group.name().to_string(), last_offset.value() + 1);
172 }
173 }
174
175 Ok(events)
176 }
177
178 async fn commit_offset(
179 &self,
180 topic: &TopicName,
181 consumer_group: &ConsumerGroup,
182 _partition: PartitionId,
183 offset: Offset,
184 ) -> StreamResult<()> {
185 let storage = get_memory_storage();
186 let topic_data = storage
187 .get(topic)
188 .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
189
190 let mut data = topic_data.write().await;
191 data.consumer_offsets
192 .insert(consumer_group.name().to_string(), offset.value() + 1);
193 Ok(())
194 }
195
196 async fn seek(
197 &self,
198 topic: &TopicName,
199 consumer_group: &ConsumerGroup,
200 _partition: PartitionId,
201 position: StreamPosition,
202 ) -> StreamResult<()> {
203 let storage = get_memory_storage();
204 let topic_data = storage
205 .get(topic)
206 .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
207
208 let mut data = topic_data.write().await;
209
210 let offset = match position {
211 StreamPosition::Beginning => 0,
212 StreamPosition::End => data.next_offset,
213 StreamPosition::Offset(offset) => offset,
214 };
215
216 data.consumer_offsets
217 .insert(consumer_group.name().to_string(), offset);
218 Ok(())
219 }
220
221 async fn get_consumer_lag(
222 &self,
223 topic: &TopicName,
224 consumer_group: &ConsumerGroup,
225 ) -> StreamResult<HashMap<PartitionId, u64>> {
226 let storage = get_memory_storage();
227 let topic_data = storage
228 .get(topic)
229 .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
230
231 let data = topic_data.read().await;
232
233 let current_offset = data
234 .consumer_offsets
235 .get(consumer_group.name())
236 .copied()
237 .unwrap_or(0);
238
239 let lag = data.next_offset.saturating_sub(current_offset);
240 let mut result = HashMap::new();
241 result.insert(PartitionId::new(0), lag);
242 Ok(result)
243 }
244
245 async fn get_topic_metadata(&self, topic: &TopicName) -> StreamResult<HashMap<String, String>> {
246 let storage = get_memory_storage();
247 let topic_data = storage
248 .get(topic)
249 .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
250
251 let data = topic_data.read().await;
252
253 let mut metadata = HashMap::new();
254 metadata.insert("backend".to_string(), "memory".to_string());
255 metadata.insert("event_count".to_string(), data.events.len().to_string());
256 metadata.insert("next_offset".to_string(), data.next_offset.to_string());
257 metadata.insert(
258 "consumer_groups".to_string(),
259 data.consumer_offsets.len().to_string(),
260 );
261
262 Ok(metadata)
263 }
264}
265
266#[cfg(test)]
267mod tests {
268 use super::*;
269 use crate::event::StreamEvent;
270
271 #[tokio::test]
272 async fn test_memory_backend_basic_operations() {
273 clear_memory_storage().await;
275
276 let mut backend = MemoryBackend::new();
277 assert_eq!(backend.name(), "memory");
278
279 backend.connect().await.unwrap();
281
282 let topic = TopicName::new(format!("test-topic-basic-{}", uuid::Uuid::new_v4()));
284 backend.create_topic(&topic, 1).await.unwrap();
285
286 let topics = backend.list_topics().await.unwrap();
288 assert!(
289 topics.iter().any(|t| t.as_str() == topic.as_str()),
290 "Our topic should exist"
291 );
292
293 let event = StreamEvent::TripleAdded {
295 subject: "http://example.org/s".to_string(),
296 predicate: "http://example.org/p".to_string(),
297 object: "http://example.org/o".to_string(),
298 graph: None,
299 metadata: crate::event::EventMetadata::default(),
300 };
301
302 let offset = backend.send_event(&topic, event.clone()).await.unwrap();
303 assert_eq!(offset.value(), 0);
304
305 let events = backend
307 .receive_events(&topic, None, StreamPosition::Beginning, 10)
308 .await
309 .unwrap();
310 assert_eq!(events.len(), 1);
311
312 backend.delete_topic(&topic).await.unwrap();
314 let topics = backend.list_topics().await.unwrap();
315 assert!(
316 !topics.iter().any(|t| t.as_str() == topic.as_str()),
317 "Our topic should be deleted"
318 );
319 }
320
321 #[tokio::test]
322 async fn test_consumer_groups() {
323 clear_memory_storage().await;
325
326 let mut backend = MemoryBackend::new();
327 backend.connect().await.unwrap();
328
329 let topic = TopicName::new(format!("test-topic-groups-{}", uuid::Uuid::new_v4()));
330 backend.create_topic(&topic, 1).await.unwrap();
331
332 for i in 0..5 {
334 let event = StreamEvent::GraphCreated {
335 graph: format!("http://example.org/graph{i}"),
336 metadata: crate::event::EventMetadata::default(),
337 };
338 backend.send_event(&topic, event).await.unwrap();
339 }
340
341 let group = ConsumerGroup::new("test-group".to_string());
343
344 let events = backend
346 .receive_events(&topic, Some(&group), StreamPosition::Beginning, 3)
347 .await
348 .unwrap();
349 assert_eq!(events.len(), 3);
350
351 let events = backend
353 .receive_events(&topic, Some(&group), StreamPosition::Beginning, 10)
354 .await
355 .unwrap();
356 assert_eq!(events.len(), 2);
357
358 let lag = backend.get_consumer_lag(&topic, &group).await.unwrap();
360 assert_eq!(lag.get(&PartitionId::new(0)), Some(&0));
361 }
362}