ironflow_engine/notify/
publisher.rs1use std::sync::Arc;
4
5use tokio::spawn;
6
7use super::{Event, EventSubscriber};
8
9struct Subscription {
11 subscriber: Arc<dyn EventSubscriber>,
12 event_types: Vec<&'static str>,
13}
14
15impl Subscription {
16 fn accepts(&self, event: &Event) -> bool {
18 self.event_types.contains(&event.event_type())
19 }
20}
21
22pub struct EventPublisher {
40 subscriptions: Vec<Subscription>,
41}
42
43impl EventPublisher {
44 pub fn new() -> Self {
55 Self {
56 subscriptions: Vec::new(),
57 }
58 }
59
60 pub fn subscribe(
88 &mut self,
89 subscriber: impl EventSubscriber + 'static,
90 event_types: &[&'static str],
91 ) {
92 self.subscriptions.push(Subscription {
93 subscriber: Arc::new(subscriber),
94 event_types: event_types.to_vec(),
95 });
96 }
97
98 pub fn subscriber_count(&self) -> usize {
100 self.subscriptions.len()
101 }
102
103 pub fn publish(&self, event: Event) {
108 for subscription in &self.subscriptions {
109 if !subscription.accepts(&event) {
110 continue;
111 }
112 let subscriber = subscription.subscriber.clone();
113 let event = event.clone();
114 spawn(async move {
115 subscriber.handle(&event).await;
116 });
117 }
118 }
119}
120
121impl Default for EventPublisher {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::notify::{SubscriberFuture, WebhookSubscriber};
    use rust_decimal::Decimal;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tokio::time::sleep;

    use chrono::Utc;
    use ironflow_store::models::RunStatus;
    use uuid::Uuid;

    /// How long the async tests wait for the spawned handler tasks to run.
    ///
    /// `publish` does not return the tasks it spawns, so the tests cannot
    /// await them directly and fall back to a short pause.
    const DELIVERY_GRACE: Duration = Duration::from_millis(50);

    /// Builds a `RunStatusChanged` event with fixed sample values.
    fn sample_run_status_changed() -> Event {
        Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        }
    }

    /// Builds a `UserSignedIn` event with fixed sample values.
    fn sample_user_signed_in() -> Event {
        Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        }
    }

    /// Test subscriber that counts how many events it has handled.
    struct CountingSubscriber {
        count: AtomicU32,
    }

    impl CountingSubscriber {
        fn new() -> Self {
            Self {
                count: AtomicU32::new(0),
            }
        }

        /// Number of events handled so far.
        fn count(&self) -> u32 {
            self.count.load(Ordering::SeqCst)
        }
    }

    impl EventSubscriber for CountingSubscriber {
        fn name(&self) -> &str {
            "counting"
        }

        fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
            Box::pin(async move {
                self.count.fetch_add(1, Ordering::SeqCst);
            })
        }
    }

    /// Forwards to a shared [`CountingSubscriber`].
    ///
    /// `subscribe` takes its subscriber by value, so the tests register this
    /// wrapper and keep their own `Arc` to read the counter afterwards.
    struct ArcSub(Arc<CountingSubscriber>);

    impl EventSubscriber for ArcSub {
        fn name(&self) -> &str {
            self.0.name()
        }

        fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
            self.0.handle(event)
        }
    }

    /// Creates a publisher with one counting subscriber registered for
    /// `event_types`, returning the publisher and a handle to the counter.
    fn publisher_with_counter(
        event_types: &[&'static str],
    ) -> (EventPublisher, Arc<CountingSubscriber>) {
        let subscriber = Arc::new(CountingSubscriber::new());
        let mut publisher = EventPublisher::new();
        publisher.subscribe(ArcSub(subscriber.clone()), event_types);
        (publisher, subscriber)
    }

    /// Publishes one event of each sample kind, then pauses so the spawned
    /// handler tasks get a chance to run.
    async fn publish_samples_and_settle(publisher: &EventPublisher) {
        publisher.publish(sample_run_status_changed());
        publisher.publish(sample_user_signed_in());

        sleep(DELIVERY_GRACE).await;
    }

    #[test]
    fn starts_empty() {
        let publisher = EventPublisher::new();
        assert_eq!(publisher.subscriber_count(), 0);
    }

    #[test]
    fn subscribe_increments_count() {
        let mut publisher = EventPublisher::new();
        publisher.subscribe(
            WebhookSubscriber::new("https://example.com"),
            &[Event::RUN_STATUS_CHANGED],
        );
        assert_eq!(publisher.subscriber_count(), 1);
    }

    // Deliberately a plain `#[test]`: with no subscribers nothing is spawned,
    // so no Tokio runtime is needed.
    #[test]
    fn publish_with_no_subscribers_is_noop() {
        let publisher = EventPublisher::new();
        publisher.publish(sample_run_status_changed());
    }

    #[test]
    fn default_is_empty() {
        let publisher = EventPublisher::default();
        assert_eq!(publisher.subscriber_count(), 0);
    }

    #[tokio::test]
    async fn subscriber_receives_matching_events() {
        let (publisher, subscriber) = publisher_with_counter(&[Event::RUN_STATUS_CHANGED]);

        publish_samples_and_settle(&publisher).await;

        assert_eq!(subscriber.count(), 1);
    }

    #[tokio::test]
    async fn all_filter_matches_everything() {
        let (publisher, subscriber) = publisher_with_counter(Event::ALL);

        publish_samples_and_settle(&publisher).await;

        assert_eq!(subscriber.count(), 2);
    }

    #[tokio::test]
    async fn empty_filter_matches_nothing() {
        let (publisher, subscriber) = publisher_with_counter(&[]);

        publish_samples_and_settle(&publisher).await;

        assert_eq!(subscriber.count(), 0);
    }
}