Skip to main content

ironflow_api/
sse.rs

//! Server-Sent Events broadcaster for real-time event streaming.
//!
//! [`SseBroadcaster`] implements [`EventSubscriber`] and forwards events into
//! a [`tokio::sync::broadcast`] channel. The SSE route creates a receiver from
//! this channel for each connected client and streams the received events to it.
6
7use tokio::sync::broadcast;
8
9use ironflow_engine::notify::{Event, EventSubscriber, SubscriberFuture};
10
/// Default broadcast channel capacity.
///
/// Used by [`SseBroadcaster::new`], which forwards it through
/// [`SseBroadcaster::with_capacity`] as the `capacity` argument of
/// [`tokio::sync::broadcast::channel`].
const DEFAULT_CAPACITY: usize = 256;
13
14/// Broadcasts [`Event`]s to SSE clients via a [`tokio::sync::broadcast`] channel.
15///
16/// Register this as an [`EventSubscriber`] on the [`Engine`](ironflow_engine::engine::Engine)
17/// to forward all (or filtered) domain events to connected SSE clients.
18///
19/// # Examples
20///
21/// ```
22/// use ironflow_api::sse::SseBroadcaster;
23///
24/// let broadcaster = SseBroadcaster::new();
25/// assert_eq!(broadcaster.receiver_count(), 0);
26/// let _receiver = broadcaster.subscribe();
27/// assert_eq!(broadcaster.receiver_count(), 1);
28/// ```
29pub struct SseBroadcaster {
30    sender: broadcast::Sender<Event>,
31}
32
33impl SseBroadcaster {
34    /// Create a new broadcaster with the default capacity (256).
35    ///
36    /// # Examples
37    ///
38    /// ```
39    /// use ironflow_api::sse::SseBroadcaster;
40    ///
41    /// let broadcaster = SseBroadcaster::new();
42    /// ```
43    pub fn new() -> Self {
44        Self::with_capacity(DEFAULT_CAPACITY)
45    }
46
47    /// Create a new broadcaster with a custom channel capacity.
48    ///
49    /// # Examples
50    ///
51    /// ```
52    /// use ironflow_api::sse::SseBroadcaster;
53    ///
54    /// let broadcaster = SseBroadcaster::with_capacity(64);
55    /// ```
56    pub fn with_capacity(capacity: usize) -> Self {
57        let (sender, _) = broadcast::channel(capacity);
58        Self { sender }
59    }
60
61    /// Create a new receiver for the broadcast channel.
62    ///
63    /// Each SSE client connection calls this to get its own receiver.
64    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
65        self.sender.subscribe()
66    }
67
68    /// Returns the number of active receivers (connected SSE clients).
69    pub fn receiver_count(&self) -> usize {
70        self.sender.receiver_count()
71    }
72
73    /// Returns a clone of the underlying sender.
74    ///
75    /// Stored in [`AppState`](crate::state::AppState) so that the SSE route
76    /// can create receivers without holding a reference to the broadcaster.
77    pub fn sender(&self) -> broadcast::Sender<Event> {
78        self.sender.clone()
79    }
80}
81
82impl Default for SseBroadcaster {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl EventSubscriber for SseBroadcaster {
89    fn name(&self) -> &str {
90        "sse"
91    }
92
93    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
94        let event = event.clone();
95        Box::pin(async move {
96            // Ignore send errors: they mean no receivers are connected.
97            let _ = self.sender.send(event);
98        })
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use ironflow_store::models::RunStatus;
    use rust_decimal::Decimal;
    use uuid::Uuid;

    /// Builds a `RunStatusChanged` event with fixed sample values for the
    /// tests below.
    fn sample_event() -> Event {
        Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        }
    }

    #[test]
    fn new_creates_broadcaster() {
        let broadcaster = SseBroadcaster::new();
        assert_eq!(broadcaster.receiver_count(), 0);
    }

    #[test]
    fn default_creates_broadcaster() {
        let broadcaster = SseBroadcaster::default();
        assert_eq!(broadcaster.receiver_count(), 0);
    }

    #[test]
    fn subscribe_creates_receiver() {
        let broadcaster = SseBroadcaster::new();
        let _rx = broadcaster.subscribe();
        assert_eq!(broadcaster.receiver_count(), 1);
    }

    #[test]
    fn receiver_count_tracks_active_receivers() {
        let broadcaster = SseBroadcaster::new();
        // `rx1` is read again by `drop` below, so it carries no underscore
        // prefix; `_rx2` only needs to stay alive until the end of the test.
        let rx1 = broadcaster.subscribe();
        let _rx2 = broadcaster.subscribe();
        assert_eq!(broadcaster.receiver_count(), 2);
        drop(rx1);
        assert_eq!(broadcaster.receiver_count(), 1);
    }

    #[tokio::test]
    async fn handle_sends_event_to_receivers() {
        let broadcaster = SseBroadcaster::new();
        let mut rx = broadcaster.subscribe();

        let event = sample_event();
        broadcaster.handle(&event).await;

        // The send completes inside the awaited future, so the event must
        // already be queued. `try_recv` makes a regression fail immediately
        // instead of leaving the test blocked on `recv().await`.
        let received = rx.try_recv().expect("should receive event");
        assert_eq!(received.event_type(), "run_status_changed");
    }

    #[tokio::test]
    async fn handle_no_receivers_does_not_panic() {
        let broadcaster = SseBroadcaster::new();
        let event = sample_event();
        // No receivers -- should not panic
        broadcaster.handle(&event).await;
    }

    #[test]
    fn sender_returns_clone() {
        let broadcaster = SseBroadcaster::new();
        let sender = broadcaster.sender();
        let _rx = sender.subscribe();
        // The receiver from the cloned sender counts on the original channel
        assert_eq!(broadcaster.receiver_count(), 1);
    }

    #[test]
    fn name_returns_sse() {
        let broadcaster = SseBroadcaster::new();
        assert_eq!(broadcaster.name(), "sse");
    }
}