Skip to main content

ironflow_engine/notify/
subscriber.rs

1//! [`EventSubscriber`] trait -- react to domain events.
2
3use std::future::Future;
4use std::pin::Pin;
5
6use super::Event;
7
8/// Boxed future returned by [`EventSubscriber::handle`].
9pub type SubscriberFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
10
11/// A subscriber that reacts to domain events.
12///
13/// Implement this trait to create custom notification channels (Slack,
14/// Discord, PagerDuty, etc.). The engine broadcasts events to all
15/// registered subscribers via [`EventPublisher`](super::EventPublisher).
16///
17/// # Contract
18///
19/// - [`handle`](EventSubscriber::handle) is called only for events that
20///   match the filter configured at subscription time.
21/// - Implementations must not block -- heavy work should be spawned.
22/// - Errors are logged internally; they must not propagate.
23///
24/// # Examples
25///
26/// ```no_run
27/// use ironflow_engine::notify::{EventSubscriber, Event, SubscriberFuture};
28///
29/// struct LogSubscriber;
30///
31/// impl EventSubscriber for LogSubscriber {
32///     fn name(&self) -> &str { "log" }
33///
34///     fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
35///         Box::pin(async move {
36///             println!("[{}] {:?}", event.event_type(), event);
37///         })
38///     }
39/// }
40/// ```
41pub trait EventSubscriber: Send + Sync {
42    /// A short identifier for this subscriber (used in logs).
43    fn name(&self) -> &str;
44
45    /// Handle a domain event.
46    ///
47    /// Only called for events matching the filter set at subscription
48    /// time. The subscriber does not need to filter.
49    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a>;
50}
51
52#[cfg(test)]
53mod tests {
54    use super::*;
55
56    struct TestSubscriber {
57        name: String,
58    }
59
60    impl EventSubscriber for TestSubscriber {
61        fn name(&self) -> &str {
62            &self.name
63        }
64
65        fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
66            Box::pin(async move {
67                // No-op for testing
68            })
69        }
70    }
71
72    struct CountingSubscriber {
73        name: String,
74    }
75
76    impl EventSubscriber for CountingSubscriber {
77        fn name(&self) -> &str {
78            &self.name
79        }
80
81        fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
82            Box::pin(async move {
83                // Simulates async work
84                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
85            })
86        }
87    }
88
89    #[test]
90    fn subscriber_has_identifier_name() {
91        let sub = TestSubscriber {
92            name: "test_sub".to_string(),
93        };
94        assert_eq!(sub.name(), "test_sub");
95    }
96
97    #[test]
98    fn subscriber_name_is_consistent() {
99        let sub = TestSubscriber {
100            name: "my_subscriber".to_string(),
101        };
102        assert_eq!(sub.name(), "my_subscriber");
103        assert_eq!(sub.name(), "my_subscriber");
104    }
105
106    #[test]
107    fn different_subscribers_have_different_names() {
108        let sub1 = TestSubscriber {
109            name: "sub1".to_string(),
110        };
111        let sub2 = TestSubscriber {
112            name: "sub2".to_string(),
113        };
114
115        assert_ne!(sub1.name(), sub2.name());
116    }
117
118    #[tokio::test]
119    async fn subscriber_handle_completes_successfully() {
120        use chrono::Utc;
121        let sub = TestSubscriber {
122            name: "test".to_string(),
123        };
124
125        // Create a dummy event for testing
126        let event = Event::RunCreated {
127            run_id: uuid::Uuid::now_v7(),
128            workflow_name: "test-wf".to_string(),
129            at: Utc::now(),
130        };
131
132        // Should complete without error
133        sub.handle(&event).await;
134    }
135
136    #[tokio::test]
137    async fn subscriber_handle_is_async() {
138        use chrono::Utc;
139        let sub = CountingSubscriber {
140            name: "async_test".to_string(),
141        };
142
143        let event = Event::RunCreated {
144            run_id: uuid::Uuid::now_v7(),
145            workflow_name: "test".to_string(),
146            at: Utc::now(),
147        };
148
149        let start = std::time::Instant::now();
150        sub.handle(&event).await;
151        let elapsed = start.elapsed();
152
153        // Should have taken at least 1ms due to the sleep
154        assert!(elapsed.as_millis() >= 1);
155    }
156
157    #[tokio::test]
158    async fn multiple_subscribers_can_handle_same_event() {
159        use chrono::Utc;
160        let sub1 = TestSubscriber {
161            name: "sub1".to_string(),
162        };
163        let sub2 = TestSubscriber {
164            name: "sub2".to_string(),
165        };
166
167        let event = Event::RunCreated {
168            run_id: uuid::Uuid::now_v7(),
169            workflow_name: "test".to_string(),
170            at: Utc::now(),
171        };
172
173        // Both should handle without issue
174        sub1.handle(&event).await;
175        sub2.handle(&event).await;
176    }
177
178    #[test]
179    fn subscriber_implements_send_sync() {
180        fn assert_send_sync<T: Send + Sync>() {}
181        assert_send_sync::<TestSubscriber>();
182        assert_send_sync::<CountingSubscriber>();
183    }
184
185    #[tokio::test]
186    async fn subscriber_future_is_boxed() {
187        use chrono::Utc;
188        let sub = TestSubscriber {
189            name: "boxed_test".to_string(),
190        };
191
192        let event = Event::RunCreated {
193            run_id: uuid::Uuid::now_v7(),
194            workflow_name: "test".to_string(),
195            at: Utc::now(),
196        };
197
198        let future = sub.handle(&event);
199        // The future should be a Pin<Box<_>> and awaitable
200        let _ = future.await;
201    }
202
203    #[test]
204    fn subscriber_name_borrowed_lifetime() {
205        let sub = TestSubscriber {
206            name: "lifetime_test".to_string(),
207        };
208
209        let name1 = sub.name();
210        let name2 = sub.name();
211
212        // Both should be valid references
213        assert_eq!(name1, name2);
214        assert_eq!(name1, "lifetime_test");
215    }
216}