Skip to main content

fraiseql_server/subscriptions/
event_bridge.rs

1//! EventBridge that connects ChangeLogListener with SubscriptionManager.
2//!
3//! The EventBridge is responsible for:
4//! 1. Spawning ChangeLogListener in background
5//! 2. Receiving EntityEvent via mpsc::channel
6//! 3. Converting EntityEvent to SubscriptionEvent
7//! 4. Publishing events to SubscriptionManager
8//!
9//! Architecture:
10//! ```text
11//! Database (tb_entity_change_log)
12//!     ↓
13//! ChangeLogListener (polls & converts)
14//!     ↓
15//! EventBridge (routes & converts)
16//!     ↓
17//! SubscriptionManager (broadcasts to subscribers)
18//!     ↓
19//! WebSocket Handler (delivers to clients)
20//! ```
21
22use std::sync::Arc;
23
24use fraiseql_core::runtime::subscription::{
25    SubscriptionEvent, SubscriptionManager, SubscriptionOperation,
26};
27use tokio::sync::mpsc;
28use tracing::{debug, info};
29
30/// Configuration for the EventBridge
31#[derive(Debug, Clone)]
32pub struct EventBridgeConfig {
33    /// Channel capacity for event routing
34    pub channel_capacity: usize,
35}
36
37impl EventBridgeConfig {
38    /// Create config with defaults
39    #[must_use]
40    pub fn new() -> Self {
41        Self {
42            channel_capacity: 100,
43        }
44    }
45
46    /// Set channel capacity
47    #[must_use]
48    pub fn with_channel_capacity(mut self, capacity: usize) -> Self {
49        self.channel_capacity = capacity;
50        self
51    }
52}
53
54impl Default for EventBridgeConfig {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60/// A simple event that EventBridge receives from ChangeLogListener
61#[derive(Debug, Clone)]
62pub struct EntityEvent {
63    /// Entity type (e.g., "Order", "User")
64    pub entity_type: String,
65
66    /// Entity ID (primary key)
67    pub entity_id: String,
68
69    /// Operation type ("INSERT", "UPDATE", "DELETE")
70    pub operation: String,
71
72    /// Entity data as JSON
73    pub data: serde_json::Value,
74
75    /// Optional old data (for UPDATE operations)
76    pub old_data: Option<serde_json::Value>,
77}
78
79impl EntityEvent {
80    /// Create a new entity event
81    #[must_use]
82    pub fn new(
83        entity_type: impl Into<String>,
84        entity_id: impl Into<String>,
85        operation: impl Into<String>,
86        data: serde_json::Value,
87    ) -> Self {
88        Self {
89            entity_type: entity_type.into(),
90            entity_id: entity_id.into(),
91            operation: operation.into(),
92            data,
93            old_data: None,
94        }
95    }
96
97    /// Add old data for UPDATE operations
98    #[must_use]
99    pub fn with_old_data(mut self, old_data: serde_json::Value) -> Self {
100        self.old_data = Some(old_data);
101        self
102    }
103}
104
105/// EventBridge that connects ChangeLogListener with SubscriptionManager
106pub struct EventBridge {
107    /// Subscription manager for broadcasting events
108    manager: Arc<SubscriptionManager>,
109
110    /// Receiver for entity events from ChangeLogListener
111    receiver: mpsc::Receiver<EntityEvent>,
112
113    /// Sender for entity events (used to send events to bridge)
114    sender: mpsc::Sender<EntityEvent>,
115
116    /// Configuration (stored for potential future use)
117    #[allow(dead_code)]
118    config: EventBridgeConfig,
119}
120
121impl EventBridge {
122    /// Create a new EventBridge
123    #[must_use]
124    pub fn new(manager: Arc<SubscriptionManager>, config: EventBridgeConfig) -> Self {
125        let (sender, receiver) = mpsc::channel(config.channel_capacity);
126
127        Self {
128            manager,
129            receiver,
130            sender,
131            config,
132        }
133    }
134
135    /// Get a sender for publishing entity events
136    #[must_use]
137    pub fn sender(&self) -> mpsc::Sender<EntityEvent> {
138        self.sender.clone()
139    }
140
141    /// Convert EntityEvent to SubscriptionEvent
142    fn convert_event(entity_event: EntityEvent) -> SubscriptionEvent {
143        // Convert operation string to SubscriptionOperation
144        let operation = match entity_event.operation.to_uppercase().as_str() {
145            "INSERT" => SubscriptionOperation::Create,
146            "UPDATE" => SubscriptionOperation::Update,
147            "DELETE" => SubscriptionOperation::Delete,
148            _ => {
149                // Default to Create for unknown operations
150                debug!("Unknown operation: {}, defaulting to Create", entity_event.operation);
151                SubscriptionOperation::Create
152            },
153        };
154
155        let mut event = SubscriptionEvent::new(
156            entity_event.entity_type,
157            entity_event.entity_id,
158            operation,
159            entity_event.data,
160        );
161
162        // Add old data if present
163        if let Some(old_data) = entity_event.old_data {
164            event = event.with_old_data(old_data);
165        }
166
167        event
168    }
169
170    /// Run the event bridge loop (spawned in background)
171    pub async fn run(mut self) {
172        info!("EventBridge started");
173
174        while let Some(entity_event) = self.receiver.recv().await {
175            debug!("EventBridge received entity event: {}", entity_event.entity_type);
176
177            // Convert entity event to subscription event
178            let subscription_event = Self::convert_event(entity_event);
179
180            // Publish to subscription manager
181            let matched = self.manager.publish_event(subscription_event);
182
183            if matched > 0 {
184                debug!("EventBridge matched {} subscriptions", matched);
185            }
186        }
187
188        info!("EventBridge stopped");
189    }
190
191    /// Spawn EventBridge as a background task
192    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
193        tokio::spawn(self.run())
194    }
195
196    /// Get the sender for sending events to the bridge
197    pub fn get_sender(&self) -> mpsc::Sender<EntityEvent> {
198        self.sender.clone()
199    }
200
201    /// Get the subscription manager (for testing)
202    pub fn manager(&self) -> Arc<SubscriptionManager> {
203        Arc::clone(&self.manager)
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use fraiseql_core::schema::CompiledSchema;
210
211    use super::*;
212
213    #[test]
214    fn test_event_bridge_creation() {
215        let schema = Arc::new(CompiledSchema::new());
216        let manager = Arc::new(SubscriptionManager::new(schema));
217        let config = EventBridgeConfig::new();
218
219        let bridge = EventBridge::new(manager, config);
220
221        // Verify bridge is created
222        assert!(bridge.sender().try_reserve().is_ok());
223    }
224
225    #[test]
226    fn test_event_conversion_insert() {
227        let entity_event = EntityEvent::new(
228            "Order",
229            "order_123",
230            "INSERT",
231            serde_json::json!({
232                "id": "order_123",
233                "status": "pending"
234            }),
235        );
236
237        let subscription_event = EventBridge::convert_event(entity_event);
238
239        assert_eq!(subscription_event.entity_type, "Order");
240        assert_eq!(subscription_event.entity_id, "order_123");
241        assert_eq!(subscription_event.operation, SubscriptionOperation::Create);
242    }
243
244    #[test]
245    fn test_event_conversion_update() {
246        let entity_event = EntityEvent::new(
247            "Order",
248            "order_123",
249            "UPDATE",
250            serde_json::json!({
251                "id": "order_123",
252                "status": "shipped"
253            }),
254        );
255
256        let subscription_event = EventBridge::convert_event(entity_event);
257
258        assert_eq!(subscription_event.operation, SubscriptionOperation::Update);
259    }
260
261    #[test]
262    fn test_event_conversion_delete() {
263        let entity_event = EntityEvent::new(
264            "Order",
265            "order_123",
266            "DELETE",
267            serde_json::json!({
268                "id": "order_123"
269            }),
270        );
271
272        let subscription_event = EventBridge::convert_event(entity_event);
273
274        assert_eq!(subscription_event.operation, SubscriptionOperation::Delete);
275    }
276
277    #[test]
278    fn test_event_conversion_with_old_data() {
279        let entity_event = EntityEvent::new(
280            "Order",
281            "order_123",
282            "UPDATE",
283            serde_json::json!({
284                "id": "order_123",
285                "status": "shipped"
286            }),
287        )
288        .with_old_data(serde_json::json!({
289            "id": "order_123",
290            "status": "pending"
291        }));
292
293        let subscription_event = EventBridge::convert_event(entity_event);
294
295        assert!(subscription_event.old_data.is_some());
296    }
297
298    #[tokio::test]
299    async fn test_event_bridge_spawning() {
300        let schema = Arc::new(CompiledSchema::new());
301        let manager = Arc::new(SubscriptionManager::new(schema));
302        let config = EventBridgeConfig::new();
303
304        let bridge = EventBridge::new(manager, config);
305        let handle = bridge.spawn();
306
307        // Verify task was spawned
308        assert!(!handle.is_finished());
309
310        // Clean up
311        handle.abort();
312    }
313}