Skip to main content

a3s_event/provider/
memory.rs

1//! In-memory event provider for testing and lightweight usage
2//!
3//! Stores events in memory with no external dependencies.
4//! Events are lost on process restart.
5
6use crate::error::Result;
7use crate::provider::{EventProvider, PendingEvent, ProviderInfo, Subscription};
8use crate::subject::subject_matches;
9use crate::types::{Event, ReceivedEvent};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13
/// Settings for the in-memory event provider.
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// Prefix placed in front of every subject the provider builds
    /// (default: "events").
    pub subject_prefix: String,
    /// Upper bound on the number of retained events; `0` means unlimited.
    pub max_events: usize,
    /// Capacity handed to the broadcast channel that feeds subscribers.
    pub channel_capacity: usize,
}
24
25impl Default for MemoryConfig {
26    fn default() -> Self {
27        Self {
28            subject_prefix: "events".to_string(),
29            max_events: 100_000,
30            channel_capacity: 10_000,
31        }
32    }
33}
34
35/// In-memory event provider
36///
37/// Uses `tokio::sync::broadcast` for pub/sub and a `Vec` for persistence.
38/// Suitable for testing, development, and single-process deployments.
39pub struct MemoryProvider {
40    config: MemoryConfig,
41    events: Arc<RwLock<Vec<Event>>>,
42    sender: broadcast::Sender<Event>,
43    sequence: Arc<std::sync::atomic::AtomicU64>,
44}
45
46impl MemoryProvider {
47    /// Create a new in-memory provider
48    pub fn new(config: MemoryConfig) -> Self {
49        let (sender, _) = broadcast::channel(config.channel_capacity);
50        Self {
51            config,
52            events: Arc::new(RwLock::new(Vec::new())),
53            sender,
54            sequence: Arc::new(std::sync::atomic::AtomicU64::new(1)),
55        }
56    }
57}
58
59impl Default for MemoryProvider {
60    fn default() -> Self {
61        Self::new(MemoryConfig::default())
62    }
63}
64
65#[async_trait]
66impl EventProvider for MemoryProvider {
67    async fn publish(&self, event: &Event) -> Result<u64> {
68        let seq = self
69            .sequence
70            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
71
72        {
73            let mut events = self.events.write().await;
74            events.push(event.clone());
75
76            // Enforce max_events limit
77            if self.config.max_events > 0 && events.len() > self.config.max_events {
78                let drain_count = events.len() - self.config.max_events;
79                events.drain(..drain_count);
80            }
81        }
82
83        // Broadcast to subscribers (ignore send errors — no receivers is fine)
84        let _ = self.sender.send(event.clone());
85
86        tracing::debug!(
87            event_id = %event.id,
88            subject = %event.subject,
89            sequence = seq,
90            "Event published (memory)"
91        );
92
93        Ok(seq)
94    }
95
96    async fn subscribe_durable(
97        &self,
98        _consumer_name: &str,
99        filter_subject: &str,
100    ) -> Result<Box<dyn Subscription>> {
101        // In-memory provider treats durable same as ephemeral
102        self.subscribe(filter_subject).await
103    }
104
105    async fn subscribe(
106        &self,
107        filter_subject: &str,
108    ) -> Result<Box<dyn Subscription>> {
109        let receiver = self.sender.subscribe();
110        Ok(Box::new(MemorySubscription {
111            receiver,
112            filter: filter_subject.to_string(),
113        }))
114    }
115
116    async fn history(
117        &self,
118        filter_subject: Option<&str>,
119        limit: usize,
120    ) -> Result<Vec<Event>> {
121        let events = self.events.read().await;
122        let filtered: Vec<Event> = events
123            .iter()
124            .rev()
125            .filter(|e| {
126                if let Some(filter) = filter_subject {
127                    subject_matches(&e.subject, filter)
128                } else {
129                    true
130                }
131            })
132            .take(limit)
133            .cloned()
134            .collect();
135        Ok(filtered)
136    }
137
138    async fn unsubscribe(&self, _consumer_name: &str) -> Result<()> {
139        // No-op for in-memory provider
140        Ok(())
141    }
142
143    async fn info(&self) -> Result<ProviderInfo> {
144        let events = self.events.read().await;
145        let bytes: u64 = events
146            .iter()
147            .map(|e| serde_json::to_vec(e).map(|v| v.len() as u64).unwrap_or(0))
148            .sum();
149
150        Ok(ProviderInfo {
151            provider: "memory".to_string(),
152            messages: events.len() as u64,
153            bytes,
154            consumers: self.sender.receiver_count(),
155        })
156    }
157
158    fn build_subject(&self, category: &str, topic: &str) -> String {
159        format!("{}.{}.{}", self.config.subject_prefix, category, topic)
160    }
161
162    fn category_subject(&self, category: &str) -> String {
163        format!("{}.{}.>", self.config.subject_prefix, category)
164    }
165
166    fn name(&self) -> &str {
167        "memory"
168    }
169}
170
171/// In-memory subscription backed by broadcast channel
172struct MemorySubscription {
173    receiver: broadcast::Receiver<Event>,
174    filter: String,
175}
176
177#[async_trait]
178impl Subscription for MemorySubscription {
179    async fn next(&mut self) -> Result<Option<ReceivedEvent>> {
180        loop {
181            match self.receiver.recv().await {
182                Ok(event) => {
183                    if subject_matches(&event.subject, &self.filter) {
184                        return Ok(Some(ReceivedEvent {
185                            event,
186                            sequence: 0,
187                            num_delivered: 1,
188                            stream: "memory".to_string(),
189                        }));
190                    }
191                    // Skip non-matching events
192                }
193                Err(broadcast::error::RecvError::Lagged(n)) => {
194                    tracing::warn!(skipped = n, "Memory subscriber lagged, skipped events");
195                    // Continue receiving
196                }
197                Err(broadcast::error::RecvError::Closed) => {
198                    return Ok(None);
199                }
200            }
201        }
202    }
203
204    async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>> {
205        match self.next().await? {
206            Some(received) => Ok(Some(PendingEvent::new(
207                received,
208                || Box::pin(async { Ok(()) }),
209                || Box::pin(async { Ok(()) }),
210            ))),
211            None => Ok(None),
212        }
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Default configuration carries the documented values.
    #[test]
    fn test_memory_config_default() {
        let config = MemoryConfig::default();
        assert_eq!(config.subject_prefix, "events");
        assert_eq!(config.max_events, 100_000);
        assert_eq!(config.channel_capacity, 10_000);
    }

    /// A published event is retained and returned by `history`.
    #[tokio::test]
    async fn test_publish_and_history() {
        let provider = MemoryProvider::default();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );
        let seq = provider.publish(&event).await.unwrap();
        assert!(seq > 0);

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].id, event.id);
    }

    /// `history` honours the subject filter and returns everything for `None`.
    #[tokio::test]
    async fn test_history_with_filter() {
        let provider = MemoryProvider::default();

        let e1 = Event::new("events.market.forex", "market", "A", "test", serde_json::json!({}));
        let e2 = Event::new("events.system.deploy", "system", "B", "test", serde_json::json!({}));
        provider.publish(&e1).await.unwrap();
        provider.publish(&e2).await.unwrap();

        let market = provider.history(Some("events.market.>"), 10).await.unwrap();
        assert_eq!(market.len(), 1);
        assert_eq!(market[0].category, "market");

        let all = provider.history(None, 10).await.unwrap();
        assert_eq!(all.len(), 2);
    }

    /// Once `max_events` is exceeded only the newest events remain.
    #[tokio::test]
    async fn test_max_events_limit() {
        let provider = MemoryProvider::new(MemoryConfig {
            max_events: 3,
            ..Default::default()
        });

        let mut published = Vec::new();
        for i in 0..5 {
            let e = Event::new(
                format!("events.test.{}", i),
                "test",
                format!("Event {}", i),
                "test",
                serde_json::json!({}),
            );
            provider.publish(&e).await.unwrap();
            published.push(e);
        }

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 3);

        // History is newest-first, so the survivors are events 4, 3, 2.
        assert_eq!(history[0].id, published[4].id);
        assert_eq!(history[1].id, published[3].id);
        assert_eq!(history[2].id, published[2].id);
    }

    /// An event published through the provider reaches an existing subscriber.
    #[tokio::test]
    async fn test_subscribe_and_receive() {
        let provider = MemoryProvider::default();
        let mut sub = provider.subscribe("events.market.>").await.unwrap();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );

        // The subscription exists before the publish, so the broadcast
        // channel buffers the event for it; no background task is needed.
        provider.publish(&event).await.unwrap();

        let received = tokio::time::timeout(
            std::time::Duration::from_millis(100),
            sub.next(),
        )
        .await
        .expect("subscriber timed out waiting for the event")
        .expect("subscription returned an error")
        .expect("subscription closed unexpectedly");

        assert_eq!(received.event.id, event.id);
    }

    /// Events that do not match the subscription filter are skipped.
    #[tokio::test]
    async fn test_subscribe_skips_non_matching() {
        let provider = MemoryProvider::default();
        let mut sub = provider.subscribe("events.market.>").await.unwrap();

        let other = Event::new("events.system.deploy", "system", "B", "test", serde_json::json!({}));
        let wanted = Event::new("events.market.forex", "market", "A", "test", serde_json::json!({}));
        provider.publish(&other).await.unwrap();
        provider.publish(&wanted).await.unwrap();

        let received = tokio::time::timeout(
            std::time::Duration::from_millis(100),
            sub.next(),
        )
        .await
        .expect("subscriber timed out waiting for the event")
        .expect("subscription returned an error")
        .expect("subscription closed unexpectedly");

        assert_eq!(received.event.id, wanted.id);
    }

    /// `info` reports the provider name, message count and a non-zero size.
    #[tokio::test]
    async fn test_provider_info() {
        let provider = MemoryProvider::default();

        let e = Event::new("events.test.a", "test", "A", "test", serde_json::json!({}));
        provider.publish(&e).await.unwrap();

        let info = provider.info().await.unwrap();
        assert_eq!(info.provider, "memory");
        assert_eq!(info.messages, 1);
        assert!(info.bytes > 0);
    }

    /// `build_subject` joins prefix, category and topic with dots.
    #[test]
    fn test_build_subject() {
        let provider = MemoryProvider::default();
        assert_eq!(
            provider.build_subject("market", "forex.usd"),
            "events.market.forex.usd"
        );
    }

    /// `category_subject` appends the `>` wildcard to the category.
    #[test]
    fn test_category_subject() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.category_subject("market"), "events.market.>");
    }

    /// The provider identifies itself as "memory".
    #[test]
    fn test_provider_name() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.name(), "memory");
    }
}