fraiseql_server/subscriptions/
event_bridge.rs1use std::sync::Arc;
23
24use fraiseql_core::runtime::subscription::{
25 SubscriptionEvent, SubscriptionManager, SubscriptionOperation,
26};
27use tokio::sync::mpsc;
28use tracing::{debug, info};
29
/// Configuration for an `EventBridge`.
///
/// The type is `Copy`, so the builder-style method takes and returns it by value.
#[derive(Debug, Clone, Copy)]
pub struct EventBridgeConfig {
    /// Capacity handed to `mpsc::channel` when the bridge creates its entity-event channel.
    pub channel_capacity: usize,
}

impl EventBridgeConfig {
    /// Channel capacity used when the caller does not choose one.
    const DEFAULT_CHANNEL_CAPACITY: usize = 100;

    /// Creates a configuration with the default channel capacity of 100.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            channel_capacity: Self::DEFAULT_CHANNEL_CAPACITY,
        }
    }

    /// Returns this configuration with `channel_capacity` replaced by `capacity`.
    #[must_use]
    pub const fn with_channel_capacity(self, capacity: usize) -> Self {
        let mut updated = self;
        updated.channel_capacity = capacity;
        updated
    }
}

impl Default for EventBridgeConfig {
    /// Equivalent to [`EventBridgeConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
59
60#[derive(Debug, Clone)]
62pub struct EntityEvent {
63 pub entity_type: String,
65
66 pub entity_id: String,
68
69 pub operation: String,
71
72 pub data: serde_json::Value,
74
75 pub old_data: Option<serde_json::Value>,
77}
78
79impl EntityEvent {
80 #[must_use]
82 pub fn new(
83 entity_type: impl Into<String>,
84 entity_id: impl Into<String>,
85 operation: impl Into<String>,
86 data: serde_json::Value,
87 ) -> Self {
88 Self {
89 entity_type: entity_type.into(),
90 entity_id: entity_id.into(),
91 operation: operation.into(),
92 data,
93 old_data: None,
94 }
95 }
96
97 #[must_use]
99 pub fn with_old_data(mut self, old_data: serde_json::Value) -> Self {
100 self.old_data = Some(old_data);
101 self
102 }
103}
104
105pub struct EventBridge {
107 manager: Arc<SubscriptionManager>,
109
110 receiver: mpsc::Receiver<EntityEvent>,
112
113 sender: mpsc::Sender<EntityEvent>,
115}
116
117impl EventBridge {
118 #[must_use]
120 pub fn new(manager: Arc<SubscriptionManager>, config: EventBridgeConfig) -> Self {
121 let (sender, receiver) = mpsc::channel(config.channel_capacity);
122
123 Self {
124 manager,
125 receiver,
126 sender,
127 }
128 }
129
130 #[must_use]
132 pub fn sender(&self) -> mpsc::Sender<EntityEvent> {
133 self.sender.clone()
134 }
135
136 fn convert_event(entity_event: EntityEvent) -> SubscriptionEvent {
138 let operation = match entity_event.operation.to_uppercase().as_str() {
140 "INSERT" => SubscriptionOperation::Create,
141 "UPDATE" => SubscriptionOperation::Update,
142 "DELETE" => SubscriptionOperation::Delete,
143 _ => {
144 debug!("Unknown operation: {}, defaulting to Create", entity_event.operation);
146 SubscriptionOperation::Create
147 },
148 };
149
150 let mut event = SubscriptionEvent::new(
151 entity_event.entity_type,
152 entity_event.entity_id,
153 operation,
154 entity_event.data,
155 );
156
157 if let Some(old_data) = entity_event.old_data {
159 event = event.with_old_data(old_data);
160 }
161
162 event
163 }
164
165 #[allow(clippy::cognitive_complexity)] pub async fn run(mut self) {
168 info!("EventBridge started");
169
170 while let Some(entity_event) = self.receiver.recv().await {
171 debug!("EventBridge received entity event: {}", entity_event.entity_type);
172
173 let subscription_event = Self::convert_event(entity_event);
175
176 let matched = self.manager.publish_event(subscription_event);
178
179 if matched > 0 {
180 debug!("EventBridge matched {} subscriptions", matched);
181 }
182 }
183
184 info!("EventBridge stopped");
185 }
186
187 pub fn spawn(self) -> tokio::task::JoinHandle<()> {
189 tokio::spawn(self.run())
190 }
191
192 pub fn get_sender(&self) -> mpsc::Sender<EntityEvent> {
194 self.sender.clone()
195 }
196
197 pub fn manager(&self) -> Arc<SubscriptionManager> {
199 Arc::clone(&self.manager)
200 }
201}
202
#[cfg(test)]
mod tests {
    use fraiseql_core::schema::CompiledSchema;

