1use tokio::sync::broadcast;
8
9use ironflow_engine::notify::{Event, EventSubscriber, SubscriberFuture};
10
11const DEFAULT_CAPACITY: usize = 256;
13
14pub struct SseBroadcaster {
30 sender: broadcast::Sender<Event>,
31}
32
33impl SseBroadcaster {
34 pub fn new() -> Self {
44 Self::with_capacity(DEFAULT_CAPACITY)
45 }
46
47 pub fn with_capacity(capacity: usize) -> Self {
57 let (sender, _) = broadcast::channel(capacity);
58 Self { sender }
59 }
60
61 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
65 self.sender.subscribe()
66 }
67
68 pub fn receiver_count(&self) -> usize {
70 self.sender.receiver_count()
71 }
72
73 pub fn sender(&self) -> broadcast::Sender<Event> {
78 self.sender.clone()
79 }
80}
81
82impl Default for SseBroadcaster {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl EventSubscriber for SseBroadcaster {
89 fn name(&self) -> &str {
90 "sse"
91 }
92
93 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
94 let event = event.clone();
95 Box::pin(async move {
96 let _ = self.sender.send(event);
98 })
99 }
100}
101
102#[cfg(test)]
103mod tests {
104 use super::*;
105 use chrono::Utc;
106 use ironflow_store::models::RunStatus;
107 use rust_decimal::Decimal;
108 use uuid::Uuid;
109
110 fn sample_event() -> Event {
111 Event::RunStatusChanged {
112 run_id: Uuid::now_v7(),
113 workflow_name: "deploy".to_string(),
114 from: RunStatus::Running,
115 to: RunStatus::Completed,
116 error: None,
117 cost_usd: Decimal::ZERO,
118 duration_ms: 1000,
119 at: Utc::now(),
120 }
121 }
122
123 #[test]
124 fn new_creates_broadcaster() {
125 let broadcaster = SseBroadcaster::new();
126 assert_eq!(broadcaster.receiver_count(), 0);
127 }
128
129 #[test]
130 fn default_creates_broadcaster() {
131 let broadcaster = SseBroadcaster::default();
132 assert_eq!(broadcaster.receiver_count(), 0);
133 }
134
135 #[test]
136 fn subscribe_creates_receiver() {
137 let broadcaster = SseBroadcaster::new();
138 let _rx = broadcaster.subscribe();
139 assert_eq!(broadcaster.receiver_count(), 1);
140 }
141
142 #[test]
143 fn receiver_count_tracks_active_receivers() {
144 let broadcaster = SseBroadcaster::new();
145 let _rx1 = broadcaster.subscribe();
146 let _rx2 = broadcaster.subscribe();
147 assert_eq!(broadcaster.receiver_count(), 2);
148 drop(_rx1);
149 assert_eq!(broadcaster.receiver_count(), 1);
150 }
151
152 #[tokio::test]
153 async fn handle_sends_event_to_receivers() {
154 let broadcaster = SseBroadcaster::new();
155 let mut rx = broadcaster.subscribe();
156
157 let event = sample_event();
158 broadcaster.handle(&event).await;
159
160 let received = rx.recv().await.expect("should receive event");
161 assert_eq!(received.event_type(), "run_status_changed");
162 }
163
164 #[tokio::test]
165 async fn handle_no_receivers_does_not_panic() {
166 let broadcaster = SseBroadcaster::new();
167 let event = sample_event();
168 broadcaster.handle(&event).await;
170 }
171
172 #[test]
173 fn sender_returns_clone() {
174 let broadcaster = SseBroadcaster::new();
175 let sender = broadcaster.sender();
176 let _rx = sender.subscribe();
177 assert_eq!(broadcaster.receiver_count(), 1);
179 }
180
181 #[test]
182 fn name_returns_sse() {
183 let broadcaster = SseBroadcaster::new();
184 assert_eq!(broadcaster.name(), "sse");
185 }
186}