1use crate::error::Result;
7use crate::provider::{EventProvider, PendingEvent, ProviderInfo, Subscription};
8use crate::subject::subject_matches;
9use crate::types::{Event, ReceivedEvent};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13
/// Settings for the in-memory event provider.
#[derive(Clone, Debug)]
pub struct MemoryConfig {
    /// First segment of every subject built by the provider
    /// (`<prefix>.<category>.<topic>`).
    pub subject_prefix: String,
    /// Upper bound on the number of retained events; `0` disables the bound.
    pub max_events: usize,
    /// Capacity handed to the broadcast channel that feeds subscribers.
    pub channel_capacity: usize,
}

impl Default for MemoryConfig {
    /// Defaults: prefix `"events"`, 100 000 retained events, and a
    /// 10 000-slot broadcast channel.
    fn default() -> Self {
        let subject_prefix = String::from("events");
        MemoryConfig {
            subject_prefix,
            max_events: 100_000,
            channel_capacity: 10_000,
        }
    }
}
34
35pub struct MemoryProvider {
40 config: MemoryConfig,
41 events: Arc<RwLock<Vec<Event>>>,
42 sender: broadcast::Sender<Event>,
43 sequence: Arc<std::sync::atomic::AtomicU64>,
44}
45
46impl MemoryProvider {
47 pub fn new(config: MemoryConfig) -> Self {
49 let (sender, _) = broadcast::channel(config.channel_capacity);
50 Self {
51 config,
52 events: Arc::new(RwLock::new(Vec::new())),
53 sender,
54 sequence: Arc::new(std::sync::atomic::AtomicU64::new(1)),
55 }
56 }
57}
58
59impl Default for MemoryProvider {
60 fn default() -> Self {
61 Self::new(MemoryConfig::default())
62 }
63}
64
65#[async_trait]
66impl EventProvider for MemoryProvider {
67 async fn publish(&self, event: &Event) -> Result<u64> {
68 let seq = self
69 .sequence
70 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
71
72 {
73 let mut events = self.events.write().await;
74 events.push(event.clone());
75
76 if self.config.max_events > 0 && events.len() > self.config.max_events {
78 let drain_count = events.len() - self.config.max_events;
79 events.drain(..drain_count);
80 }
81 }
82
83 let _ = self.sender.send(event.clone());
85
86 tracing::debug!(
87 event_id = %event.id,
88 subject = %event.subject,
89 sequence = seq,
90 "Event published (memory)"
91 );
92
93 Ok(seq)
94 }
95
96 async fn subscribe_durable(
97 &self,
98 _consumer_name: &str,
99 filter_subject: &str,
100 ) -> Result<Box<dyn Subscription>> {
101 self.subscribe(filter_subject).await
103 }
104
105 async fn subscribe(
106 &self,
107 filter_subject: &str,
108 ) -> Result<Box<dyn Subscription>> {
109 let receiver = self.sender.subscribe();
110 Ok(Box::new(MemorySubscription {
111 receiver,
112 filter: filter_subject.to_string(),
113 }))
114 }
115
116 async fn history(
117 &self,
118 filter_subject: Option<&str>,
119 limit: usize,
120 ) -> Result<Vec<Event>> {
121 let events = self.events.read().await;
122 let filtered: Vec<Event> = events
123 .iter()
124 .rev()
125 .filter(|e| {
126 if let Some(filter) = filter_subject {
127 subject_matches(&e.subject, filter)
128 } else {
129 true
130 }
131 })
132 .take(limit)
133 .cloned()
134 .collect();
135 Ok(filtered)
136 }
137
138 async fn unsubscribe(&self, _consumer_name: &str) -> Result<()> {
139 Ok(())
141 }
142
143 async fn info(&self) -> Result<ProviderInfo> {
144 let events = self.events.read().await;
145 let bytes: u64 = events
146 .iter()
147 .map(|e| serde_json::to_vec(e).map(|v| v.len() as u64).unwrap_or(0))
148 .sum();
149
150 Ok(ProviderInfo {
151 provider: "memory".to_string(),
152 messages: events.len() as u64,
153 bytes,
154 consumers: self.sender.receiver_count(),
155 })
156 }
157
158 fn build_subject(&self, category: &str, topic: &str) -> String {
159 format!("{}.{}.{}", self.config.subject_prefix, category, topic)
160 }
161
162 fn category_subject(&self, category: &str) -> String {
163 format!("{}.{}.>", self.config.subject_prefix, category)
164 }
165
166 fn name(&self) -> &str {
167 "memory"
168 }
169}
170
171struct MemorySubscription {
173 receiver: broadcast::Receiver<Event>,
174 filter: String,
175}
176
177#[async_trait]
178impl Subscription for MemorySubscription {
179 async fn next(&mut self) -> Result<Option<ReceivedEvent>> {
180 loop {
181 match self.receiver.recv().await {
182 Ok(event) => {
183 if subject_matches(&event.subject, &self.filter) {
184 return Ok(Some(ReceivedEvent {
185 event,
186 sequence: 0,
187 num_delivered: 1,
188 stream: "memory".to_string(),
189 }));
190 }
191 }
193 Err(broadcast::error::RecvError::Lagged(n)) => {
194 tracing::warn!(skipped = n, "Memory subscriber lagged, skipped events");
195 }
197 Err(broadcast::error::RecvError::Closed) => {
198 return Ok(None);
199 }
200 }
201 }
202 }
203
204 async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>> {
205 match self.next().await? {
206 Some(received) => Ok(Some(PendingEvent::new(
207 received,
208 || Box::pin(async { Ok(()) }),
209 || Box::pin(async { Ok(()) }),
210 ))),
211 None => Ok(None),
212 }
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the documented values.
    #[test]
    fn test_memory_config_default() {
        let config = MemoryConfig::default();
        assert_eq!(config.subject_prefix, "events");
        assert_eq!(config.max_events, 100_000);
        assert_eq!(config.channel_capacity, 10_000);
    }

    /// A published event gets a positive sequence and shows up in history.
    #[tokio::test]
    async fn test_publish_and_history() {
        let provider = MemoryProvider::default();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );
        let seq = provider.publish(&event).await.unwrap();
        assert!(seq > 0);

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].id, event.id);
    }

    /// History honours the subject filter and returns everything without one.
    #[tokio::test]
    async fn test_history_with_filter() {
        let provider = MemoryProvider::default();

        let e1 = Event::new("events.market.forex", "market", "A", "test", serde_json::json!({}));
        let e2 = Event::new("events.system.deploy", "system", "B", "test", serde_json::json!({}));
        provider.publish(&e1).await.unwrap();
        provider.publish(&e2).await.unwrap();

        let market = provider.history(Some("events.market.>"), 10).await.unwrap();
        assert_eq!(market.len(), 1);
        assert_eq!(market[0].category, "market");

        let all = provider.history(None, 10).await.unwrap();
        assert_eq!(all.len(), 2);
    }

    /// Only the newest `max_events` entries are retained.
    #[tokio::test]
    async fn test_max_events_limit() {
        let provider = MemoryProvider::new(MemoryConfig {
            max_events: 3,
            ..Default::default()
        });

        for i in 0..5 {
            let e = Event::new(
                format!("events.test.{}", i),
                "test",
                format!("Event {}", i),
                "test",
                serde_json::json!({}),
            );
            provider.publish(&e).await.unwrap();
        }

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 3);
    }

    /// An event published after subscribing is delivered to the subscription.
    #[tokio::test]
    async fn test_subscribe_and_receive() {
        let provider = MemoryProvider::default();
        let mut sub = provider.subscribe("events.market.>").await.unwrap();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );

        // Go through `publish` so the real publish -> subscriber path is
        // exercised. The receiver already exists, so the event is buffered for
        // it and no background task or sleep is required.
        provider.publish(&event).await.unwrap();

        // The timeout only guards against a hang if delivery is broken.
        let received = tokio::time::timeout(std::time::Duration::from_millis(100), sub.next())
            .await
            .expect("timed out waiting for the published event")
            .expect("subscription returned an error")
            .expect("subscription closed before delivering the event");

        assert_eq!(received.event.id, event.id);
    }

    /// `info` reports the provider name, message count and a non-zero size.
    #[tokio::test]
    async fn test_provider_info() {
        let provider = MemoryProvider::default();

        let e = Event::new("events.test.a", "test", "A", "test", serde_json::json!({}));
        provider.publish(&e).await.unwrap();

        let info = provider.info().await.unwrap();
        assert_eq!(info.provider, "memory");
        assert_eq!(info.messages, 1);
        assert!(info.bytes > 0);
    }

    /// Subjects are `<prefix>.<category>.<topic>`.
    #[test]
    fn test_build_subject() {
        let provider = MemoryProvider::default();
        assert_eq!(
            provider.build_subject("market", "forex.usd"),
            "events.market.forex.usd"
        );
    }

    /// Category subjects end in the `>` wildcard.
    #[test]
    fn test_category_subject() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.category_subject("market"), "events.market.>");
    }

    /// The provider identifies itself as `"memory"`.
    #[test]
    fn test_provider_name() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.name(), "memory");
    }
}