    use super::*;

    /// Builds a bridge over an empty compiled schema with the default configuration.
    fn default_bridge() -> EventBridge {
        let schema = Arc::new(CompiledSchema::new());
        let manager = Arc::new(SubscriptionManager::new(schema));

        EventBridge::new(manager, EventBridgeConfig::new())
    }

    /// Builds an `Order` event for `order_123` carrying the given operation string.
    fn order_event(operation: &str) -> EntityEvent {
        EntityEvent::new(
            "Order",
            "order_123",
            operation,
            serde_json::json!({
                "id": "order_123",
                "status": "pending"
            }),
        )
    }

    #[test]
    fn test_event_bridge_creation() {
        let bridge = default_bridge();

        assert!(
            bridge.sender().try_reserve().is_ok(),
            "event bridge channel should have capacity for at least one message"
        );
    }

    #[test]
    fn test_event_conversion_insert() {
        let subscription_event = EventBridge::convert_event(order_event("INSERT"));

        assert_eq!(subscription_event.entity_type, "Order");
        assert_eq!(subscription_event.entity_id, "order_123");
        assert_eq!(subscription_event.operation, SubscriptionOperation::Create);
    }

    #[test]
    fn test_event_conversion_update() {
        let subscription_event = EventBridge::convert_event(order_event("UPDATE"));

        assert_eq!(subscription_event.operation, SubscriptionOperation::Update);
    }

    #[test]
    fn test_event_conversion_delete() {
        let subscription_event = EventBridge::convert_event(order_event("DELETE"));

        assert_eq!(subscription_event.operation, SubscriptionOperation::Delete);
    }

    /// Operation names are upper-cased before matching, so any casing must map the same way.
    #[test]
    fn test_event_conversion_is_case_insensitive() {
        let cases = [
            ("insert", SubscriptionOperation::Create),
            ("Update", SubscriptionOperation::Update),
            ("dElEtE", SubscriptionOperation::Delete),
        ];

        for (raw, expected) in cases {
            let subscription_event = EventBridge::convert_event(order_event(raw));

            assert_eq!(
                subscription_event.operation, expected,
                "operation {raw:?} should be matched regardless of casing"
            );
        }
    }

    /// Unrecognised operation names fall back to `Create` instead of failing.
    #[test]
    fn test_event_conversion_unknown_operation_defaults_to_create() {
        let subscription_event = EventBridge::convert_event(order_event("TRUNCATE"));

        assert_eq!(subscription_event.operation, SubscriptionOperation::Create);
    }

    #[test]
    fn test_event_conversion_with_old_data() {
        let entity_event = EntityEvent::new(
            "Order",
            "order_123",
            "UPDATE",
            serde_json::json!({
                "id": "order_123",
                "status": "shipped"
            }),
        )
        .with_old_data(serde_json::json!({
            "id": "order_123",
            "status": "pending"
        }));

        let subscription_event = EventBridge::convert_event(entity_event);

        assert!(
            subscription_event.old_data.is_some(),
            "update events should carry old_data for delta computation"
        );
    }

    #[tokio::test]
    async fn test_event_bridge_spawning() {
        let bridge = default_bridge();
        // Hold a sender for the duration of the test so the channel stays open
        // while the task is alive.
        let _sender = bridge.sender();
        let handle = bridge.spawn();

        assert!(!handle.is_finished());

        handle.abort();
    }
}