1use crate::error::Result;
7use crate::provider::{EventProvider, PendingEvent, ProviderInfo, Subscription};
8use crate::subject::subject_matches;
9use crate::types::{Event, ReceivedEvent};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13
/// Settings for the in-memory event provider.
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// Prefix handed back by the provider's `subject_prefix()` accessor.
    pub subject_prefix: String,
    /// Upper bound on the number of events retained for history queries.
    /// A value of `0` switches trimming off entirely.
    pub max_events: usize,
    /// Capacity requested for the broadcast channel that delivers events
    /// to subscribers.
    pub channel_capacity: usize,
}

impl Default for MemoryConfig {
    /// Stock configuration: prefix `"events"`, 100 000 retained events and
    /// a broadcast channel capacity of 10 000.
    fn default() -> Self {
        let subject_prefix = String::from("events");
        let max_events: usize = 100_000;
        let channel_capacity: usize = 10_000;

        MemoryConfig {
            subject_prefix,
            max_events,
            channel_capacity,
        }
    }
}
34
35pub struct MemoryProvider {
40 config: MemoryConfig,
41 events: Arc<RwLock<Vec<Event>>>,
42 sender: broadcast::Sender<Event>,
43 sequence: Arc<std::sync::atomic::AtomicU64>,
44}
45
46impl MemoryProvider {
47 pub fn new(config: MemoryConfig) -> Self {
49 let (sender, _) = broadcast::channel(config.channel_capacity);
50 Self {
51 config,
52 events: Arc::new(RwLock::new(Vec::new())),
53 sender,
54 sequence: Arc::new(std::sync::atomic::AtomicU64::new(1)),
55 }
56 }
57}
58
59impl Default for MemoryProvider {
60 fn default() -> Self {
61 Self::new(MemoryConfig::default())
62 }
63}
64
65#[async_trait]
66impl EventProvider for MemoryProvider {
67 async fn publish(&self, event: &Event) -> Result<u64> {
68 let seq = self
69 .sequence
70 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
71
72 {
73 let mut events = self.events.write().await;
74 events.push(event.clone());
75
76 if self.config.max_events > 0 && events.len() > self.config.max_events {
78 let drain_count = events.len() - self.config.max_events;
79 events.drain(..drain_count);
80 }
81 }
82
83 let _ = self.sender.send(event.clone());
85
86 tracing::debug!(
87 event_id = %event.id,
88 subject = %event.subject,
89 sequence = seq,
90 "Event published (memory)"
91 );
92
93 Ok(seq)
94 }
95
96 async fn subscribe_durable(
97 &self,
98 _consumer_name: &str,
99 filter_subject: &str,
100 ) -> Result<Box<dyn Subscription>> {
101 self.subscribe(filter_subject).await
103 }
104
105 async fn subscribe(
106 &self,
107 filter_subject: &str,
108 ) -> Result<Box<dyn Subscription>> {
109 let receiver = self.sender.subscribe();
110 Ok(Box::new(MemorySubscription {
111 receiver,
112 filter: filter_subject.to_string(),
113 }))
114 }
115
116 async fn history(
117 &self,
118 filter_subject: Option<&str>,
119 limit: usize,
120 ) -> Result<Vec<Event>> {
121 let events = self.events.read().await;
122 let filtered: Vec<Event> = events
123 .iter()
124 .rev()
125 .filter(|e| {
126 if let Some(filter) = filter_subject {
127 subject_matches(&e.subject, filter)
128 } else {
129 true
130 }
131 })
132 .take(limit)
133 .cloned()
134 .collect();
135 Ok(filtered)
136 }
137
138 async fn unsubscribe(&self, _consumer_name: &str) -> Result<()> {
139 Ok(())
141 }
142
143 async fn info(&self) -> Result<ProviderInfo> {
144 let events = self.events.read().await;
145 let bytes: u64 = events
146 .iter()
147 .map(|e| serde_json::to_vec(e).map(|v| v.len() as u64).unwrap_or(0))
148 .sum();
149
150 Ok(ProviderInfo {
151 provider: "memory".to_string(),
152 messages: events.len() as u64,
153 bytes,
154 consumers: self.sender.receiver_count(),
155 })
156 }
157
158 fn subject_prefix(&self) -> &str {
159 &self.config.subject_prefix
160 }
161
162 fn name(&self) -> &str {
163 "memory"
164 }
165}
166
167struct MemorySubscription {
169 receiver: broadcast::Receiver<Event>,
170 filter: String,
171}
172
173#[async_trait]
174impl Subscription for MemorySubscription {
175 async fn next(&mut self) -> Result<Option<ReceivedEvent>> {
176 loop {
177 match self.receiver.recv().await {
178 Ok(event) => {
179 if subject_matches(&event.subject, &self.filter) {
180 return Ok(Some(ReceivedEvent {
181 event,
182 sequence: 0,
183 num_delivered: 1,
184 stream: "memory".to_string(),
185 }));
186 }
187 }
189 Err(broadcast::error::RecvError::Lagged(n)) => {
190 tracing::warn!(skipped = n, "Memory subscriber lagged, skipped events");
191 }
193 Err(broadcast::error::RecvError::Closed) => {
194 return Ok(None);
195 }
196 }
197 }
198 }
199
200 async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>> {
201 match self.next().await? {
202 Some(received) => Ok(Some(PendingEvent::new(
203 received,
204 || Box::pin(async { Ok(()) }),
205 || Box::pin(async { Ok(()) }),
206 ))),
207 None => Ok(None),
208 }
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the documented stock values.
    #[test]
    fn test_memory_config_default() {
        let config = MemoryConfig::default();
        assert_eq!(config.subject_prefix, "events");
        assert_eq!(config.max_events, 100_000);
        assert_eq!(config.channel_capacity, 10_000);
    }

    /// A published event gets a positive sequence number and shows up in
    /// the unfiltered history.
    #[tokio::test]
    async fn test_publish_and_history() {
        let provider = MemoryProvider::default();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );
        let seq = provider.publish(&event).await.unwrap();
        assert!(seq > 0);

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].id, event.id);
    }

    /// A subject filter narrows the history; `None` returns everything.
    #[tokio::test]
    async fn test_history_with_filter() {
        let provider = MemoryProvider::default();

        let e1 = Event::new("events.market.forex", "market", "A", "test", serde_json::json!({}));
        let e2 = Event::new("events.system.deploy", "system", "B", "test", serde_json::json!({}));
        provider.publish(&e1).await.unwrap();
        provider.publish(&e2).await.unwrap();

        let market = provider.history(Some("events.market.>"), 10).await.unwrap();
        assert_eq!(market.len(), 1);
        assert_eq!(market[0].category, "market");

        let all = provider.history(None, 10).await.unwrap();
        assert_eq!(all.len(), 2);
    }

    /// Publishing more than `max_events` keeps only `max_events` entries.
    #[tokio::test]
    async fn test_max_events_limit() {
        let provider = MemoryProvider::new(MemoryConfig {
            max_events: 3,
            ..Default::default()
        });

        for i in 0..5 {
            let e = Event::new(
                format!("events.test.{}", i),
                "test",
                format!("Event {}", i),
                "test",
                serde_json::json!({}),
            );
            provider.publish(&e).await.unwrap();
        }

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 3);
    }

    /// An event published after subscribing is delivered to a matching
    /// subscription.
    #[tokio::test]
    async fn test_subscribe_and_receive() {
        let provider = MemoryProvider::default();
        let mut sub = provider.subscribe("events.market.>").await.unwrap();

        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "test",
            serde_json::json!({}),
        );

        // Go through `publish` so the test covers the real
        // publish -> broadcast -> subscription path instead of writing to
        // the channel directly. The receiver was created before the publish,
        // so the channel buffers the event for it and no background task or
        // sleep is required.
        provider.publish(&event).await.unwrap();

        // The timeout only guards against a hang if delivery is broken.
        let received = tokio::time::timeout(
            std::time::Duration::from_millis(100),
            sub.next(),
        )
        .await
        .unwrap()
        .unwrap()
        .unwrap();

        assert_eq!(received.event.id, event.id);
    }

    /// `info` reports the provider name, message count and a non-zero size.
    #[tokio::test]
    async fn test_provider_info() {
        let provider = MemoryProvider::default();

        let e = Event::new("events.test.a", "test", "A", "test", serde_json::json!({}));
        provider.publish(&e).await.unwrap();

        let info = provider.info().await.unwrap();
        assert_eq!(info.provider, "memory");
        assert_eq!(info.messages, 1);
        assert!(info.bytes > 0);
    }

    /// `build_subject` joins prefix, category and topic with dots.
    #[test]
    fn test_build_subject() {
        let provider = MemoryProvider::default();
        assert_eq!(
            provider.build_subject("market", "forex.usd"),
            "events.market.forex.usd"
        );
    }

    /// `category_subject` yields the wildcard subject for a category.
    #[test]
    fn test_category_subject() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.category_subject("market"), "events.market.>");
    }

    /// The provider identifies itself as `"memory"`.
    #[test]
    fn test_provider_name() {
        let provider = MemoryProvider::default();
        assert_eq!(provider.name(), "memory");
    }
}