Skip to main content

ironflow_engine/notify/
publisher.rs

1//! [`EventPublisher`] -- broadcasts events to filtered subscribers.
2
3use std::sync::Arc;
4
5use tokio::spawn;
6
7use super::{Event, EventSubscriber};
8
9/// A subscriber paired with its event type filter.
10struct Subscription {
11    subscriber: Arc<dyn EventSubscriber>,
12    event_types: Vec<&'static str>,
13}
14
15impl Subscription {
16    /// Returns `true` if this subscription accepts the given event.
17    fn accepts(&self, event: &Event) -> bool {
18        self.event_types.contains(&event.event_type())
19    }
20}
21
22/// Broadcasts [`Event`]s to registered [`EventSubscriber`]s.
23///
24/// Each subscriber is paired with an event type filter at subscription
25/// time. Only matching events are dispatched. Each call runs in a
26/// spawned task so that slow subscribers do not block the engine.
27///
28/// # Examples
29///
30/// ```no_run
31/// use ironflow_engine::notify::{EventPublisher, WebhookSubscriber, Event};
32///
33/// let mut publisher = EventPublisher::new();
34/// publisher.subscribe(
35///     WebhookSubscriber::new("https://hooks.example.com/events"),
36///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
37/// );
38/// ```
39pub struct EventPublisher {
40    subscriptions: Vec<Subscription>,
41}
42
43impl EventPublisher {
44    /// Create an empty publisher with no subscribers.
45    ///
46    /// # Examples
47    ///
48    /// ```
49    /// use ironflow_engine::notify::EventPublisher;
50    ///
51    /// let publisher = EventPublisher::new();
52    /// assert_eq!(publisher.subscriber_count(), 0);
53    /// ```
54    pub fn new() -> Self {
55        Self {
56            subscriptions: Vec::new(),
57        }
58    }
59
60    /// Register a subscriber with an event type filter.
61    ///
62    /// The subscriber is called only for events whose
63    /// [`event_type()`](Event::event_type) is in `event_types`.
64    /// Pass [`Event::ALL`] to receive every event.
65    ///
66    /// Use the `Event::*` constants for the filter values.
67    ///
68    /// # Examples
69    ///
70    /// ```no_run
71    /// use ironflow_engine::notify::{EventPublisher, WebhookSubscriber, Event};
72    ///
73    /// let mut publisher = EventPublisher::new();
74    ///
75    /// // Only on specific event types:
76    /// publisher.subscribe(
77    ///     WebhookSubscriber::new("https://example.com/hook"),
78    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
79    /// );
80    ///
81    /// // On all events:
82    /// publisher.subscribe(
83    ///     WebhookSubscriber::new("https://example.com/all"),
84    ///     Event::ALL,
85    /// );
86    /// ```
87    pub fn subscribe(
88        &mut self,
89        subscriber: impl EventSubscriber + 'static,
90        event_types: &[&'static str],
91    ) {
92        self.subscriptions.push(Subscription {
93            subscriber: Arc::new(subscriber),
94            event_types: event_types.to_vec(),
95        });
96    }
97
98    /// Number of registered subscribers.
99    pub fn subscriber_count(&self) -> usize {
100        self.subscriptions.len()
101    }
102
103    /// Broadcast an event to all matching subscribers.
104    ///
105    /// Each matching subscriber runs in its own spawned task. This
106    /// method returns immediately and never blocks.
107    pub fn publish(&self, event: Event) {
108        for subscription in &self.subscriptions {
109            if !subscription.accepts(&event) {
110                continue;
111            }
112            let subscriber = subscription.subscriber.clone();
113            let event = event.clone();
114            spawn(async move {
115                subscriber.handle(&event).await;
116            });
117        }
118    }
119}
120
121impl Default for EventPublisher {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::notify::{SubscriberFuture, WebhookSubscriber};
    use rust_decimal::Decimal;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tokio::time::sleep;

    use chrono::Utc;
    use ironflow_store::models::RunStatus;
    use uuid::Uuid;

    /// How long the async tests wait for spawned subscriber tasks to run
    /// before inspecting the counter.
    const DISPATCH_GRACE: Duration = Duration::from_millis(50);

    /// Builds a `RunStatusChanged` event with fixed sample values.
    fn sample_run_status_changed() -> Event {
        Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        }
    }

    /// Builds a `UserSignedIn` event with fixed sample values.
    fn sample_user_signed_in() -> Event {
        Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        }
    }

    #[test]
    fn starts_empty() {
        let publisher = EventPublisher::new();
        assert_eq!(publisher.subscriber_count(), 0);
    }

    #[test]
    fn subscribe_increments_count() {
        let mut publisher = EventPublisher::new();
        publisher.subscribe(
            WebhookSubscriber::new("https://example.com"),
            &[Event::RUN_STATUS_CHANGED],
        );
        assert_eq!(publisher.subscriber_count(), 1);
    }

    #[test]
    fn publish_with_no_subscribers_is_noop() {
        // No subscription matches, so nothing is spawned and no runtime is
        // needed for this call.
        let publisher = EventPublisher::new();
        publisher.publish(sample_run_status_changed());
    }

    #[test]
    fn default_is_empty() {
        let publisher = EventPublisher::default();
        assert_eq!(publisher.subscriber_count(), 0);
    }

    /// Subscriber that only counts how many events it has handled.
    struct CountingSubscriber {
        count: AtomicU32,
    }

    impl CountingSubscriber {
        fn new() -> Self {
            Self {
                count: AtomicU32::new(0),
            }
        }

        /// Number of events handled so far.
        fn count(&self) -> u32 {
            self.count.load(Ordering::SeqCst)
        }
    }

    impl EventSubscriber for CountingSubscriber {
        fn name(&self) -> &str {
            "counting"
        }

        fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
            Box::pin(async move {
                self.count.fetch_add(1, Ordering::SeqCst);
            })
        }
    }

    /// Adapter that forwards to a shared [`CountingSubscriber`], so a test
    /// can keep its own handle to the counter after handing a subscriber
    /// to the publisher.
    struct ArcSub(Arc<CountingSubscriber>);

    impl EventSubscriber for ArcSub {
        fn name(&self) -> &str {
            self.0.name()
        }

        fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
            self.0.handle(event)
        }
    }

    /// Creates a publisher with a single counting subscriber registered
    /// under `event_types`, returning the publisher and the shared counter.
    fn publisher_with_counter(
        event_types: &[&'static str],
    ) -> (EventPublisher, Arc<CountingSubscriber>) {
        let counter = Arc::new(CountingSubscriber::new());
        let mut publisher = EventPublisher::new();
        publisher.subscribe(ArcSub(Arc::clone(&counter)), event_types);
        (publisher, counter)
    }

    #[tokio::test]
    async fn subscriber_receives_matching_events() {
        let (publisher, counter) = publisher_with_counter(&[Event::RUN_STATUS_CHANGED]);

        publisher.publish(sample_run_status_changed()); // matches
        publisher.publish(sample_user_signed_in()); // filtered out

        sleep(DISPATCH_GRACE).await;

        assert_eq!(counter.count(), 1);
    }

    #[tokio::test]
    async fn all_filter_matches_everything() {
        let (publisher, counter) = publisher_with_counter(Event::ALL);

        publisher.publish(sample_run_status_changed());
        publisher.publish(sample_user_signed_in());

        sleep(DISPATCH_GRACE).await;

        assert_eq!(counter.count(), 2);
    }

    #[tokio::test]
    async fn empty_filter_matches_nothing() {
        let (publisher, counter) = publisher_with_counter(&[]);

        publisher.publish(sample_run_status_changed());
        publisher.publish(sample_user_signed_in());

        sleep(DISPATCH_GRACE).await;

        assert_eq!(counter.count(), 0);
    }
}