ironflow_engine/notify/
subscriber.rs1use std::future::Future;
4use std::pin::Pin;
5
6use super::Event;
7
8pub type SubscriberFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
10
11pub trait EventSubscriber: Send + Sync {
42 fn name(&self) -> &str;
44
45 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a>;
50}
51
52#[cfg(test)]
53mod tests {
54 use super::*;
55
56 struct TestSubscriber {
57 name: String,
58 }
59
60 impl EventSubscriber for TestSubscriber {
61 fn name(&self) -> &str {
62 &self.name
63 }
64
65 fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
66 Box::pin(async move {
67 })
69 }
70 }
71
72 struct CountingSubscriber {
73 name: String,
74 }
75
76 impl EventSubscriber for CountingSubscriber {
77 fn name(&self) -> &str {
78 &self.name
79 }
80
81 fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
82 Box::pin(async move {
83 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
85 })
86 }
87 }
88
89 #[test]
90 fn subscriber_has_identifier_name() {
91 let sub = TestSubscriber {
92 name: "test_sub".to_string(),
93 };
94 assert_eq!(sub.name(), "test_sub");
95 }
96
97 #[test]
98 fn subscriber_name_is_consistent() {
99 let sub = TestSubscriber {
100 name: "my_subscriber".to_string(),
101 };
102 assert_eq!(sub.name(), "my_subscriber");
103 assert_eq!(sub.name(), "my_subscriber");
104 }
105
106 #[test]
107 fn different_subscribers_have_different_names() {
108 let sub1 = TestSubscriber {
109 name: "sub1".to_string(),
110 };
111 let sub2 = TestSubscriber {
112 name: "sub2".to_string(),
113 };
114
115 assert_ne!(sub1.name(), sub2.name());
116 }
117
118 #[tokio::test]
119 async fn subscriber_handle_completes_successfully() {
120 use chrono::Utc;
121 let sub = TestSubscriber {
122 name: "test".to_string(),
123 };
124
125 let event = Event::RunCreated {
127 run_id: uuid::Uuid::now_v7(),
128 workflow_name: "test-wf".to_string(),
129 at: Utc::now(),
130 };
131
132 sub.handle(&event).await;
134 }
135
136 #[tokio::test]
137 async fn subscriber_handle_is_async() {
138 use chrono::Utc;
139 let sub = CountingSubscriber {
140 name: "async_test".to_string(),
141 };
142
143 let event = Event::RunCreated {
144 run_id: uuid::Uuid::now_v7(),
145 workflow_name: "test".to_string(),
146 at: Utc::now(),
147 };
148
149 let start = std::time::Instant::now();
150 sub.handle(&event).await;
151 let elapsed = start.elapsed();
152
153 assert!(elapsed.as_millis() >= 1);
155 }
156
157 #[tokio::test]
158 async fn multiple_subscribers_can_handle_same_event() {
159 use chrono::Utc;
160 let sub1 = TestSubscriber {
161 name: "sub1".to_string(),
162 };
163 let sub2 = TestSubscriber {
164 name: "sub2".to_string(),
165 };
166
167 let event = Event::RunCreated {
168 run_id: uuid::Uuid::now_v7(),
169 workflow_name: "test".to_string(),
170 at: Utc::now(),
171 };
172
173 sub1.handle(&event).await;
175 sub2.handle(&event).await;
176 }
177
178 #[test]
179 fn subscriber_implements_send_sync() {
180 fn assert_send_sync<T: Send + Sync>() {}
181 assert_send_sync::<TestSubscriber>();
182 assert_send_sync::<CountingSubscriber>();
183 }
184
185 #[tokio::test]
186 async fn subscriber_future_is_boxed() {
187 use chrono::Utc;
188 let sub = TestSubscriber {
189 name: "boxed_test".to_string(),
190 };
191
192 let event = Event::RunCreated {
193 run_id: uuid::Uuid::now_v7(),
194 workflow_name: "test".to_string(),
195 at: Utc::now(),
196 };
197
198 let future = sub.handle(&event);
199 let _ = future.await;
201 }
202
203 #[test]
204 fn subscriber_name_borrowed_lifetime() {
205 let sub = TestSubscriber {
206 name: "lifetime_test".to_string(),
207 };
208
209 let name1 = sub.name();
210 let name2 = sub.name();
211
212 assert_eq!(name1, name2);
214 assert_eq!(name1, "lifetime_test");
215 }
216}