Skip to main content

a3s_event/provider/
memory.rs

1//! In-memory event provider for testing and lightweight usage
2//!
3//! Stores events in memory with no external dependencies.
4//! Events are lost on process restart.
5
6use crate::error::Result;
7use crate::provider::{EventProvider, PendingEvent, ProviderInfo, Subscription};
8use crate::subject::subject_matches;
9use crate::types::{Event, ReceivedEvent};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13
/// Settings for the in-memory event provider.
///
/// See the [`Default`] implementation for the values used when nothing is
/// overridden.
#[derive(Clone, Debug)]
pub struct MemoryConfig {
    /// Prefix placed in front of event subjects (default: "events").
    pub subject_prefix: String,
    /// Upper bound on the number of retained events; `0` disables the bound.
    pub max_events: usize,
    /// Capacity handed to the broadcast channel used for fan-out.
    pub channel_capacity: usize,
}
24
25impl Default for MemoryConfig {
26    fn default() -> Self {
27        Self {
28            subject_prefix: "events".to_string(),
29            max_events: 100_000,
30            channel_capacity: 10_000,
31        }
32    }
33}
34
35/// In-memory event provider
36///
37/// Uses `tokio::sync::broadcast` for pub/sub and a `Vec` for persistence.
38/// Suitable for testing, development, and single-process deployments.
39pub struct MemoryProvider {
40    config: MemoryConfig,
41    events: Arc<RwLock<Vec<Event>>>,
42    sender: broadcast::Sender<Event>,
43    sequence: Arc<std::sync::atomic::AtomicU64>,
44}
45
46impl MemoryProvider {
47    /// Create a new in-memory provider
48    pub fn new(config: MemoryConfig) -> Self {
49        let (sender, _) = broadcast::channel(config.channel_capacity);
50        Self {
51            config,
52            events: Arc::new(RwLock::new(Vec::new())),
53            sender,
54            sequence: Arc::new(std::sync::atomic::AtomicU64::new(1)),
55        }
56    }
57}
58
59impl Default for MemoryProvider {
60    fn default() -> Self {
61        Self::new(MemoryConfig::default())
62    }
63}
64
65#[async_trait]
66impl EventProvider for MemoryProvider {
67    async fn publish(&self, event: &Event) -> Result<u64> {
68        let seq = self
69            .sequence
70            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
71
72        {
73            let mut events = self.events.write().await;
74            events.push(event.clone());
75
76            // Enforce max_events limit
77            if self.config.max_events > 0 && events.len() > self.config.max_events {
78                let drain_count = events.len() - self.config.max_events;
79                events.drain(..drain_count);
80            }
81        }
82
83        // Broadcast to subscribers (ignore send errors — no receivers is fine)
84        let _ = self.sender.send(event.clone());
85
86        tracing::debug!(
87            event_id = %event.id,
88            subject = %event.subject,
89            sequence = seq,
90            "Event published (memory)"
91        );
92
93        Ok(seq)
94    }
95
96    async fn subscribe_durable(
97        &self,
98        _consumer_name: &str,
99        filter_subject: &str,
100    ) -> Result<Box<dyn Subscription>> {
101        // In-memory provider treats durable same as ephemeral
102        self.subscribe(filter_subject).await
103    }
104
105    async fn subscribe(
106        &self,
107        filter_subject: &str,
108    ) -> Result<Box<dyn Subscription>> {
109        let receiver = self.sender.subscribe();
110        Ok(Box::new(MemorySubscription {
111            receiver,
112            filter: filter_subject.to_string(),
113        }))
114    }
115
116    async fn history(
117        &self,
118        filter_subject: Option<&str>,
119        limit: usize,
120    ) -> Result<Vec<Event>> {
121        let events = self.events.read().await;
122        let filtered: Vec<Event> = events
123            .iter()
124            .rev()
125            .filter(|e| {
126                if let Some(filter) = filter_subject {
127                    subject_matches(&e.subject, filter)
128                } else {
129                    true
130                }
131            })
132            .take(limit)
133            .cloned()
134            .collect();
135        Ok(filtered)
136    }
137
138    async fn unsubscribe(&self, _consumer_name: &str) -> Result<()> {
139        // No-op for in-memory provider
140        Ok(())
141    }
142
143    async fn info(&self) -> Result<ProviderInfo> {
144        let events = self.events.read().await;
145        let bytes: u64 = events
146            .iter()
147            .map(|e| serde_json::to_vec(e).map(|v| v.len() as u64).unwrap_or(0))
148            .sum();
149
150        Ok(ProviderInfo {
151            provider: "memory".to_string(),
152            messages: events.len() as u64,
153            bytes,
154            consumers: self.sender.receiver_count(),
155        })
156    }
157
158    fn subject_prefix(&self) -> &str {
159        &self.config.subject_prefix
160    }
161
162    fn name(&self) -> &str {
163        "memory"
164    }
165}
166
167/// In-memory subscription backed by broadcast channel
168struct MemorySubscription {
169    receiver: broadcast::Receiver<Event>,
170    filter: String,
171}
172
173#[async_trait]
174impl Subscription for MemorySubscription {
175    async fn next(&mut self) -> Result<Option<ReceivedEvent>> {
176        loop {
177            match self.receiver.recv().await {
178                Ok(event) => {
179                    if subject_matches(&event.subject, &self.filter) {
180                        return Ok(Some(ReceivedEvent {
181                            event,
182                            sequence: 0,
183                            num_delivered: 1,
184                            stream: "memory".to_string(),
185                        }));
186                    }
187                    // Skip non-matching events
188                }
189                Err(broadcast::error::RecvError::Lagged(n)) => {
190                    tracing::warn!(skipped = n, "Memory subscriber lagged, skipped events");
191                    // Continue receiving
192                }
193                Err(broadcast::error::RecvError::Closed) => {
194                    return Ok(None);
195                }
196            }
197        }
198    }
199
200    async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>> {
201        match self.next().await? {
202            Some(received) => Ok(Some(PendingEvent::new(
203                received,
204                || Box::pin(async { Ok(()) }),
205                || Box::pin(async { Ok(()) }),
206            ))),
207            None => Ok(None),
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The default configuration carries the documented values.
    #[test]
    fn test_memory_config_default() {
        let defaults = MemoryConfig::default();
        assert_eq!(defaults.subject_prefix, "events");
        assert_eq!(defaults.max_events, 100_000);
        assert_eq!(defaults.channel_capacity, 10_000);
    }

    /// A published event shows up in the history with a positive sequence.
    #[tokio::test]
    async fn test_publish_and_history() {
        let provider = MemoryProvider::default();

        let published = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );
        let sequence = provider.publish(&published).await.unwrap();
        assert!(sequence > 0);

        let stored = provider.history(None, 10).await.unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].id, published.id);
    }

    /// A subject filter narrows the history; no filter returns everything.
    #[tokio::test]
    async fn test_history_with_filter() {
        let provider = MemoryProvider::default();

        let forex = Event::new("events.market.forex", "market", "A", "test", serde_json::json!({}));
        let deploy = Event::new("events.system.deploy", "system", "B", "test", serde_json::json!({}));
        provider.publish(&forex).await.unwrap();
        provider.publish(&deploy).await.unwrap();

        let market_only = provider.history(Some("events.market.>"), 10).await.unwrap();
        assert_eq!(market_only.len(), 1);
        assert_eq!(market_only[0].category, "market");

        let everything = provider.history(None, 10).await.unwrap();
        assert_eq!(everything.len(), 2);
    }

    /// Publishing more than `max_events` keeps only `max_events` entries.
    #[tokio::test]
    async fn test_max_events_limit() {
        let config = MemoryConfig {
            max_events: 3,
            ..Default::default()
        };
        let provider = MemoryProvider::new(config);

        for i in 0..5 {
            let numbered = Event::new(
                format!("events.test.{}", i),
                "test",
                format!("Event {}", i),
                "test",
                serde_json::json!({}),
            );
            provider.publish(&numbered).await.unwrap();
        }

        let retained = provider.history(None, 10).await.unwrap();
        assert_eq!(retained.len(), 3);
    }

    /// An event sent on the broadcast channel reaches a matching subscriber.
    #[tokio::test]
    async fn test_subscribe_and_receive() {
        let provider = MemoryProvider::default();
        let mut sub = provider.subscribe("events.market.>").await.unwrap();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );

        // Send from a background task after a short delay so the subscriber
        // is already waiting when the event arrives.
        let sender = provider.sender.clone();
        let outgoing = event.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = sender.send(outgoing);
        });

        let outcome = tokio::time::timeout(Duration::from_millis(100), sub.next()).await;
        let received = outcome.unwrap().unwrap().unwrap();

        assert_eq!(received.event.id, event.id);
    }

    /// `info` reports the provider name, message count and a non-zero size.
    #[tokio::test]
    async fn test_provider_info() {
        let provider = MemoryProvider::default();

        let sample = Event::new("events.test.a", "test", "A", "test", serde_json::json!({}));
        provider.publish(&sample).await.unwrap();

        let stats = provider.info().await.unwrap();
        assert_eq!(stats.provider, "memory");
        assert_eq!(stats.messages, 1);
        assert!(stats.bytes > 0);
    }

    /// `build_subject` joins prefix, category and topic with dots.
    #[test]
    fn test_build_subject() {
        let provider = MemoryProvider::default();
        let subject = provider.build_subject("market", "forex.usd");
        assert_eq!(subject, "events.market.forex.usd");
    }

    /// `category_subject` yields a wildcard subject for the category.
    #[test]
    fn test_category_subject() {
        let provider = MemoryProvider::default();
        let wildcard = provider.category_subject("market");
        assert_eq!(wildcard, "events.market.>");
    }

    /// The provider identifies itself as "memory".
    #[test]
    fn test_provider_name() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.name(), "memory");
    }
}