
oxirs_stream/backend/memory.rs

//! # Memory Backend
//!
//! In-memory backend implementation for testing and development.
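//!
//! A minimal usage sketch (marked `ignore`: the import paths below are assumed
//! from this module's layout and may differ in the crate's public API):
//!
//! ```ignore
//! use oxirs_stream::backend::{memory::MemoryBackend, StreamBackend};
//! use oxirs_stream::event::{EventMetadata, StreamEvent};
//! use oxirs_stream::types::{StreamPosition, TopicName};
//!
//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
//! let mut backend = MemoryBackend::new();
//! backend.connect().await?;
//!
//! let topic = TopicName::new("demo-topic".to_string());
//! backend.create_topic(&topic, 1).await?;
//!
//! backend
//!     .send_event(
//!         &topic,
//!         StreamEvent::TripleAdded {
//!             subject: "http://example.org/s".to_string(),
//!             predicate: "http://example.org/p".to_string(),
//!             object: "http://example.org/o".to_string(),
//!             graph: None,
//!             metadata: EventMetadata::default(),
//!         },
//!     )
//!     .await?;
//!
//! // Read everything back from the beginning of the topic.
//! let events = backend
//!     .receive_events(&topic, None, StreamPosition::Beginning, 10)
//!     .await?;
//! assert_eq!(events.len(), 1);
//! # Ok(())
//! # }
//! ```
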
use async_trait::async_trait;
use dashmap::DashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::backend::StreamBackend;
use crate::consumer::ConsumerGroup;
use crate::error::{StreamError, StreamResult};
use crate::event::StreamEvent;
use crate::types::{Offset, PartitionId, StreamPosition, TopicName};

// Global shared storage for the memory backend. Every `MemoryBackend` instance
// in the same process sees the same topics, so independently created producers
// and consumers can exchange events during tests.
static MEMORY_STORAGE: std::sync::OnceLock<Arc<DashMap<TopicName, Arc<RwLock<TopicData>>>>> =
    std::sync::OnceLock::new();

/// Returns a handle to the shared storage, initializing it on first use.
fn get_memory_storage() -> Arc<DashMap<TopicName, Arc<RwLock<TopicData>>>> {
    MEMORY_STORAGE
        .get_or_init(|| Arc::new(DashMap::new()))
        .clone()
}

/// Clear all memory storage (for testing).
pub async fn clear_memory_storage() {
    let storage = get_memory_storage();
    storage.clear();
}

#[derive(Clone)]
struct TopicData {
    /// Events paired with the offset assigned to them at append time.
    events: VecDeque<(StreamEvent, Offset)>,
    /// Offset that the next appended event will receive.
    next_offset: u64,
    /// Committed offset per consumer group (the next offset to read).
    consumer_offsets: HashMap<String, u64>,
}

/// Memory backend for testing.
pub struct MemoryBackend {
    connected: bool,
}

impl MemoryBackend {
    pub fn new() -> Self {
        Self { connected: false }
    }
}

impl Default for MemoryBackend {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl StreamBackend for MemoryBackend {
    fn name(&self) -> &'static str {
        "memory"
    }

    async fn connect(&mut self) -> StreamResult<()> {
        self.connected = true;
        Ok(())
    }

    async fn disconnect(&mut self) -> StreamResult<()> {
        self.connected = false;
        Ok(())
    }

    async fn create_topic(&self, topic: &TopicName, _partitions: u32) -> StreamResult<()> {
        let storage = get_memory_storage();
        storage.entry(topic.clone()).or_insert_with(|| {
            Arc::new(RwLock::new(TopicData {
                events: VecDeque::new(),
                next_offset: 0,
                consumer_offsets: HashMap::new(),
            }))
        });
        Ok(())
    }

    async fn delete_topic(&self, topic: &TopicName) -> StreamResult<()> {
        get_memory_storage().remove(topic);
        Ok(())
    }

    async fn list_topics(&self) -> StreamResult<Vec<TopicName>> {
        Ok(get_memory_storage()
            .iter()
            .map(|entry| entry.key().clone())
            .collect())
    }

    async fn send_event(&self, topic: &TopicName, event: StreamEvent) -> StreamResult<Offset> {
        let storage = get_memory_storage();
        let topic_data = storage
            .get(topic)
            .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;

        let mut data = topic_data.write().await;
        let offset = Offset::new(data.next_offset);
        data.next_offset += 1;
        data.events.push_back((event, offset));

        // Limit memory usage: retain at most the 10,000 most recent events per topic.
        if data.events.len() > 10000 {
            data.events.pop_front();
        }

        Ok(offset)
    }

    async fn send_batch(
        &self,
        topic: &TopicName,
        events: Vec<StreamEvent>,
    ) -> StreamResult<Vec<Offset>> {
        let mut offsets = Vec::new();
        for event in events {
            let offset = self.send_event(topic, event).await?;
            offsets.push(offset);
        }
        Ok(offsets)
    }

    async fn receive_events(
        &self,
        topic: &TopicName,
        consumer_group: Option<&ConsumerGroup>,
        position: StreamPosition,
        max_events: usize,
    ) -> StreamResult<Vec<(StreamEvent, Offset)>> {
        let storage = get_memory_storage();
        let topic_data = storage
            .get(topic)
            .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;

        let mut data = topic_data.write().await;

        // With a consumer group, `Beginning` resumes from the group's committed
        // offset; without one, it always starts from offset zero.
        let start_offset = if let Some(group) = consumer_group {
            let group_name = group.name();
            let current_offset = data.consumer_offsets.get(group_name).copied().unwrap_or(0);

            match position {
                StreamPosition::Beginning => current_offset,
                StreamPosition::End => data.next_offset,
                StreamPosition::Offset(offset) => offset,
            }
        } else {
            match position {
                StreamPosition::Beginning => 0,
                StreamPosition::End => data.next_offset,
                StreamPosition::Offset(offset) => offset,
            }
        };

        let mut events = Vec::new();
        for (event, offset) in &data.events {
            if offset.value() >= start_offset && events.len() < max_events {
                events.push((event.clone(), *offset));
            }
        }

        // Update the consumer offset if a consumer group is in use.
        if let Some(group) = consumer_group {
            if let Some((_, last_offset)) = events.last() {
                data.consumer_offsets
                    .insert(group.name().to_string(), last_offset.value() + 1);
            }
        }

        Ok(events)
    }

    async fn commit_offset(
        &self,
        topic: &TopicName,
        consumer_group: &ConsumerGroup,
        _partition: PartitionId,
        offset: Offset,
    ) -> StreamResult<()> {
        let storage = get_memory_storage();
        let topic_data = storage
            .get(topic)
            .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;

        let mut data = topic_data.write().await;
        // Store the next offset to read, i.e. one past the committed event.
        data.consumer_offsets
            .insert(consumer_group.name().to_string(), offset.value() + 1);
        Ok(())
    }

    async fn seek(
        &self,
        topic: &TopicName,
        consumer_group: &ConsumerGroup,
        _partition: PartitionId,
        position: StreamPosition,
    ) -> StreamResult<()> {
        let storage = get_memory_storage();
        let topic_data = storage
            .get(topic)
            .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;

        let mut data = topic_data.write().await;

        let offset = match position {
            StreamPosition::Beginning => 0,
            StreamPosition::End => data.next_offset,
            StreamPosition::Offset(offset) => offset,
        };

        data.consumer_offsets
            .insert(consumer_group.name().to_string(), offset);
        Ok(())
    }

    async fn get_consumer_lag(
        &self,
        topic: &TopicName,
        consumer_group: &ConsumerGroup,
    ) -> StreamResult<HashMap<PartitionId, u64>> {
        let storage = get_memory_storage();
        let topic_data = storage
            .get(topic)
            .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;

        let data = topic_data.read().await;

        let current_offset = data
            .consumer_offsets
            .get(consumer_group.name())
            .copied()
            .unwrap_or(0);

        // The memory backend treats each topic as a single partition, so the
        // lag is reported for partition 0 only.
        let lag = data.next_offset.saturating_sub(current_offset);
        let mut result = HashMap::new();
        result.insert(PartitionId::new(0), lag);
        Ok(result)
    }

    async fn get_topic_metadata(&self, topic: &TopicName) -> StreamResult<HashMap<String, String>> {
        let storage = get_memory_storage();
        let topic_data = storage
            .get(topic)
            .ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;

        let data = topic_data.read().await;

        let mut metadata = HashMap::new();
        metadata.insert("backend".to_string(), "memory".to_string());
        metadata.insert("event_count".to_string(), data.events.len().to_string());
        metadata.insert("next_offset".to_string(), data.next_offset.to_string());
        metadata.insert(
            "consumer_groups".to_string(),
            data.consumer_offsets.len().to_string(),
        );

        Ok(metadata)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::StreamEvent;

    #[tokio::test]
    async fn test_memory_backend_basic_operations() {
        // Clear any previous test data
        clear_memory_storage().await;

        let mut backend = MemoryBackend::new();
        assert_eq!(backend.name(), "memory");

        // Connect
        backend.connect().await.unwrap();

        // Create topic with a unique name
        let topic = TopicName::new(format!("test-topic-basic-{}", uuid::Uuid::new_v4()));
        backend.create_topic(&topic, 1).await.unwrap();

        // List topics - check our topic exists (there may be other topics from parallel tests)
        let topics = backend.list_topics().await.unwrap();
        assert!(
            topics.iter().any(|t| t.as_str() == topic.as_str()),
            "Our topic should exist"
        );

        // Send event
        let event = StreamEvent::TripleAdded {
            subject: "http://example.org/s".to_string(),
            predicate: "http://example.org/p".to_string(),
            object: "http://example.org/o".to_string(),
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        };

        let offset = backend.send_event(&topic, event.clone()).await.unwrap();
        assert_eq!(offset.value(), 0);

        // Receive events
        let events = backend
            .receive_events(&topic, None, StreamPosition::Beginning, 10)
            .await
            .unwrap();
        assert_eq!(events.len(), 1);

        // Delete topic
        backend.delete_topic(&topic).await.unwrap();
        let topics = backend.list_topics().await.unwrap();
        assert!(
            !topics.iter().any(|t| t.as_str() == topic.as_str()),
            "Our topic should be deleted"
        );
    }

    #[tokio::test]
    async fn test_consumer_groups() {
        // Clear any previous test data
        clear_memory_storage().await;

        let mut backend = MemoryBackend::new();
        backend.connect().await.unwrap();

        let topic = TopicName::new(format!("test-topic-groups-{}", uuid::Uuid::new_v4()));
        backend.create_topic(&topic, 1).await.unwrap();

        // Send some events
        for i in 0..5 {
            let event = StreamEvent::GraphCreated {
                graph: format!("http://example.org/graph{i}"),
                metadata: crate::event::EventMetadata::default(),
            };
            backend.send_event(&topic, event).await.unwrap();
        }

        // Create consumer group
        let group = ConsumerGroup::new("test-group".to_string());

        // First read - should get the first three events (capped by max_events)
        let events = backend
            .receive_events(&topic, Some(&group), StreamPosition::Beginning, 3)
            .await
            .unwrap();
        assert_eq!(events.len(), 3);

        // Second read - should resume at the committed offset and get the remaining two
        let events = backend
            .receive_events(&topic, Some(&group), StreamPosition::Beginning, 10)
            .await
            .unwrap();
        assert_eq!(events.len(), 2);

        // Check consumer lag - everything has been consumed, so the lag is zero
        let lag = backend.get_consumer_lag(&topic, &group).await.unwrap();
        assert_eq!(lag.get(&PartitionId::new(0)), Some(&0));
    }
}