zopp_events_memory/
lib.rs1use async_trait::async_trait;
11use dashmap::DashMap;
12use std::sync::Arc;
13use tokio::sync::broadcast;
14use tokio_stream::wrappers::BroadcastStream;
15use tokio_stream::StreamExt;
16use zopp_events::{EventBus, EventBusError, EventStream, SecretChangeEvent};
17use zopp_storage::EnvironmentId;
18
/// Capacity passed to `broadcast::channel` whenever a per-environment channel is created.
19const CHANNEL_CAPACITY: usize = 100;
20
21pub struct MemoryEventBus {
26 channels: Arc<DashMap<EnvironmentId, broadcast::Sender<SecretChangeEvent>>>,
27}
28
29impl MemoryEventBus {
30 pub fn new() -> Self {
31 Self {
32 channels: Arc::new(DashMap::new()),
33 }
34 }
35
36 fn get_or_create_channel(
38 &self,
39 env_id: &EnvironmentId,
40 ) -> broadcast::Sender<SecretChangeEvent> {
41 self.channels
42 .entry(env_id.clone())
43 .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0)
44 .clone()
45 }
46}
47
48impl Default for MemoryEventBus {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54#[async_trait]
55impl EventBus for MemoryEventBus {
56 async fn publish(
57 &self,
58 env_id: &EnvironmentId,
59 event: SecretChangeEvent,
60 ) -> Result<(), EventBusError> {
61 let tx = self.get_or_create_channel(env_id);
62
63 let _ = tx.send(event);
65
66 Ok(())
67 }
68
69 async fn subscribe(&self, env_id: &EnvironmentId) -> Result<EventStream, EventBusError> {
70 let tx = self.get_or_create_channel(env_id);
71 let rx = tx.subscribe();
72
73 let stream = BroadcastStream::new(rx).filter_map(|result| result.ok());
77
78 Ok(Box::pin(stream))
79 }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::time::Duration;
    use uuid::Uuid;
    use zopp_events::EventType;
    use zopp_storage::EnvironmentId;

    /// Upper bound on how long a test waits for an event that is expected to arrive.
    const RECV_TIMEOUT: Duration = Duration::from_millis(100);

    /// Awaits the next event on `stream`.
    ///
    /// Panics if nothing arrives within `RECV_TIMEOUT` or the stream has ended,
    /// so a delivery regression fails the test instead of hanging the suite.
    async fn next_event(stream: &mut EventStream) -> SecretChangeEvent {
        tokio::time::timeout(RECV_TIMEOUT, stream.next())
            .await
            .expect("timeout")
            .expect("stream ended")
    }

    #[tokio::test]
    async fn publish_and_subscribe() {
        let bus = MemoryEventBus::new();
        let env_id = EnvironmentId(Uuid::new_v4());

        // Subscribe first: only events published afterwards are delivered.
        let mut stream = bus.subscribe(&env_id).await.unwrap();

        let event = SecretChangeEvent {
            event_type: EventType::Created,
            key: "API_KEY".to_string(),
            version: 1,
            timestamp: 12345,
        };
        bus.publish(&env_id, event).await.unwrap();

        let received = next_event(&mut stream).await;

        assert_eq!(received.key, "API_KEY");
        assert_eq!(received.version, 1);
        assert_eq!(received.event_type, EventType::Created);
    }

    #[tokio::test]
    async fn multiple_subscribers() {
        let bus = MemoryEventBus::new();
        let env_id = EnvironmentId(Uuid::new_v4());

        // Two independent subscriptions to the same environment.
        let mut stream1 = bus.subscribe(&env_id).await.unwrap();
        let mut stream2 = bus.subscribe(&env_id).await.unwrap();

        let event = SecretChangeEvent {
            event_type: EventType::Updated,
            key: "SECRET".to_string(),
            version: 2,
            timestamp: 67890,
        };
        bus.publish(&env_id, event).await.unwrap();

        // Every subscriber must see its own copy of the event.
        let recv1 = next_event(&mut stream1).await;
        let recv2 = next_event(&mut stream2).await;

        assert_eq!(recv1.key, "SECRET");
        assert_eq!(recv2.key, "SECRET");
    }

    #[tokio::test]
    async fn publish_before_subscribe_is_lost() {
        let bus = MemoryEventBus::new();
        let env_id = EnvironmentId(Uuid::new_v4());

        // Publish while nobody is subscribed.
        let event = SecretChangeEvent {
            event_type: EventType::Deleted,
            key: "OLD".to_string(),
            version: 3,
            timestamp: 99999,
        };
        bus.publish(&env_id, event).await.unwrap();

        let mut stream = bus.subscribe(&env_id).await.unwrap();

        // The late subscriber must see nothing, so the wait is expected to time out.
        let result = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;

        assert!(
            result.is_err(),
            "Should not receive event published before subscription"
        );
    }
}