Skip to main content

fraiseql_server/subscriptions/
event_bridge.rs

1//! `EventBridge` that connects `ChangeLogListener` with `SubscriptionManager`.
2//!
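//! The `EventBridge` is responsible for:
//! 1. Handing out `mpsc::Sender<EntityEvent>` handles that producers such as
//!    `ChangeLogListener` use to feed it (the listener itself is started elsewhere)
//! 2. Receiving `EntityEvent` via `mpsc::channel` in a background task
//! 3. Converting `EntityEvent` to `SubscriptionEvent`
//! 4. Publishing events to `SubscriptionManager`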
8//!
9//! Architecture:
10//! ```text
11//! Database (tb_entity_change_log)
12//!     ↓
13//! ChangeLogListener (polls & converts)
14//!     ↓
15//! EventBridge (routes & converts)
16//!     ↓
17//! SubscriptionManager (broadcasts to subscribers)
18//!     ↓
19//! WebSocket Handler (delivers to clients)
20//! ```
21
22use std::sync::Arc;
23
24use fraiseql_core::runtime::subscription::{
25    SubscriptionEvent, SubscriptionManager, SubscriptionOperation,
26};
27use tokio::sync::mpsc;
28use tracing::{debug, info};
29
/// Tunable settings for an `EventBridge`.
#[derive(Clone, Copy, Debug)]
pub struct EventBridgeConfig {
    /// Size of the bounded channel that carries events into the bridge.
    pub channel_capacity: usize,
}

impl EventBridgeConfig {
    /// Builds the default configuration (channel capacity of 100).
    #[must_use]
    pub const fn new() -> Self {
        Self { channel_capacity: 100 }
    }

    /// Returns a copy of this configuration with `channel_capacity` replaced by `capacity`.
    #[must_use]
    pub const fn with_channel_capacity(self, capacity: usize) -> Self {
        let mut updated = self;
        updated.channel_capacity = capacity;
        updated
    }
}

impl Default for EventBridgeConfig {
    /// Same values as [`EventBridgeConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
59
60/// A simple event that `EventBridge` receives from `ChangeLogListener`
61#[derive(Debug, Clone)]
62pub struct EntityEvent {
63    /// Entity type (e.g., "Order", "User")
64    pub entity_type: String,
65
66    /// Entity ID (primary key)
67    pub entity_id: String,
68
69    /// Operation type ("INSERT", "UPDATE", "DELETE")
70    pub operation: String,
71
72    /// Entity data as JSON
73    pub data: serde_json::Value,
74
75    /// Optional old data (for UPDATE operations)
76    pub old_data: Option<serde_json::Value>,
77}
78
79impl EntityEvent {
80    /// Create a new entity event
81    #[must_use]
82    pub fn new(
83        entity_type: impl Into<String>,
84        entity_id: impl Into<String>,
85        operation: impl Into<String>,
86        data: serde_json::Value,
87    ) -> Self {
88        Self {
89            entity_type: entity_type.into(),
90            entity_id: entity_id.into(),
91            operation: operation.into(),
92            data,
93            old_data: None,
94        }
95    }
96
97    /// Add old data for UPDATE operations
98    #[must_use]
99    pub fn with_old_data(mut self, old_data: serde_json::Value) -> Self {
100        self.old_data = Some(old_data);
101        self
102    }
103}
104
105/// `EventBridge` that connects `ChangeLogListener` with `SubscriptionManager`
106pub struct EventBridge {
107    /// Subscription manager for broadcasting events
108    manager: Arc<SubscriptionManager>,
109
110    /// Receiver for entity events from `ChangeLogListener`
111    receiver: mpsc::Receiver<EntityEvent>,
112
113    /// Sender for entity events (used to send events to bridge)
114    sender: mpsc::Sender<EntityEvent>,
115}
116
117impl EventBridge {
118    /// Create a new `EventBridge`
119    #[must_use]
120    pub fn new(manager: Arc<SubscriptionManager>, config: EventBridgeConfig) -> Self {
121        let (sender, receiver) = mpsc::channel(config.channel_capacity);
122
123        Self {
124            manager,
125            receiver,
126            sender,
127        }
128    }
129
130    /// Get a sender for publishing entity events
131    #[must_use]
132    pub fn sender(&self) -> mpsc::Sender<EntityEvent> {
133        self.sender.clone()
134    }
135
136    /// Convert `EntityEvent` to `SubscriptionEvent`
137    fn convert_event(entity_event: EntityEvent) -> SubscriptionEvent {
138        // Convert operation string to SubscriptionOperation
139        let operation = match entity_event.operation.to_uppercase().as_str() {
140            "INSERT" => SubscriptionOperation::Create,
141            "UPDATE" => SubscriptionOperation::Update,
142            "DELETE" => SubscriptionOperation::Delete,
143            _ => {
144                // Default to Create for unknown operations
145                debug!("Unknown operation: {}, defaulting to Create", entity_event.operation);
146                SubscriptionOperation::Create
147            },
148        };
149
150        let mut event = SubscriptionEvent::new(
151            entity_event.entity_type,
152            entity_event.entity_id,
153            operation,
154            entity_event.data,
155        );
156
157        // Add old data if present
158        if let Some(old_data) = entity_event.old_data {
159            event = event.with_old_data(old_data);
160        }
161
162        event
163    }
164
165    /// Run the event bridge loop (spawned in background)
166    #[allow(clippy::cognitive_complexity)] // Reason: event loop with multi-source message routing and reconnection handling
167    pub async fn run(mut self) {
168        info!("EventBridge started");
169
170        while let Some(entity_event) = self.receiver.recv().await {
171            debug!("EventBridge received entity event: {}", entity_event.entity_type);
172
173            // Convert entity event to subscription event
174            let subscription_event = Self::convert_event(entity_event);
175
176            // Publish to subscription manager
177            let matched = self.manager.publish_event(subscription_event);
178
179            if matched > 0 {
180                debug!("EventBridge matched {} subscriptions", matched);
181            }
182        }
183
184        info!("EventBridge stopped");
185    }
186
187    /// Spawn `EventBridge` as a background task
188    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
189        tokio::spawn(self.run())
190    }
191
192    /// Get the sender for sending events to the bridge
193    pub fn get_sender(&self) -> mpsc::Sender<EntityEvent> {
194        self.sender.clone()
195    }
196
197    /// Get the subscription manager (for testing)
198    pub fn manager(&self) -> Arc<SubscriptionManager> {
199        Arc::clone(&self.manager)
200    }
201}
202
#[cfg(test)]
mod tests {
    use fraiseql_core::schema::CompiledSchema;

    use super::*;

    /// Builds a bridge backed by an empty schema and the given config.
    fn make_bridge(config: EventBridgeConfig) -> EventBridge {
        let schema = Arc::new(CompiledSchema::new());
        let manager = Arc::new(SubscriptionManager::new(schema));
        EventBridge::new(manager, config)
    }

    /// Builds an `Order` event with the given operation string and payload.
    fn order_event(operation: &str, data: serde_json::Value) -> EntityEvent {
        EntityEvent::new("Order", "order_123", operation, data)
    }

    #[test]
    fn test_event_bridge_config_builder() {
        assert_eq!(EventBridgeConfig::new().channel_capacity, 100);
        assert_eq!(EventBridgeConfig::default().channel_capacity, 100);
        assert_eq!(
            EventBridgeConfig::new().with_channel_capacity(8).channel_capacity,
            8
        );
    }

    #[test]
    fn test_event_bridge_creation() {
        let bridge = make_bridge(EventBridgeConfig::new());

        // Verify bridge is created
        assert!(
            bridge.sender().try_reserve().is_ok(),
            "event bridge channel should have capacity for at least one message"
        );
    }

    #[test]
    fn test_event_conversion_insert() {
        let entity_event = order_event(
            "INSERT",
            serde_json::json!({
                "id": "order_123",
                "status": "pending"
            }),
        );

        let subscription_event = EventBridge::convert_event(entity_event);

        assert_eq!(subscription_event.entity_type, "Order");
        assert_eq!(subscription_event.entity_id, "order_123");
        assert_eq!(subscription_event.operation, SubscriptionOperation::Create);
    }

    #[test]
    fn test_event_conversion_update() {
        let entity_event = order_event(
            "UPDATE",
            serde_json::json!({
                "id": "order_123",
                "status": "shipped"
            }),
        );

        let subscription_event = EventBridge::convert_event(entity_event);

        assert_eq!(subscription_event.operation, SubscriptionOperation::Update);
    }

    #[test]
    fn test_event_conversion_delete() {
        let entity_event = order_event(
            "DELETE",
            serde_json::json!({
                "id": "order_123"
            }),
        );

        let subscription_event = EventBridge::convert_event(entity_event);

        assert_eq!(subscription_event.operation, SubscriptionOperation::Delete);
    }

    #[test]
    fn test_event_conversion_is_case_insensitive() {
        let cases = [
            ("insert", SubscriptionOperation::Create),
            ("Update", SubscriptionOperation::Update),
            ("delete", SubscriptionOperation::Delete),
        ];

        for (raw, expected) in cases {
            let converted =
                EventBridge::convert_event(order_event(raw, serde_json::json!({})));
            assert_eq!(
                converted.operation, expected,
                "operation {raw:?} should be matched regardless of case"
            );
        }
    }

    #[test]
    fn test_event_conversion_unknown_operation_defaults_to_create() {
        let entity_event = order_event("TRUNCATE", serde_json::json!({}));

        let subscription_event = EventBridge::convert_event(entity_event);

        assert_eq!(subscription_event.operation, SubscriptionOperation::Create);
    }

    #[test]
    fn test_event_conversion_with_old_data() {
        let entity_event = order_event(
            "UPDATE",
            serde_json::json!({
                "id": "order_123",
                "status": "shipped"
            }),
        )
        .with_old_data(serde_json::json!({
            "id": "order_123",
            "status": "pending"
        }));

        let subscription_event = EventBridge::convert_event(entity_event);

        assert!(
            subscription_event.old_data.is_some(),
            "update events should carry old_data for delta computation"
        );
    }

    #[tokio::test]
    async fn test_event_bridge_spawning() {
        let bridge = make_bridge(EventBridgeConfig::new());

        // Hold a producer handle so the channel stays open for the duration of the
        // test, independent of whether the bridge keeps a sender of its own.
        let _sender = bridge.sender();
        let handle = bridge.spawn();

        // Verify task was spawned
        assert!(!handle.is_finished());

        // Clean up
        handle.abort();
    }
}