zopp_events_memory/
lib.rs

//! In-memory event bus implementation using tokio broadcast channels.
//!
//! This implementation is suitable for:
//! - Single-server deployments
//! - Development and testing
//! - Simple deployments that don't require horizontal scaling
//!
//! For multi-replica deployments, use the Redis or Postgres event bus instead.
9
10use async_trait::async_trait;
11use dashmap::DashMap;
12use std::sync::Arc;
13use tokio::sync::broadcast;
14use tokio_stream::wrappers::BroadcastStream;
15use tokio_stream::StreamExt;
16use zopp_events::{EventBus, EventBusError, EventStream, SecretChangeEvent};
17use zopp_storage::EnvironmentId;
18
/// Capacity passed to `broadcast::channel` for every per-environment channel
/// this bus creates.
const CHANNEL_CAPACITY: usize = 100;
20
21/// In-memory event bus using tokio broadcast channels.
22///
23/// Events are only broadcast within a single process.
24/// If you have multiple server replicas, they will NOT receive each other's events.
25pub struct MemoryEventBus {
26    channels: Arc<DashMap<EnvironmentId, broadcast::Sender<SecretChangeEvent>>>,
27}
28
29impl MemoryEventBus {
30    pub fn new() -> Self {
31        Self {
32            channels: Arc::new(DashMap::new()),
33        }
34    }
35
36    /// Get or create a broadcast channel for an environment
37    fn get_or_create_channel(
38        &self,
39        env_id: &EnvironmentId,
40    ) -> broadcast::Sender<SecretChangeEvent> {
41        self.channels
42            .entry(env_id.clone())
43            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0)
44            .clone()
45    }
46}
47
48impl Default for MemoryEventBus {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54#[async_trait]
55impl EventBus for MemoryEventBus {
56    async fn publish(
57        &self,
58        env_id: &EnvironmentId,
59        event: SecretChangeEvent,
60    ) -> Result<(), EventBusError> {
61        let tx = self.get_or_create_channel(env_id);
62
63        // Ignore error if no receivers (this is fine)
64        let _ = tx.send(event);
65
66        Ok(())
67    }
68
69    async fn subscribe(&self, env_id: &EnvironmentId) -> Result<EventStream, EventBusError> {
70        let tx = self.get_or_create_channel(env_id);
71        let rx = tx.subscribe();
72
73        // Convert BroadcastStream to our EventStream type
74        // Filter out lagged errors (happens when receiver can't keep up)
75        // Client fell behind, they should do a full resync
76        let stream = BroadcastStream::new(rx).filter_map(|result| result.ok());
77
78        Ok(Box::pin(stream))
79    }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::time::Duration;
    use uuid::Uuid;
    use zopp_events::EventType;
    use zopp_storage::EnvironmentId;

    /// Upper bound on how long a test waits for an event that is expected to arrive.
    const RECV_TIMEOUT: Duration = Duration::from_millis(100);

    /// Waits for the next event on `stream`.
    ///
    /// Panics if nothing arrives within `RECV_TIMEOUT` or the stream ends, so a
    /// delivery regression fails the test instead of hanging the whole suite.
    async fn next_event(stream: &mut EventStream) -> SecretChangeEvent {
        tokio::time::timeout(RECV_TIMEOUT, stream.next())
            .await
            .expect("timeout")
            .expect("stream ended")
    }

    #[tokio::test]
    async fn publish_and_subscribe() {
        let bus = MemoryEventBus::new();
        let env_id = EnvironmentId(Uuid::new_v4());

        // Subscribe first so the event below has a receiver.
        let mut stream = bus.subscribe(&env_id).await.unwrap();

        let event = SecretChangeEvent {
            event_type: EventType::Created,
            key: "API_KEY".to_string(),
            version: 1,
            timestamp: 12345,
        };
        bus.publish(&env_id, event).await.unwrap();

        let received = next_event(&mut stream).await;

        assert_eq!(received.key, "API_KEY");
        assert_eq!(received.version, 1);
        assert_eq!(received.event_type, EventType::Created);
    }

    #[tokio::test]
    async fn multiple_subscribers() {
        let bus = MemoryEventBus::new();
        let env_id = EnvironmentId(Uuid::new_v4());

        // Two independent subscribers on the same environment.
        let mut stream1 = bus.subscribe(&env_id).await.unwrap();
        let mut stream2 = bus.subscribe(&env_id).await.unwrap();

        let event = SecretChangeEvent {
            event_type: EventType::Updated,
            key: "SECRET".to_string(),
            version: 2,
            timestamp: 67890,
        };
        bus.publish(&env_id, event).await.unwrap();

        // Both should receive their own copy of the event.
        let recv1 = next_event(&mut stream1).await;
        let recv2 = next_event(&mut stream2).await;

        assert_eq!(recv1.key, "SECRET");
        assert_eq!(recv2.key, "SECRET");
    }

    #[tokio::test]
    async fn publish_before_subscribe_is_lost() {
        let bus = MemoryEventBus::new();
        let env_id = EnvironmentId(Uuid::new_v4());

        // Publish while nobody is subscribed.
        let event = SecretChangeEvent {
            event_type: EventType::Deleted,
            key: "OLD".to_string(),
            version: 3,
            timestamp: 99999,
        };
        bus.publish(&env_id, event).await.unwrap();

        // Subscribe afterwards - the earlier event must not be replayed.
        let mut stream = bus.subscribe(&env_id).await.unwrap();

        // Expect the wait to time out because no event is pending.
        let result = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;

        assert!(
            result.is_err(),
            "Should not receive event published before subscription"
        );
    }
}