Skip to main content

this/core/
events.rs

1//! Internal event system for real-time notifications
2//!
3//! The EventBus is the core of the real-time system. It uses `tokio::sync::broadcast`
4//! to decouple mutations (REST, GraphQL handlers) from notifications (WebSocket, SSE).
5//!
6//! # Architecture
7//!
8//! ```text
9//! REST Handler ──┐
10//!                ├──▶ EventBus::publish() ──▶ broadcast channel ──▶ WebSocket subscribers
11//! GraphQL Handler┘                                                ──▶ SSE subscribers
12//! ```
13//!
14//! # Usage
15//!
16//! ```rust,ignore
17//! let event_bus = EventBus::new(1024);
18//!
19//! // Subscribe to events
20//! let mut rx = event_bus.subscribe();
21//!
22//! // Publish an event (non-blocking, fire-and-forget)
23//! event_bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
24//!     entity_type: "order".to_string(),
25//!     entity_id: Uuid::new_v4(),
26//!     data: json!({"name": "Order #1"}),
27//! }));
28//!
29//! // Receive events
30//! if let Ok(event) = rx.recv().await {
31//!     println!("Received: {:?}", event);
32//! }
33//! ```
34
35use crate::events::log::EventLog;
36use crate::events::types::SeqNo;
37use chrono::{DateTime, Utc};
38use serde::{Deserialize, Serialize};
39use std::sync::Arc;
40use tokio::sync::broadcast;
41use uuid::Uuid;
42
43/// Events related to entity mutations (create, update, delete)
44#[derive(Debug, Clone, Serialize, Deserialize)]
45#[serde(tag = "action", rename_all = "snake_case")]
46pub enum EntityEvent {
47    /// An entity was created
48    Created {
49        entity_type: String,
50        entity_id: Uuid,
51        data: serde_json::Value,
52    },
53    /// An entity was updated
54    Updated {
55        entity_type: String,
56        entity_id: Uuid,
57        data: serde_json::Value,
58    },
59    /// An entity was deleted
60    Deleted {
61        entity_type: String,
62        entity_id: Uuid,
63    },
64}
65
66/// Events related to link mutations (create, delete)
67#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(tag = "action", rename_all = "snake_case")]
69pub enum LinkEvent {
70    /// A link was created between two entities
71    Created {
72        link_type: String,
73        link_id: Uuid,
74        source_id: Uuid,
75        target_id: Uuid,
76        metadata: Option<serde_json::Value>,
77    },
78    /// A link was deleted
79    Deleted {
80        link_type: String,
81        link_id: Uuid,
82        source_id: Uuid,
83        target_id: Uuid,
84    },
85}
86
87/// Top-level framework event that wraps entity and link events
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "kind", rename_all = "snake_case")]
90pub enum FrameworkEvent {
91    /// An entity event
92    Entity(EntityEvent),
93    /// A link event
94    Link(LinkEvent),
95}
96
97impl FrameworkEvent {
98    /// Get the timestamp of the event (generated at creation time)
99    /// Note: timestamp is added by EventEnvelope, not by the event itself
100    pub fn event_kind(&self) -> &str {
101        match self {
102            FrameworkEvent::Entity(_) => "entity",
103            FrameworkEvent::Link(_) => "link",
104        }
105    }
106
107    /// Get the entity type this event relates to
108    pub fn entity_type(&self) -> Option<&str> {
109        match self {
110            FrameworkEvent::Entity(e) => match e {
111                EntityEvent::Created { entity_type, .. }
112                | EntityEvent::Updated { entity_type, .. }
113                | EntityEvent::Deleted { entity_type, .. } => Some(entity_type),
114            },
115            FrameworkEvent::Link(_) => None,
116        }
117    }
118
119    /// Get the entity ID this event relates to (if applicable)
120    pub fn entity_id(&self) -> Option<Uuid> {
121        match self {
122            FrameworkEvent::Entity(e) => match e {
123                EntityEvent::Created { entity_id, .. }
124                | EntityEvent::Updated { entity_id, .. }
125                | EntityEvent::Deleted { entity_id, .. } => Some(*entity_id),
126            },
127            FrameworkEvent::Link(l) => match l {
128                LinkEvent::Created { link_id, .. } | LinkEvent::Deleted { link_id, .. } => {
129                    Some(*link_id)
130                }
131            },
132        }
133    }
134
135    /// Get the action name (created, updated, deleted)
136    pub fn action(&self) -> &str {
137        match self {
138            FrameworkEvent::Entity(e) => match e {
139                EntityEvent::Created { .. } => "created",
140                EntityEvent::Updated { .. } => "updated",
141                EntityEvent::Deleted { .. } => "deleted",
142            },
143            FrameworkEvent::Link(l) => match l {
144                LinkEvent::Created { .. } => "created",
145                LinkEvent::Deleted { .. } => "deleted",
146            },
147        }
148    }
149}
150
151/// Envelope wrapping a framework event with metadata
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct EventEnvelope {
154    /// Unique event ID
155    pub id: Uuid,
156    /// When the event occurred
157    pub timestamp: DateTime<Utc>,
158    /// The actual event
159    pub event: FrameworkEvent,
160    /// Sequence number assigned by the EventLog (None if not yet persisted)
161    #[serde(skip_serializing_if = "Option::is_none", default)]
162    pub seq_no: Option<SeqNo>,
163}
164
165impl EventEnvelope {
166    /// Create a new event envelope
167    pub fn new(event: FrameworkEvent) -> Self {
168        Self {
169            id: Uuid::new_v4(),
170            timestamp: Utc::now(),
171            event,
172            seq_no: None,
173        }
174    }
175}
176
177/// Broadcast-based event bus for the framework
178///
179/// Uses `tokio::sync::broadcast` which allows multiple receivers and is
180/// designed for exactly this kind of pub/sub pattern.
181///
182/// The bus is cheap to clone (Arc internally) and can be shared across threads.
183///
184/// # EventLog Bridge
185///
186/// When an `EventLog` is attached via `with_event_log()`, every published event
187/// is also appended to the persistent log. The EventLog becomes the source of
188/// truth, while the broadcast channel remains the real-time notification path.
189///
190/// ```text
191/// publish(event) ──┬──▶ broadcast channel (real-time, fire-and-forget)
192///                  └──▶ EventLog.append() (persistent, replayable)
193/// ```
194#[derive(Clone)]
195pub struct EventBus {
196    sender: broadcast::Sender<EventEnvelope>,
197    /// Optional persistent event log (bridge)
198    event_log: Option<Arc<dyn EventLog>>,
199}
200
201impl std::fmt::Debug for EventBus {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        f.debug_struct("EventBus")
204            .field("sender", &self.sender)
205            .field("has_event_log", &self.event_log.is_some())
206            .finish()
207    }
208}
209
210impl EventBus {
211    /// Create a new EventBus with the given channel capacity
212    ///
213    /// The capacity determines how many events can be buffered before
214    /// slow receivers start losing events (lagged).
215    ///
216    /// # Arguments
217    ///
218    /// * `capacity` - Buffer size for the broadcast channel (recommended: 1024)
219    pub fn new(capacity: usize) -> Self {
220        let (sender, _) = broadcast::channel(capacity);
221        Self {
222            sender,
223            event_log: None,
224        }
225    }
226
227    /// Attach a persistent EventLog to this bus
228    ///
229    /// When set, every `publish()` call also appends the event to the log.
230    /// The append is done via `tokio::spawn` to avoid blocking the publisher.
231    ///
232    /// This enables the event flow system to consume events from the durable
233    /// log instead of the ephemeral broadcast channel.
234    pub fn with_event_log(mut self, event_log: Arc<dyn EventLog>) -> Self {
235        self.event_log = Some(event_log);
236        self
237    }
238
239    /// Get a reference to the attached EventLog, if any
240    pub fn event_log(&self) -> Option<&Arc<dyn EventLog>> {
241        self.event_log.as_ref()
242    }
243
244    /// Publish an event to all subscribers
245    ///
246    /// This is non-blocking and will never fail. If there are no subscribers,
247    /// the event is simply dropped. If subscribers are lagging, they will
248    /// receive a `Lagged` error on their next recv().
249    ///
250    /// If an EventLog is attached, the event is also appended to the log
251    /// asynchronously (fire-and-forget via tokio::spawn).
252    ///
253    /// Returns the number of broadcast receivers that will receive the event.
254    pub fn publish(&self, event: FrameworkEvent) -> usize {
255        // Create a single envelope shared between broadcast and EventLog
256        let envelope = EventEnvelope::new(event);
257
258        // If an EventLog is attached, append a clone to it (non-blocking)
259        if let Some(event_log) = &self.event_log {
260            let log = event_log.clone();
261            let envelope_clone = envelope.clone();
262            tokio::spawn(async move {
263                if let Err(e) = log.append(envelope_clone).await {
264                    tracing::warn!("Failed to append event to EventLog: {}", e);
265                }
266            });
267        }
268
269        // send() returns Err only if there are no receivers, which is fine
270        self.sender.send(envelope).unwrap_or(0)
271    }
272
273    /// Subscribe to events
274    ///
275    /// Returns a receiver that will get all future events published to the bus.
276    /// Events published before this call are not received.
277    pub fn subscribe(&self) -> broadcast::Receiver<EventEnvelope> {
278        self.sender.subscribe()
279    }
280
281    /// Get the current number of active subscribers
282    pub fn receiver_count(&self) -> usize {
283        self.sender.receiver_count()
284    }
285}
286
287impl Default for EventBus {
288    fn default() -> Self {
289        Self::new(1024)
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use serde_json::json;
297
298    #[test]
299    fn test_entity_event_created() {
300        let event = EntityEvent::Created {
301            entity_type: "order".to_string(),
302            entity_id: Uuid::new_v4(),
303            data: json!({"name": "Order #1"}),
304        };
305
306        let json = serde_json::to_value(&event).unwrap();
307        assert_eq!(json["action"], "created");
308        assert_eq!(json["entity_type"], "order");
309    }
310
311    #[test]
312    fn test_link_event_created() {
313        let event = LinkEvent::Created {
314            link_type: "has_invoice".to_string(),
315            link_id: Uuid::new_v4(),
316            source_id: Uuid::new_v4(),
317            target_id: Uuid::new_v4(),
318            metadata: Some(json!({"priority": "high"})),
319        };
320
321        let json = serde_json::to_value(&event).unwrap();
322        assert_eq!(json["action"], "created");
323        assert_eq!(json["link_type"], "has_invoice");
324    }
325
326    #[test]
327    fn test_framework_event_entity_type() {
328        let event = FrameworkEvent::Entity(EntityEvent::Updated {
329            entity_type: "invoice".to_string(),
330            entity_id: Uuid::new_v4(),
331            data: json!({"status": "paid"}),
332        });
333
334        assert_eq!(event.entity_type(), Some("invoice"));
335        assert_eq!(event.action(), "updated");
336        assert_eq!(event.event_kind(), "entity");
337    }
338
339    #[test]
340    fn test_framework_event_link() {
341        let event = FrameworkEvent::Link(LinkEvent::Deleted {
342            link_type: "has_invoice".to_string(),
343            link_id: Uuid::new_v4(),
344            source_id: Uuid::new_v4(),
345            target_id: Uuid::new_v4(),
346        });
347
348        assert_eq!(event.entity_type(), None);
349        assert_eq!(event.action(), "deleted");
350        assert_eq!(event.event_kind(), "link");
351    }
352
353    #[test]
354    fn test_event_envelope_has_metadata() {
355        let event = FrameworkEvent::Entity(EntityEvent::Created {
356            entity_type: "order".to_string(),
357            entity_id: Uuid::new_v4(),
358            data: json!({}),
359        });
360
361        let envelope = EventEnvelope::new(event);
362        assert!(!envelope.id.is_nil());
363        assert!(envelope.timestamp <= Utc::now());
364    }
365
366    #[test]
367    fn test_event_envelope_serialization_roundtrip() {
368        let event = FrameworkEvent::Entity(EntityEvent::Created {
369            entity_type: "order".to_string(),
370            entity_id: Uuid::new_v4(),
371            data: json!({"amount": 42.0}),
372        });
373
374        let envelope = EventEnvelope::new(event);
375        let json = serde_json::to_string(&envelope).unwrap();
376        let deserialized: EventEnvelope = serde_json::from_str(&json).unwrap();
377
378        assert_eq!(envelope.id, deserialized.id);
379        assert_eq!(envelope.event.event_kind(), deserialized.event.event_kind());
380    }
381
382    #[tokio::test]
383    async fn test_event_bus_publish_subscribe() {
384        let bus = EventBus::new(16);
385        let mut rx = bus.subscribe();
386
387        let entity_id = Uuid::new_v4();
388        let event = FrameworkEvent::Entity(EntityEvent::Created {
389            entity_type: "order".to_string(),
390            entity_id,
391            data: json!({"name": "Test Order"}),
392        });
393
394        let receivers = bus.publish(event);
395        assert_eq!(receivers, 1);
396
397        let received = rx.recv().await.unwrap();
398        assert_eq!(received.event.entity_id(), Some(entity_id));
399        assert_eq!(received.event.action(), "created");
400    }
401
402    #[tokio::test]
403    async fn test_event_bus_multiple_subscribers() {
404        let bus = EventBus::new(16);
405        let mut rx1 = bus.subscribe();
406        let mut rx2 = bus.subscribe();
407
408        assert_eq!(bus.receiver_count(), 2);
409
410        let event = FrameworkEvent::Entity(EntityEvent::Deleted {
411            entity_type: "order".to_string(),
412            entity_id: Uuid::new_v4(),
413        });
414
415        let receivers = bus.publish(event);
416        assert_eq!(receivers, 2);
417
418        let e1 = rx1.recv().await.unwrap();
419        let e2 = rx2.recv().await.unwrap();
420
421        assert_eq!(e1.id, e2.id); // Same event envelope
422    }
423
424    #[test]
425    fn test_event_bus_publish_without_subscribers() {
426        let bus = EventBus::new(16);
427
428        let event = FrameworkEvent::Entity(EntityEvent::Created {
429            entity_type: "order".to_string(),
430            entity_id: Uuid::new_v4(),
431            data: json!({}),
432        });
433
434        // Should not panic even with no subscribers
435        let receivers = bus.publish(event);
436        assert_eq!(receivers, 0);
437    }
438
439    #[test]
440    fn test_event_bus_default() {
441        let bus = EventBus::default();
442        assert_eq!(bus.receiver_count(), 0);
443    }
444
445    #[test]
446    fn test_event_bus_clone() {
447        let bus = EventBus::new(16);
448        let _rx = bus.subscribe();
449
450        let bus2 = bus.clone();
451        assert_eq!(bus2.receiver_count(), 1);
452
453        let _rx2 = bus2.subscribe();
454        assert_eq!(bus.receiver_count(), 2);
455    }
456
457    #[test]
458    fn test_entity_event_deleted_serialization() {
459        let entity_id = Uuid::new_v4();
460        let event = EntityEvent::Deleted {
461            entity_type: "invoice".to_string(),
462            entity_id,
463        };
464
465        let json = serde_json::to_value(&event).expect("EntityEvent::Deleted should serialize");
466        assert_eq!(json["action"], "deleted");
467        assert_eq!(json["entity_type"], "invoice");
468        assert_eq!(json["entity_id"], entity_id.to_string());
469        // Deleted variant should NOT have a "data" field
470        assert!(json.get("data").is_none());
471    }
472
473    #[test]
474    fn test_link_event_deleted_serialization() {
475        let link_id = Uuid::new_v4();
476        let source_id = Uuid::new_v4();
477        let target_id = Uuid::new_v4();
478        let event = LinkEvent::Deleted {
479            link_type: "ownership".to_string(),
480            link_id,
481            source_id,
482            target_id,
483        };
484
485        let json = serde_json::to_value(&event).expect("LinkEvent::Deleted should serialize");
486        assert_eq!(json["action"], "deleted");
487        assert_eq!(json["link_type"], "ownership");
488        assert_eq!(json["link_id"], link_id.to_string());
489        assert_eq!(json["source_id"], source_id.to_string());
490        assert_eq!(json["target_id"], target_id.to_string());
491        // Deleted variant should NOT have metadata
492        assert!(json.get("metadata").is_none());
493    }
494
495    #[test]
496    fn test_framework_event_entity_id_for_link_created() {
497        let link_id = Uuid::new_v4();
498        let event = FrameworkEvent::Link(LinkEvent::Created {
499            link_type: "worker".to_string(),
500            link_id,
501            source_id: Uuid::new_v4(),
502            target_id: Uuid::new_v4(),
503            metadata: None,
504        });
505
506        // entity_id() on a Link event should return the link_id
507        assert_eq!(event.entity_id(), Some(link_id));
508        // entity_type() should return None for link events
509        assert_eq!(event.entity_type(), None);
510    }
511
512    #[test]
513    fn test_framework_event_pattern_matching_all_entity_actions() {
514        let id = Uuid::new_v4();
515
516        let created = FrameworkEvent::Entity(EntityEvent::Created {
517            entity_type: "order".to_string(),
518            entity_id: id,
519            data: json!({}),
520        });
521        assert_eq!(created.action(), "created");
522        assert_eq!(created.event_kind(), "entity");
523        assert_eq!(created.entity_type(), Some("order"));
524        assert_eq!(created.entity_id(), Some(id));
525
526        let updated = FrameworkEvent::Entity(EntityEvent::Updated {
527            entity_type: "order".to_string(),
528            entity_id: id,
529            data: json!({"status": "shipped"}),
530        });
531        assert_eq!(updated.action(), "updated");
532
533        let deleted = FrameworkEvent::Entity(EntityEvent::Deleted {
534            entity_type: "order".to_string(),
535            entity_id: id,
536        });
537        assert_eq!(deleted.action(), "deleted");
538        assert_eq!(deleted.entity_id(), Some(id));
539    }
540
541    #[test]
542    fn test_framework_event_pattern_matching_all_link_actions() {
543        let link_id = Uuid::new_v4();
544
545        let created = FrameworkEvent::Link(LinkEvent::Created {
546            link_type: "driver".to_string(),
547            link_id,
548            source_id: Uuid::new_v4(),
549            target_id: Uuid::new_v4(),
550            metadata: Some(json!({"license": "B"})),
551        });
552        assert_eq!(created.action(), "created");
553        assert_eq!(created.event_kind(), "link");
554        assert_eq!(created.entity_id(), Some(link_id));
555
556        let deleted = FrameworkEvent::Link(LinkEvent::Deleted {
557            link_type: "driver".to_string(),
558            link_id,
559            source_id: Uuid::new_v4(),
560            target_id: Uuid::new_v4(),
561        });
562        assert_eq!(deleted.action(), "deleted");
563        assert_eq!(deleted.event_kind(), "link");
564        assert_eq!(deleted.entity_id(), Some(link_id));
565    }
566
567    #[tokio::test]
568    async fn test_event_bus_without_event_log() {
569        let bus = EventBus::new(16);
570        assert!(bus.event_log().is_none());
571
572        let mut rx = bus.subscribe();
573        bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
574            entity_type: "order".to_string(),
575            entity_id: Uuid::new_v4(),
576            data: json!({}),
577        }));
578
579        let received = rx.recv().await.unwrap();
580        assert_eq!(received.event.action(), "created");
581    }
582
583    #[tokio::test]
584    async fn test_event_bus_with_event_log_bridge() {
585        use crate::events::log::EventLog;
586        use crate::events::memory::InMemoryEventLog;
587
588        let event_log = Arc::new(InMemoryEventLog::new());
589        let bus = EventBus::new(16).with_event_log(event_log.clone());
590
591        assert!(bus.event_log().is_some());
592
593        let mut rx = bus.subscribe();
594
595        // Publish an event
596        bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
597            entity_type: "user".to_string(),
598            entity_id: Uuid::new_v4(),
599            data: json!({"name": "Alice"}),
600        }));
601
602        // Should receive via broadcast
603        let received = rx.recv().await.unwrap();
604        assert_eq!(received.event.entity_type(), Some("user"));
605
606        // Wait for the spawned task to complete
607        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
608
609        // Event should also be in the EventLog
610        assert_eq!(event_log.last_seq_no().await, Some(1));
611    }
612
613    #[tokio::test]
614    async fn test_event_bus_bridge_multiple_events() {
615        use crate::events::log::EventLog;
616        use crate::events::memory::InMemoryEventLog;
617        use crate::events::types::SeekPosition;
618        use tokio_stream::StreamExt;
619
620        let event_log = Arc::new(InMemoryEventLog::new());
621        let bus = EventBus::new(16).with_event_log(event_log.clone());
622
623        // Publish 5 events
624        for i in 0..5 {
625            bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
626                entity_type: format!("type_{i}"),
627                entity_id: Uuid::new_v4(),
628                data: json!({}),
629            }));
630        }
631
632        // Wait for spawned tasks
633        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
634
635        // All events should be in the log
636        assert_eq!(event_log.last_seq_no().await, Some(5));
637
638        // Subscribe from beginning and replay
639        let stream = event_log
640            .subscribe("test", SeekPosition::Beginning)
641            .await
642            .unwrap();
643        let events: Vec<_> = stream.take(5).collect().await;
644        assert_eq!(events.len(), 5);
645    }
646
647    #[tokio::test]
648    async fn test_event_bus_backward_compatible_default() {
649        // Default bus has no event_log — same behavior as before
650        let bus = EventBus::default();
651        assert!(bus.event_log().is_none());
652        assert_eq!(bus.receiver_count(), 0);
653
654        // Publishing without subscribers or log should not panic
655        let receivers = bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
656            entity_type: "order".to_string(),
657            entity_id: Uuid::new_v4(),
658            data: json!({}),
659        }));
660        assert_eq!(receivers, 0);
661    }
662}