mmids_core/
event_hub.rs

1//! The event hub is a central actor that receives events from all type of mmids subsystems and
2//! allows them to be published to interested subscribers.
3
4use crate::workflows::manager::WorkflowManagerRequest;
5use crate::workflows::WorkflowRequest;
6use futures::future::BoxFuture;
7use futures::stream::FuturesUnordered;
8use futures::{FutureExt, StreamExt};
9use std::collections::{HashMap, HashSet};
10use std::num::Wrapping;
11use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
12use tracing::{info, instrument, warn};
13
14/// A request to publish a notification to the event hub
15#[derive(Debug)]
16pub enum PublishEventRequest {
17    WorkflowStartedOrStopped(WorkflowStartedOrStoppedEvent),
18    WorkflowManagerEvent(WorkflowManagerEvent),
19}
20
21/// A request to subscribe to a category of events
22#[derive(Debug)]
23pub enum SubscriptionRequest {
24    WorkflowStartedOrStopped {
25        channel: UnboundedSender<WorkflowStartedOrStoppedEvent>,
26    },
27
28    WorkflowManagerEvents {
29        channel: UnboundedSender<WorkflowManagerEvent>,
30    },
31}
32
33/// Events relating to workflows being started or stopped
34#[derive(Clone, Debug)]
35pub enum WorkflowStartedOrStoppedEvent {
36    WorkflowStarted {
37        name: String,
38        channel: UnboundedSender<WorkflowRequest>,
39    },
40
41    WorkflowEnded {
42        name: String,
43    },
44}
45
46// Events relating to workflow managers
47#[derive(Clone, Debug)]
48pub enum WorkflowManagerEvent {
49    WorkflowManagerRegistered {
50        channel: UnboundedSender<WorkflowManagerRequest>,
51    },
52}
53
54pub fn start_event_hub() -> (
55    UnboundedSender<PublishEventRequest>,
56    UnboundedSender<SubscriptionRequest>,
57) {
58    let (publish_sender, publish_receiver) = unbounded_channel();
59    let (sub_sender, sub_receiver) = unbounded_channel();
60    let actor = Actor::new(publish_receiver, sub_receiver);
61    tokio::spawn(actor.run());
62
63    (publish_sender, sub_sender)
64}
65
66enum FutureResult {
67    AllPublishConsumersGone,
68    AllSubscriptionRequestConsumersGone,
69    NewPublishRequest(PublishEventRequest, UnboundedReceiver<PublishEventRequest>),
70    NewSubscriptionRequest(SubscriptionRequest, UnboundedReceiver<SubscriptionRequest>),
71    WorkflowStartStopSubscriberGone(usize),
72    WorkflowManagerSubscriberGone(usize),
73}
74
75struct Actor {
76    futures: FuturesUnordered<BoxFuture<'static, FutureResult>>,
77    next_subscriber_id: Wrapping<usize>,
78    active_subscriber_ids: HashSet<usize>,
79    workflow_start_stop_subscribers: HashMap<usize, UnboundedSender<WorkflowStartedOrStoppedEvent>>,
80    workflow_manager_subscribers: HashMap<usize, UnboundedSender<WorkflowManagerEvent>>,
81    new_subscribers_can_join: bool,
82    active_workflows: HashMap<String, UnboundedSender<WorkflowRequest>>,
83    active_workflow_manager: Option<UnboundedSender<WorkflowManagerRequest>>,
84}
85
86impl Actor {
87    fn new(
88        publish_receiver: UnboundedReceiver<PublishEventRequest>,
89        subscribe_receiver: UnboundedReceiver<SubscriptionRequest>,
90    ) -> Self {
91        let futures = FuturesUnordered::new();
92        futures.push(wait_for_publish_request(publish_receiver).boxed());
93        futures.push(wait_for_subscription_request(subscribe_receiver).boxed());
94
95        Actor {
96            futures,
97            next_subscriber_id: Wrapping(0),
98            active_subscriber_ids: HashSet::new(),
99            workflow_start_stop_subscribers: HashMap::new(),
100            workflow_manager_subscribers: HashMap::new(),
101            new_subscribers_can_join: true,
102            active_workflows: HashMap::new(),
103            active_workflow_manager: None,
104        }
105    }
106
107    #[instrument(name = "Event Hub Execution", skip(self))]
108    async fn run(mut self) {
109        info!("Starting event hub");
110
111        while let Some(result) = self.futures.next().await {
112            match result {
113                FutureResult::AllPublishConsumersGone => {
114                    info!("All publish request consumers are gone.  No new events can come in");
115                    break;
116                }
117
118                FutureResult::AllSubscriptionRequestConsumersGone => {
119                    warn!("All subscription request consumers gone.  No new subscribers can join");
120
121                    // Theoretically this should only happen when everything is shutting down.  I
122                    // guess technically we might still have valid subscribers to send new events to
123                    // still so we don't have to shut this down until all subscribers are gone
124                    self.new_subscribers_can_join = false;
125                }
126
127                FutureResult::WorkflowStartStopSubscriberGone(id) => {
128                    self.active_subscriber_ids.remove(&id);
129                    self.workflow_start_stop_subscribers.remove(&id);
130                }
131
132                FutureResult::WorkflowManagerSubscriberGone(id) => {
133                    self.active_subscriber_ids.remove(&id);
134                    self.workflow_manager_subscribers.remove(&id);
135                }
136
137                FutureResult::NewPublishRequest(request, receiver) => {
138                    self.futures
139                        .push(wait_for_publish_request(receiver).boxed());
140                    self.handle_publish_request(request);
141                }
142
143                FutureResult::NewSubscriptionRequest(request, receiver) => {
144                    self.futures
145                        .push(wait_for_subscription_request(receiver).boxed());
146                    self.handle_subscription_request(request);
147                }
148            }
149
150            if !self.new_subscribers_can_join && self.total_subscriber_count() == 0 {
151                info!("All subscribers are gone and no new subscribers can join.  Closing");
152                break;
153            }
154        }
155
156        info!("Closing event hub");
157    }
158
159    fn handle_publish_request(&mut self, request: PublishEventRequest) {
160        match request {
161            PublishEventRequest::WorkflowStartedOrStopped(event) => {
162                for subscriber in self.workflow_start_stop_subscribers.values() {
163                    let _ = subscriber.send(event.clone());
164                }
165
166                // We want to maintain a list of active workflows, so if a subscriber joins after
167                // we receive the notification of a workflow starting they don't miss that event.
168                match event {
169                    WorkflowStartedOrStoppedEvent::WorkflowStarted { name, channel } => {
170                        self.active_workflows.insert(name, channel);
171                    }
172
173                    WorkflowStartedOrStoppedEvent::WorkflowEnded { name } => {
174                        self.active_workflows.remove(&name);
175                    }
176                }
177            }
178
179            PublishEventRequest::WorkflowManagerEvent(event) => {
180                for subscriber in self.workflow_manager_subscribers.values() {
181                    let _ = subscriber.send(event.clone());
182                }
183
184                match event {
185                    WorkflowManagerEvent::WorkflowManagerRegistered { channel } => {
186                        self.active_workflow_manager = Some(channel);
187                    }
188                }
189            }
190        }
191    }
192
193    fn handle_subscription_request(&mut self, request: SubscriptionRequest) {
194        let id = self.next_subscriber_id;
195        self.active_subscriber_ids.insert(id.0);
196
197        loop {
198            self.next_subscriber_id += Wrapping(1);
199            if !self
200                .active_subscriber_ids
201                .contains(&self.next_subscriber_id.0)
202            {
203                break;
204            }
205        }
206
207        match request {
208            SubscriptionRequest::WorkflowStartedOrStopped { channel } => {
209                for (name, workflow_channel) in &self.active_workflows {
210                    let _ = channel.send(WorkflowStartedOrStoppedEvent::WorkflowStarted {
211                        name: name.to_string(),
212                        channel: workflow_channel.clone(),
213                    });
214                }
215
216                self.workflow_start_stop_subscribers
217                    .insert(id.0, channel.clone());
218                self.futures
219                    .push(notify_workflow_start_stop_subscriber_gone(id.0, channel).boxed());
220            }
221
222            SubscriptionRequest::WorkflowManagerEvents { channel } => {
223                if let Some(sender) = &self.active_workflow_manager {
224                    let _ = channel.send(WorkflowManagerEvent::WorkflowManagerRegistered {
225                        channel: sender.clone(),
226                    });
227                }
228
229                self.workflow_manager_subscribers
230                    .insert(id.0, channel.clone());
231                self.futures
232                    .push(notify_workflow_manager_subscriber_gone(id.0, channel).boxed());
233            }
234        }
235    }
236
237    fn total_subscriber_count(&self) -> usize {
238        self.workflow_start_stop_subscribers.len()
239    }
240}
241
242async fn wait_for_publish_request(
243    mut receiver: UnboundedReceiver<PublishEventRequest>,
244) -> FutureResult {
245    match receiver.recv().await {
246        Some(request) => FutureResult::NewPublishRequest(request, receiver),
247        None => FutureResult::AllPublishConsumersGone,
248    }
249}
250
251async fn wait_for_subscription_request(
252    mut receiver: UnboundedReceiver<SubscriptionRequest>,
253) -> FutureResult {
254    match receiver.recv().await {
255        Some(request) => FutureResult::NewSubscriptionRequest(request, receiver),
256        None => FutureResult::AllSubscriptionRequestConsumersGone,
257    }
258}
259
260async fn notify_workflow_start_stop_subscriber_gone(
261    id: usize,
262    sender: UnboundedSender<WorkflowStartedOrStoppedEvent>,
263) -> FutureResult {
264    sender.closed().await;
265    FutureResult::WorkflowStartStopSubscriberGone(id)
266}
267
268async fn notify_workflow_manager_subscriber_gone(
269    id: usize,
270    sender: UnboundedSender<WorkflowManagerEvent>,
271) -> FutureResult {
272    sender.closed().await;
273    FutureResult::WorkflowManagerSubscriberGone(id)
274}
275
276#[cfg(test)]
277mod tests {
278    use super::*;
279    use crate::test_utils;
280    use std::time::Duration;
281
282    #[tokio::test]
283    async fn can_receive_workflow_started_notifications() {
284        let (publish_channel, subscribe_channel) = start_event_hub();
285        let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
286        let (workflow_sender, _workflow_receiver) = unbounded_channel();
287
288        subscribe_channel
289            .send(SubscriptionRequest::WorkflowStartedOrStopped {
290                channel: subscriber_sender,
291            })
292            .expect("Failed to subscribe to workflow start/stop events");
293
294        tokio::time::sleep(Duration::from_millis(10)).await;
295
296        publish_channel
297            .send(PublishEventRequest::WorkflowStartedOrStopped(
298                WorkflowStartedOrStoppedEvent::WorkflowStarted {
299                    name: "test".to_string(),
300                    channel: workflow_sender,
301                },
302            ))
303            .expect("Failed to publish workflow started event");
304
305        let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
306        match response {
307            WorkflowStartedOrStoppedEvent::WorkflowStarted { name, channel: _ } => {
308                assert_eq!(&name, "test", "Unexpected workflow name");
309            }
310
311            event => panic!("Unexpected event received: {:?}", event),
312        }
313    }
314
315    #[tokio::test]
316    async fn can_receive_workflow_started_notification_when_subscribed_after_published() {
317        let (publish_channel, subscribe_channel) = start_event_hub();
318        let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
319        let (workflow_sender, _workflow_receiver) = unbounded_channel();
320
321        publish_channel
322            .send(PublishEventRequest::WorkflowStartedOrStopped(
323                WorkflowStartedOrStoppedEvent::WorkflowStarted {
324                    name: "test".to_string(),
325                    channel: workflow_sender,
326                },
327            ))
328            .expect("Failed to publish workflow started event");
329
330        tokio::time::sleep(Duration::from_millis(10)).await;
331
332        subscribe_channel
333            .send(SubscriptionRequest::WorkflowStartedOrStopped {
334                channel: subscriber_sender,
335            })
336            .expect("Failed to subscribe to workflow start/stop events");
337
338        let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
339        match response {
340            WorkflowStartedOrStoppedEvent::WorkflowStarted { name, channel: _ } => {
341                assert_eq!(&name, "test", "Unexpected workflow name");
342            }
343
344            event => panic!("Unexpected event received: {:?}", event),
345        }
346    }
347
348    #[tokio::test]
349    async fn can_receive_workflow_stopped_notifications() {
350        let (publish_channel, subscribe_channel) = start_event_hub();
351        let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
352
353        subscribe_channel
354            .send(SubscriptionRequest::WorkflowStartedOrStopped {
355                channel: subscriber_sender,
356            })
357            .expect("Failed to subscribe to workflow start/stop events");
358
359        tokio::time::sleep(Duration::from_millis(10)).await;
360
361        publish_channel
362            .send(PublishEventRequest::WorkflowStartedOrStopped(
363                WorkflowStartedOrStoppedEvent::WorkflowEnded {
364                    name: "test".to_string(),
365                },
366            ))
367            .expect("Failed to publish workflow ended event");
368
369        let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
370        match response {
371            WorkflowStartedOrStoppedEvent::WorkflowEnded { name } => {
372                assert_eq!(&name, "test", "Unexpected workflow name");
373            }
374
375            event => panic!("Unexpected event received: {:?}", event),
376        }
377    }
378
379    #[tokio::test]
380    async fn no_events_when_workflow_started_and_stopped_prior_to_subscription() {
381        let (publish_channel, subscribe_channel) = start_event_hub();
382        let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
383        let (workflow_sender, _workflow_receiver) = unbounded_channel();
384
385        publish_channel
386            .send(PublishEventRequest::WorkflowStartedOrStopped(
387                WorkflowStartedOrStoppedEvent::WorkflowStarted {
388                    name: "test".to_string(),
389                    channel: workflow_sender,
390                },
391            ))
392            .expect("Failed to publish workflow started event");
393
394        publish_channel
395            .send(PublishEventRequest::WorkflowStartedOrStopped(
396                WorkflowStartedOrStoppedEvent::WorkflowEnded {
397                    name: "test".to_string(),
398                },
399            ))
400            .expect("Failed to publish workflow ended event");
401
402        tokio::time::sleep(Duration::from_millis(10)).await;
403
404        subscribe_channel
405            .send(SubscriptionRequest::WorkflowStartedOrStopped {
406                channel: subscriber_sender,
407            })
408            .expect("Failed to subscribe to workflow start/stop events");
409
410        test_utils::expect_mpsc_timeout(&mut subscriber_receiver).await;
411    }
412
413    #[tokio::test]
414    async fn can_receive_workflow_manager_registered_event() {
415        let (publish_channel, subscribe_channel) = start_event_hub();
416        let (subscriber_sender, mut subscriber_receiver) = unbounded_channel();
417        let (manager_sender, _manager_receiver) = unbounded_channel();
418
419        subscribe_channel
420            .send(SubscriptionRequest::WorkflowManagerEvents {
421                channel: subscriber_sender,
422            })
423            .expect("Failed to send subscription request");
424
425        tokio::time::sleep(Duration::from_millis(10)).await;
426
427        publish_channel
428            .send(PublishEventRequest::WorkflowManagerEvent(
429                WorkflowManagerEvent::WorkflowManagerRegistered {
430                    channel: manager_sender,
431                },
432            ))
433            .expect("Failed to send publish request");
434
435        let response = test_utils::expect_mpsc_response(&mut subscriber_receiver).await;
436        match response {
437            WorkflowManagerEvent::WorkflowManagerRegistered { channel: _ } => (),
438        }
439    }
440}