Skip to main content

this/core/
events.rs

1//! Internal event system for real-time notifications
2//!
3//! The EventBus is the core of the real-time system. It uses `tokio::sync::broadcast`
4//! to decouple mutations (REST, GraphQL handlers) from notifications (WebSocket, SSE).
5//!
6//! # Architecture
7//!
8//! ```text
9//! REST Handler ──┐
10//!                ├──▶ EventBus::publish() ──▶ broadcast channel ──▶ WebSocket subscribers
11//! GraphQL Handler┘                                                ──▶ SSE subscribers
12//! ```
13//!
14//! # Usage
15//!
16//! ```rust,ignore
17//! let event_bus = EventBus::new(1024);
18//!
19//! // Subscribe to events
20//! let mut rx = event_bus.subscribe();
21//!
22//! // Publish an event (non-blocking, fire-and-forget)
23//! event_bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
24//!     entity_type: "order".to_string(),
25//!     entity_id: Uuid::new_v4(),
26//!     data: json!({"name": "Order #1"}),
27//! }));
28//!
29//! // Receive events
30//! if let Ok(event) = rx.recv().await {
31//!     println!("Received: {:?}", event);
32//! }
33//! ```
34
35use chrono::{DateTime, Utc};
36use serde::{Deserialize, Serialize};
37use tokio::sync::broadcast;
38use uuid::Uuid;
39
40/// Events related to entity mutations (create, update, delete)
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "action", rename_all = "snake_case")]
43pub enum EntityEvent {
44    /// An entity was created
45    Created {
46        entity_type: String,
47        entity_id: Uuid,
48        data: serde_json::Value,
49    },
50    /// An entity was updated
51    Updated {
52        entity_type: String,
53        entity_id: Uuid,
54        data: serde_json::Value,
55    },
56    /// An entity was deleted
57    Deleted {
58        entity_type: String,
59        entity_id: Uuid,
60    },
61}
62
63/// Events related to link mutations (create, delete)
64#[derive(Debug, Clone, Serialize, Deserialize)]
65#[serde(tag = "action", rename_all = "snake_case")]
66pub enum LinkEvent {
67    /// A link was created between two entities
68    Created {
69        link_type: String,
70        link_id: Uuid,
71        source_id: Uuid,
72        target_id: Uuid,
73        metadata: Option<serde_json::Value>,
74    },
75    /// A link was deleted
76    Deleted {
77        link_type: String,
78        link_id: Uuid,
79        source_id: Uuid,
80        target_id: Uuid,
81    },
82}
83
84/// Top-level framework event that wraps entity and link events
85#[derive(Debug, Clone, Serialize, Deserialize)]
86#[serde(tag = "kind", rename_all = "snake_case")]
87pub enum FrameworkEvent {
88    /// An entity event
89    Entity(EntityEvent),
90    /// A link event
91    Link(LinkEvent),
92}
93
94impl FrameworkEvent {
95    /// Get the timestamp of the event (generated at creation time)
96    /// Note: timestamp is added by EventEnvelope, not by the event itself
97    pub fn event_kind(&self) -> &str {
98        match self {
99            FrameworkEvent::Entity(_) => "entity",
100            FrameworkEvent::Link(_) => "link",
101        }
102    }
103
104    /// Get the entity type this event relates to
105    pub fn entity_type(&self) -> Option<&str> {
106        match self {
107            FrameworkEvent::Entity(e) => match e {
108                EntityEvent::Created { entity_type, .. }
109                | EntityEvent::Updated { entity_type, .. }
110                | EntityEvent::Deleted { entity_type, .. } => Some(entity_type),
111            },
112            FrameworkEvent::Link(_) => None,
113        }
114    }
115
116    /// Get the entity ID this event relates to (if applicable)
117    pub fn entity_id(&self) -> Option<Uuid> {
118        match self {
119            FrameworkEvent::Entity(e) => match e {
120                EntityEvent::Created { entity_id, .. }
121                | EntityEvent::Updated { entity_id, .. }
122                | EntityEvent::Deleted { entity_id, .. } => Some(*entity_id),
123            },
124            FrameworkEvent::Link(l) => match l {
125                LinkEvent::Created { link_id, .. } | LinkEvent::Deleted { link_id, .. } => {
126                    Some(*link_id)
127                }
128            },
129        }
130    }
131
132    /// Get the action name (created, updated, deleted)
133    pub fn action(&self) -> &str {
134        match self {
135            FrameworkEvent::Entity(e) => match e {
136                EntityEvent::Created { .. } => "created",
137                EntityEvent::Updated { .. } => "updated",
138                EntityEvent::Deleted { .. } => "deleted",
139            },
140            FrameworkEvent::Link(l) => match l {
141                LinkEvent::Created { .. } => "created",
142                LinkEvent::Deleted { .. } => "deleted",
143            },
144        }
145    }
146}
147
148/// Envelope wrapping a framework event with metadata
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct EventEnvelope {
151    /// Unique event ID
152    pub id: Uuid,
153    /// When the event occurred
154    pub timestamp: DateTime<Utc>,
155    /// The actual event
156    pub event: FrameworkEvent,
157}
158
159impl EventEnvelope {
160    /// Create a new event envelope
161    pub fn new(event: FrameworkEvent) -> Self {
162        Self {
163            id: Uuid::new_v4(),
164            timestamp: Utc::now(),
165            event,
166        }
167    }
168}
169
170/// Broadcast-based event bus for the framework
171///
172/// Uses `tokio::sync::broadcast` which allows multiple receivers and is
173/// designed for exactly this kind of pub/sub pattern.
174///
175/// The bus is cheap to clone (Arc internally) and can be shared across threads.
176#[derive(Debug, Clone)]
177pub struct EventBus {
178    sender: broadcast::Sender<EventEnvelope>,
179}
180
181impl EventBus {
182    /// Create a new EventBus with the given channel capacity
183    ///
184    /// The capacity determines how many events can be buffered before
185    /// slow receivers start losing events (lagged).
186    ///
187    /// # Arguments
188    ///
189    /// * `capacity` - Buffer size for the broadcast channel (recommended: 1024)
190    pub fn new(capacity: usize) -> Self {
191        let (sender, _) = broadcast::channel(capacity);
192        Self { sender }
193    }
194
195    /// Publish an event to all subscribers
196    ///
197    /// This is non-blocking and will never fail. If there are no subscribers,
198    /// the event is simply dropped. If subscribers are lagging, they will
199    /// receive a `Lagged` error on their next recv().
200    ///
201    /// Returns the number of receivers that will receive the event.
202    pub fn publish(&self, event: FrameworkEvent) -> usize {
203        let envelope = EventEnvelope::new(event);
204        // send() returns Err only if there are no receivers, which is fine
205        self.sender.send(envelope).unwrap_or(0)
206    }
207
208    /// Subscribe to events
209    ///
210    /// Returns a receiver that will get all future events published to the bus.
211    /// Events published before this call are not received.
212    pub fn subscribe(&self) -> broadcast::Receiver<EventEnvelope> {
213        self.sender.subscribe()
214    }
215
216    /// Get the current number of active subscribers
217    pub fn receiver_count(&self) -> usize {
218        self.sender.receiver_count()
219    }
220}
221
222impl Default for EventBus {
223    fn default() -> Self {
224        Self::new(1024)
225    }
226}
227
228#[cfg(test)]
229mod tests {
230    use super::*;
231    use serde_json::json;
232
233    #[test]
234    fn test_entity_event_created() {
235        let event = EntityEvent::Created {
236            entity_type: "order".to_string(),
237            entity_id: Uuid::new_v4(),
238            data: json!({"name": "Order #1"}),
239        };
240
241        let json = serde_json::to_value(&event).unwrap();
242        assert_eq!(json["action"], "created");
243        assert_eq!(json["entity_type"], "order");
244    }
245
246    #[test]
247    fn test_link_event_created() {
248        let event = LinkEvent::Created {
249            link_type: "has_invoice".to_string(),
250            link_id: Uuid::new_v4(),
251            source_id: Uuid::new_v4(),
252            target_id: Uuid::new_v4(),
253            metadata: Some(json!({"priority": "high"})),
254        };
255
256        let json = serde_json::to_value(&event).unwrap();
257        assert_eq!(json["action"], "created");
258        assert_eq!(json["link_type"], "has_invoice");
259    }
260
261    #[test]
262    fn test_framework_event_entity_type() {
263        let event = FrameworkEvent::Entity(EntityEvent::Updated {
264            entity_type: "invoice".to_string(),
265            entity_id: Uuid::new_v4(),
266            data: json!({"status": "paid"}),
267        });
268
269        assert_eq!(event.entity_type(), Some("invoice"));
270        assert_eq!(event.action(), "updated");
271        assert_eq!(event.event_kind(), "entity");
272    }
273
274    #[test]
275    fn test_framework_event_link() {
276        let event = FrameworkEvent::Link(LinkEvent::Deleted {
277            link_type: "has_invoice".to_string(),
278            link_id: Uuid::new_v4(),
279            source_id: Uuid::new_v4(),
280            target_id: Uuid::new_v4(),
281        });
282
283        assert_eq!(event.entity_type(), None);
284        assert_eq!(event.action(), "deleted");
285        assert_eq!(event.event_kind(), "link");
286    }
287
288    #[test]
289    fn test_event_envelope_has_metadata() {
290        let event = FrameworkEvent::Entity(EntityEvent::Created {
291            entity_type: "order".to_string(),
292            entity_id: Uuid::new_v4(),
293            data: json!({}),
294        });
295
296        let envelope = EventEnvelope::new(event);
297        assert!(!envelope.id.is_nil());
298        assert!(envelope.timestamp <= Utc::now());
299    }
300
301    #[test]
302    fn test_event_envelope_serialization_roundtrip() {
303        let event = FrameworkEvent::Entity(EntityEvent::Created {
304            entity_type: "order".to_string(),
305            entity_id: Uuid::new_v4(),
306            data: json!({"amount": 42.0}),
307        });
308
309        let envelope = EventEnvelope::new(event);
310        let json = serde_json::to_string(&envelope).unwrap();
311        let deserialized: EventEnvelope = serde_json::from_str(&json).unwrap();
312
313        assert_eq!(envelope.id, deserialized.id);
314        assert_eq!(envelope.event.event_kind(), deserialized.event.event_kind());
315    }
316
317    #[tokio::test]
318    async fn test_event_bus_publish_subscribe() {
319        let bus = EventBus::new(16);
320        let mut rx = bus.subscribe();
321
322        let entity_id = Uuid::new_v4();
323        let event = FrameworkEvent::Entity(EntityEvent::Created {
324            entity_type: "order".to_string(),
325            entity_id,
326            data: json!({"name": "Test Order"}),
327        });
328
329        let receivers = bus.publish(event);
330        assert_eq!(receivers, 1);
331
332        let received = rx.recv().await.unwrap();
333        assert_eq!(received.event.entity_id(), Some(entity_id));
334        assert_eq!(received.event.action(), "created");
335    }
336
337    #[tokio::test]
338    async fn test_event_bus_multiple_subscribers() {
339        let bus = EventBus::new(16);
340        let mut rx1 = bus.subscribe();
341        let mut rx2 = bus.subscribe();
342
343        assert_eq!(bus.receiver_count(), 2);
344
345        let event = FrameworkEvent::Entity(EntityEvent::Deleted {
346            entity_type: "order".to_string(),
347            entity_id: Uuid::new_v4(),
348        });
349
350        let receivers = bus.publish(event);
351        assert_eq!(receivers, 2);
352
353        let e1 = rx1.recv().await.unwrap();
354        let e2 = rx2.recv().await.unwrap();
355
356        assert_eq!(e1.id, e2.id); // Same event envelope
357    }
358
359    #[test]
360    fn test_event_bus_publish_without_subscribers() {
361        let bus = EventBus::new(16);
362
363        let event = FrameworkEvent::Entity(EntityEvent::Created {
364            entity_type: "order".to_string(),
365            entity_id: Uuid::new_v4(),
366            data: json!({}),
367        });
368
369        // Should not panic even with no subscribers
370        let receivers = bus.publish(event);
371        assert_eq!(receivers, 0);
372    }
373
374    #[test]
375    fn test_event_bus_default() {
376        let bus = EventBus::default();
377        assert_eq!(bus.receiver_count(), 0);
378    }
379
380    #[test]
381    fn test_event_bus_clone() {
382        let bus = EventBus::new(16);
383        let _rx = bus.subscribe();
384
385        let bus2 = bus.clone();
386        assert_eq!(bus2.receiver_count(), 1);
387
388        let _rx2 = bus2.subscribe();
389        assert_eq!(bus.receiver_count(), 2);
390    }
391}