fraiseql_server/subscriptions/
event_bridge.rs1use std::sync::Arc;
23
24use fraiseql_core::runtime::subscription::{
25 SubscriptionEvent, SubscriptionManager, SubscriptionOperation,
26};
27use tokio::sync::mpsc;
28use tracing::{debug, info};
29
30#[derive(Debug, Clone)]
32pub struct EventBridgeConfig {
33 pub channel_capacity: usize,
35}
36
37impl EventBridgeConfig {
38 #[must_use]
40 pub fn new() -> Self {
41 Self {
42 channel_capacity: 100,
43 }
44 }
45
46 #[must_use]
48 pub fn with_channel_capacity(mut self, capacity: usize) -> Self {
49 self.channel_capacity = capacity;
50 self
51 }
52}
53
54impl Default for EventBridgeConfig {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct EntityEvent {
63 pub entity_type: String,
65
66 pub entity_id: String,
68
69 pub operation: String,
71
72 pub data: serde_json::Value,
74
75 pub old_data: Option<serde_json::Value>,
77}
78
79impl EntityEvent {
80 #[must_use]
82 pub fn new(
83 entity_type: impl Into<String>,
84 entity_id: impl Into<String>,
85 operation: impl Into<String>,
86 data: serde_json::Value,
87 ) -> Self {
88 Self {
89 entity_type: entity_type.into(),
90 entity_id: entity_id.into(),
91 operation: operation.into(),
92 data,
93 old_data: None,
94 }
95 }
96
97 #[must_use]
99 pub fn with_old_data(mut self, old_data: serde_json::Value) -> Self {
100 self.old_data = Some(old_data);
101 self
102 }
103}
104
105pub struct EventBridge {
107 manager: Arc<SubscriptionManager>,
109
110 receiver: mpsc::Receiver<EntityEvent>,
112
113 sender: mpsc::Sender<EntityEvent>,
115
116 #[allow(dead_code)]
118 config: EventBridgeConfig,
119}
120
121impl EventBridge {
122 #[must_use]
124 pub fn new(manager: Arc<SubscriptionManager>, config: EventBridgeConfig) -> Self {
125 let (sender, receiver) = mpsc::channel(config.channel_capacity);
126
127 Self {
128 manager,
129 receiver,
130 sender,
131 config,
132 }
133 }
134
135 #[must_use]
137 pub fn sender(&self) -> mpsc::Sender<EntityEvent> {
138 self.sender.clone()
139 }
140
141 fn convert_event(entity_event: EntityEvent) -> SubscriptionEvent {
143 let operation = match entity_event.operation.to_uppercase().as_str() {
145 "INSERT" => SubscriptionOperation::Create,
146 "UPDATE" => SubscriptionOperation::Update,
147 "DELETE" => SubscriptionOperation::Delete,
148 _ => {
149 debug!("Unknown operation: {}, defaulting to Create", entity_event.operation);
151 SubscriptionOperation::Create
152 },
153 };
154
155 let mut event = SubscriptionEvent::new(
156 entity_event.entity_type,
157 entity_event.entity_id,
158 operation,
159 entity_event.data,
160 );
161
162 if let Some(old_data) = entity_event.old_data {
164 event = event.with_old_data(old_data);
165 }
166
167 event
168 }
169
170 pub async fn run(mut self) {
172 info!("EventBridge started");
173
174 while let Some(entity_event) = self.receiver.recv().await {
175 debug!("EventBridge received entity event: {}", entity_event.entity_type);
176
177 let subscription_event = Self::convert_event(entity_event);
179
180 let matched = self.manager.publish_event(subscription_event);
182
183 if matched > 0 {
184 debug!("EventBridge matched {} subscriptions", matched);
185 }
186 }
187
188 info!("EventBridge stopped");
189 }
190
191 pub fn spawn(self) -> tokio::task::JoinHandle<()> {
193 tokio::spawn(self.run())
194 }
195
196 pub fn get_sender(&self) -> mpsc::Sender<EntityEvent> {
198 self.sender.clone()
199 }
200
201 pub fn manager(&self) -> Arc<SubscriptionManager> {
203 Arc::clone(&self.manager)
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use fraiseql_core::schema::CompiledSchema;
210
211 use super::*;
212
213 #[test]
214 fn test_event_bridge_creation() {
215 let schema = Arc::new(CompiledSchema::new());
216 let manager = Arc::new(SubscriptionManager::new(schema));
217 let config = EventBridgeConfig::new();
218
219 let bridge = EventBridge::new(manager, config);
220
221 assert!(bridge.sender().try_reserve().is_ok());
223 }
224
225 #[test]
226 fn test_event_conversion_insert() {
227 let entity_event = EntityEvent::new(
228 "Order",
229 "order_123",
230 "INSERT",
231 serde_json::json!({
232 "id": "order_123",
233 "status": "pending"
234 }),
235 );
236
237 let subscription_event = EventBridge::convert_event(entity_event);
238
239 assert_eq!(subscription_event.entity_type, "Order");
240 assert_eq!(subscription_event.entity_id, "order_123");
241 assert_eq!(subscription_event.operation, SubscriptionOperation::Create);
242 }
243
244 #[test]
245 fn test_event_conversion_update() {
246 let entity_event = EntityEvent::new(
247 "Order",
248 "order_123",
249 "UPDATE",
250 serde_json::json!({
251 "id": "order_123",
252 "status": "shipped"
253 }),
254 );
255
256 let subscription_event = EventBridge::convert_event(entity_event);
257
258 assert_eq!(subscription_event.operation, SubscriptionOperation::Update);
259 }
260
261 #[test]
262 fn test_event_conversion_delete() {
263 let entity_event = EntityEvent::new(
264 "Order",
265 "order_123",
266 "DELETE",
267 serde_json::json!({
268 "id": "order_123"
269 }),
270 );
271
272 let subscription_event = EventBridge::convert_event(entity_event);
273
274 assert_eq!(subscription_event.operation, SubscriptionOperation::Delete);
275 }
276
277 #[test]
278 fn test_event_conversion_with_old_data() {
279 let entity_event = EntityEvent::new(
280 "Order",
281 "order_123",
282 "UPDATE",
283 serde_json::json!({
284 "id": "order_123",
285 "status": "shipped"
286 }),
287 )
288 .with_old_data(serde_json::json!({
289 "id": "order_123",
290 "status": "pending"
291 }));
292
293 let subscription_event = EventBridge::convert_event(entity_event);
294
295 assert!(subscription_event.old_data.is_some());
296 }
297
298 #[tokio::test]
299 async fn test_event_bridge_spawning() {
300 let schema = Arc::new(CompiledSchema::new());
301 let manager = Arc::new(SubscriptionManager::new(schema));
302 let config = EventBridgeConfig::new();
303
304 let bridge = EventBridge::new(manager, config);
305 let handle = bridge.spawn();
306
307 assert!(!handle.is_finished());
309
310 handle.abort();
312 }
313}