1use crate::workflows::manager::WorkflowManagerRequest;
5use crate::workflows::WorkflowRequest;
6use futures::future::BoxFuture;
7use futures::stream::FuturesUnordered;
8use futures::{FutureExt, StreamExt};
9use std::collections::{HashMap, HashSet};
10use std::num::Wrapping;
11use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
12use tracing::{info, instrument, warn};
13
14#[derive(Debug)]
16pub enum PublishEventRequest {
17 WorkflowStartedOrStopped(WorkflowStartedOrStoppedEvent),
18 WorkflowManagerEvent(WorkflowManagerEvent),
19}
20
21#[derive(Debug)]
23pub enum SubscriptionRequest {
24 WorkflowStartedOrStopped {
25 channel: UnboundedSender<WorkflowStartedOrStoppedEvent>,
26 },
27
28 WorkflowManagerEvents {
29 channel: UnboundedSender<WorkflowManagerEvent>,
30 },
31}
32
33#[derive(Clone, Debug)]
35pub enum WorkflowStartedOrStoppedEvent {
36 WorkflowStarted {
37 name: String,
38 channel: UnboundedSender<WorkflowRequest>,
39 },
40
41 WorkflowEnded {
42 name: String,
43 },
44}
45
46#[derive(Clone, Debug)]
48pub enum WorkflowManagerEvent {
49 WorkflowManagerRegistered {
50 channel: UnboundedSender<WorkflowManagerRequest>,
51 },
52}
53
54pub fn start_event_hub() -> (
55 UnboundedSender<PublishEventRequest>,
56 UnboundedSender<SubscriptionRequest>,
57) {
58 let (publish_sender, publish_receiver) = unbounded_channel();
59 let (sub_sender, sub_receiver) = unbounded_channel();
60 let actor = Actor::new(publish_receiver, sub_receiver);
61 tokio::spawn(actor.run());
62
63 (publish_sender, sub_sender)
64}
65
66enum FutureResult {
67 AllPublishConsumersGone,
68 AllSubscriptionRequestConsumersGone,
69 NewPublishRequest(PublishEventRequest, UnboundedReceiver<PublishEventRequest>),
70 NewSubscriptionRequest(SubscriptionRequest, UnboundedReceiver<SubscriptionRequest>),
71 WorkflowStartStopSubscriberGone(usize),
72 WorkflowManagerSubscriberGone(usize),
73}
74
75struct Actor {
76 futures: FuturesUnordered<BoxFuture<'static, FutureResult>>,
77 next_subscriber_id: Wrapping<usize>,
78 active_subscriber_ids: HashSet<usize>,
79 workflow_start_stop_subscribers: HashMap<usize, UnboundedSender<WorkflowStartedOrStoppedEvent>>,
80 workflow_manager_subscribers: HashMap<usize, UnboundedSender<WorkflowManagerEvent>>,
81 new_subscribers_can_join: bool,
82 active_workflows: HashMap<String, UnboundedSender<WorkflowRequest>>,
83 active_workflow_manager: Option<UnboundedSender<WorkflowManagerRequest>>,
84}
85
86impl Actor {
87 fn new(
88 publish_receiver: UnboundedReceiver<PublishEventRequest>,
89 subscribe_receiver: UnboundedReceiver<SubscriptionRequest>,
90 ) -> Self {
91 let futures = FuturesUnordered::new();
92 futures.push(wait_for_publish_request(publish_receiver).boxed());
93 futures.push(wait_for_subscription_request(subscribe_receiver).boxed());
94
95 Actor {
96 futures,
97 next_subscriber_id: Wrapping(0),
98 active_subscriber_ids: HashSet::new(),
99 workflow_start_stop_subscribers: HashMap::new(),
100 workflow_manager_subscribers: HashMap::new(),
101 new_subscribers_can_join: true,
102 active_workflows: HashMap::new(),
103 active_workflow_manager: None,
104 }
105 }
106
107 #[instrument(name = "Event Hub Execution", skip(self))]
108 async fn run(mut self) {
109 info!("Starting event hub");
110
111 while let Some(result) = self.futures.next().await {
112 match result {
113 FutureResult::AllPublishConsumersGone => {
114 info!("All publish request consumers are gone. No new events can come in");
115 break;
116 }
117
118 FutureResult::AllSubscriptionRequestConsumersGone => {
119 warn!("All subscription request consumers gone. No new subscribers can join");
120
121 self.new_subscribers_can_join = false;
125 }
126
127 FutureResult::WorkflowStartStopSubscriberGone(id) => {
128 self.active_subscriber_ids.remove(&id);
129 self.workflow_start_stop_subscribers.remove(&id);
130 }
131
132 FutureResult::WorkflowManagerSubscriberGone(id) => {
133 self.active_subscriber_ids.remove(&id);
134 self.workflow_manager_subscribers.remove(&id);
135 }
136
137 FutureResult::NewPublishRequest(request, receiver) => {
138 self.futures
139 .push(wait_for_publish_request(receiver).boxed());
140 self.handle_publish_request(request);
141 }
142
143 FutureResult::NewSubscriptionRequest(request, receiver) => {
144 self.futures
145 .push(wait_for_subscription_request(receiver).boxed());
146 self.handle_subscription_request(request);
147 }
148 }
149
150 if !self.new_subscribers_can_join && self.total_subscriber_count() == 0 {
151 info!("All subscribers are gone and no new subscribers can join. Closing");
152 break;
153 }
154 }
155
156 info!("Closing event hub");
157 }
158
159 fn handle_publish_request(&mut self, request: PublishEventRequest) {
160 match request {
161 PublishEventRequest::WorkflowStartedOrStopped(event) => {
162 for subscriber in self.workflow_start_stop_subscribers.values() {
163 let _ = subscriber.send(event.clone());
164 }
165
166 match event {
169 WorkflowStartedOrStoppedEvent::WorkflowStarted { name, channel } => {
170 self.active_workflows.insert(name, channel);
171 }
172
173 WorkflowStartedOrStoppedEvent::WorkflowEnded { name } => {
174 self.active_workflows.remove(&name);
175 }
176 }
177 }
178
179 PublishEventRequest::WorkflowManagerEvent(event) => {
180 for subscriber in self.workflow_manager_subscribers.values() {
181 let _ = subscriber.send(event.clone());
182 }
183
184 match event {
185 WorkflowManagerEvent::WorkflowManagerRegistered { channel } => {
186 self.active_workflow_manager = Some(channel);
187 }
188 }
189 }
190 }
191 }
192
193 fn handle_subscription_request(&mut self, request: SubscriptionRequest) {
194 let id = self.next_subscriber_id;
195 self.active_subscriber_ids.insert(id.0);
196
197 loop {
198 self.next_subscriber_id += Wrapping(1);
199 if !self
200 .active_subscriber_ids
201 .contains(&self.next_subscriber_id.0)
202 {
203 break;
204 }
205 }
206
207 match request {
208 SubscriptionRequest::WorkflowStartedOrStopped { channel } => {
209 for (name, workflow_channel) in &self.active_workflows {
210 let _ = channel.send(WorkflowStartedOrStoppedEvent::WorkflowStarted {
211 name: name.to_string(),
212 channel: workflow_channel.clone(),
213 });
214 }
215
216 self.workflow_start_stop_subscribers
217 .insert(id.0, channel.clone());
218 self.futures
219 .push(notify_workflow_start_stop_subscriber_gone(id.0, channel).boxed());
220 }
221
222 SubscriptionRequest::WorkflowManagerEvents { channel } => {
223 if let Some(sender) = &self.active_workflow_manager {
224 let _ = channel.send(WorkflowManagerEvent::WorkflowManagerRegistered {
225 channel: sender.clone(),
226 });
227 }
228
229 self.workflow_manager_subscribers
230 .insert(id.0, channel.clone());
231 self.futures
232 .push(notify_workflow_manager_subscriber_gone(id.0, channel).boxed());
233 }
234 }
235 }
236
237 fn total_subscriber_count(&self) -> usize {
238 self.workflow_start_stop_subscribers.len()
239 }
240}
241
242async fn wait_for_publish_request(
243 mut receiver: UnboundedReceiver<PublishEventRequest>,
244) -> FutureResult {
245 match receiver.recv().await {
246 Some(request) => FutureResult::NewPublishRequest(request, receiver),
247 None => FutureResult::AllPublishConsumersGone,
248 }
249}
250
251async fn wait_for_subscription_request(
252 mut receiver: UnboundedReceiver<SubscriptionRequest>,
253) -> FutureResult {
254 match receiver.recv().await {
255 Some(request) => FutureResult::NewSubscriptionRequest(request, receiver),
256 None => FutureResult::AllSubscriptionRequestConsumersGone,
257 }
258}
259
260async fn notify_workflow_start_stop_subscriber_gone(
261 id: usize,
262 sender: UnboundedSender<WorkflowStartedOrStoppedEvent>,
263) -> FutureResult {
264 sender.closed().await;
265 FutureResult::WorkflowStartStopSubscriberGone(id)
266}
267
268async fn notify_workflow_manager_subscriber_gone(
269 id: usize,
270 sender: UnboundedSender<WorkflowManagerEvent>,
271) -> FutureResult {
272 sender.closed().await;
273 FutureResult::WorkflowManagerSubscriberGone(id)
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use crate::test_utils;
280 use std::time::Duration;
281
282 #[tokio::test]
283 async fn can_receive_workflow_started_notifications() {
284 let (publish_channel, subscribe_channel) = start_event_hub();
285 let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
286 let (workflow_sender, _workflow_receiver) = unbounded_channel();
287
288 subscribe_channel
289 .send(SubscriptionRequest::WorkflowStartedOrStopped {
290 channel: subscriber_sender,
291 })
292 .expect("Failed to subscribe to workflow start/stop events");
293
294 tokio::time::sleep(Duration::from_millis(10)).await;
295
296 publish_channel
297 .send(PublishEventRequest::WorkflowStartedOrStopped(
298 WorkflowStartedOrStoppedEvent::WorkflowStarted {
299 name: "test".to_string(),
300 channel: workflow_sender,
301 },
302 ))
303 .expect("Failed to publish workflow started event");
304
305 let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
306 match response {
307 WorkflowStartedOrStoppedEvent::WorkflowStarted { name, channel: _ } => {
308 assert_eq!(&name, "test", "Unexpected workflow name");
309 }
310
311 event => panic!("Unexpected event received: {:?}", event),
312 }
313 }
314
315 #[tokio::test]
316 async fn can_receive_workflow_started_notification_when_subscribed_after_published() {
317 let (publish_channel, subscribe_channel) = start_event_hub();
318 let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
319 let (workflow_sender, _workflow_receiver) = unbounded_channel();
320
321 publish_channel
322 .send(PublishEventRequest::WorkflowStartedOrStopped(
323 WorkflowStartedOrStoppedEvent::WorkflowStarted {
324 name: "test".to_string(),
325 channel: workflow_sender,
326 },
327 ))
328 .expect("Failed to publish workflow started event");
329
330 tokio::time::sleep(Duration::from_millis(10)).await;
331
332 subscribe_channel
333 .send(SubscriptionRequest::WorkflowStartedOrStopped {
334 channel: subscriber_sender,
335 })
336 .expect("Failed to subscribe to workflow start/stop events");
337
338 let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
339 match response {
340 WorkflowStartedOrStoppedEvent::WorkflowStarted { name, channel: _ } => {
341 assert_eq!(&name, "test", "Unexpected workflow name");
342 }
343
344 event => panic!("Unexpected event received: {:?}", event),
345 }
346 }
347
348 #[tokio::test]
349 async fn can_receive_workflow_stopped_notifications() {
350 let (publish_channel, subscribe_channel) = start_event_hub();
351 let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
352
353 subscribe_channel
354 .send(SubscriptionRequest::WorkflowStartedOrStopped {
355 channel: subscriber_sender,
356 })
357 .expect("Failed to subscribe to workflow start/stop events");
358
359 tokio::time::sleep(Duration::from_millis(10)).await;
360
361 publish_channel
362 .send(PublishEventRequest::WorkflowStartedOrStopped(
363 WorkflowStartedOrStoppedEvent::WorkflowEnded {
364 name: "test".to_string(),
365 },
366 ))
367 .expect("Failed to publish workflow ended event");
368
369 let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
370 match response {
371 WorkflowStartedOrStoppedEvent::WorkflowEnded { name } => {
372 assert_eq!(&name, "test", "Unexpected workflow name");
373 }
374
375 event => panic!("Unexpected event received: {:?}", event),
376 }
377 }
378
379 #[tokio::test]
380 async fn no_events_when_workflow_started_and_stopped_prior_to_subscription() {
381 let (publish_channel, subscribe_channel) = start_event_hub();
382 let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
383 let (workflow_sender, _workflow_receiver) = unbounded_channel();
384
385 publish_channel
386 .send(PublishEventRequest::WorkflowStartedOrStopped(
387 WorkflowStartedOrStoppedEvent::WorkflowStarted {
388 name: "test".to_string(),
389 channel: workflow_sender,
390 },
391 ))
392 .expect("Failed to publish workflow started event");
393
394 publish_channel
395 .send(PublishEventRequest::WorkflowStartedOrStopped(
396 WorkflowStartedOrStoppedEvent::WorkflowEnded {
397 name: "test".to_string(),
398 },
399 ))
400 .expect("Failed to publish workflow ended event");
401
402 tokio::time::sleep(Duration::from_millis(10)).await;
403
404 subscribe_channel
405 .send(SubscriptionRequest::WorkflowStartedOrStopped {
406 channel: subscriber_sender,
407 })
408 .expect("Failed to subscribe to workflow start/stop events");
409
410 test_utils::expect_mpsc_timeout(&mut subscriber_receiver).await;
411 }
412
413 #[tokio::test]
414 async fn can_receive_workflow_manager_registered_event() {
415 let (publish_channel, subscribe_channel) = start_event_hub();
416 let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
417 let (manager_sender, _manager_receiver) = unbounded_channel();
418
419 subscribe_channel
420 .send(SubscriptionRequest::WorkflowManagerEvents {
421 channel: subscriber_sender,
422 })
423 .expect("Failed to send subscription request");
424
425 tokio::time::sleep(Duration::from_millis(10)).await;
426
427 publish_channel
428 .send(PublishEventRequest::WorkflowManagerEvent(
429 WorkflowManagerEvent::WorkflowManagerRegistered {
430 channel: manager_sender,
431 },
432 ))
433 .expect("Failed to send publish request");
434
435 let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
436 match response {
437 WorkflowManagerEvent::WorkflowManagerRegistered { channel: _ } => (),
438 }
439 }
440}