execution_engine/
reactor.rs

1//! Register interest in events and wake futures when the event occurs
2//!
3//!
4//! ## Reactor
5//! The Reactor stores a register of all the events that are currently being listened for.
6//!
7//! This has been designed so that it is completed decoupled from the main Execution Engine logic
8//! meaning that you could extract the Reactor to a separate process entirely.
9//! By separating this to another process you could update/patch the main EE component and the
10//! reactor would store any events that came in and then we could resume jobs despite the core EE
11//! component being swapped out. Of course this will require the interface between the EE and Reactor
12//! remained consistent.
13//!
14//!
15//! ## Events
16//! An event is defined as an external message coming in notifying the EE that something has occurred.
17//! This could be something like:
18//! - The Bot is locked and unable to make progress
19//! - An activity response has come in
20
21use futures::StreamExt;
22use serde_json::from_str;
23use std::collections::HashMap;
24use std::str::from_utf8;
25use std::sync::{Arc, Mutex};
26use std::task::Waker;
27
28use tokio::sync::mpsc::Receiver;
29
30/// A sum type/algebraic data type containing all the different types of Event that could occur
31#[derive(Debug, Clone)]
32pub enum Event {
33    /// Defines the fields needed to execute an activity
34    Activity {
35        node_id: String,
36        activity_id: String,
37        waker: Waker,
38    },
39}
40
41/// The Reactor struct stores event references for the events currently being waited on
42///
43/// This collection is safe to access across thread boundaries as we have wrapped with an [`Arc`](std::sync::Arc)
44/// to satisfy borrowing the value across threads and it is also wrapped in a [`Mutex`](std::sync::Mutex)
45/// to ensure that only one thread can write to it at a time
46///
47/// ---
48/// Safety: We must make sure when a job is prematurely cancelled we drop any events being waited on
49/// We could do this by implementing the drop trait on the [`Job`](crate::workflow::Job)
50pub struct Reactor {
51    /// A dictionary of events where the key is a unique identifier to the event
52    /// and the value contains a `struct` with a mechanism to resume the `future`
53    pub events: Arc<Mutex<HashMap<String, Waker>>>,
54}
55
56impl Reactor {
57    /// Create a new reactor with an empty events register
58    pub fn new() -> Self {
59        Reactor {
60            events: Default::default(),
61        }
62    }
63
64    /// Connect to the external message broker to listen for events and react to them
65    pub async fn run(self, mut internal_rx: Receiver<Event>) -> Result<(), std::io::Error> {
66        let nats_client = nats::connect("127.0.0.1:4222")?;
67        let mut response_sub = nats_client.subscribe("activity.response")?;
68        let event_collection = self.events.clone();
69        let response_collection = self.events.clone();
70        let client_clone = nats_client.clone();
71        let internal_handle = tokio::task::spawn(async move {
72            while let Some(event) = internal_rx.recv().await {
73                let _ = register_event(event_collection.clone(), event, &client_clone).await;
74            }
75        });
76
77        let external_handle = tokio::task::spawn(async move {
78            while let Some(msg) = response_sub.next() {
79                let move_msg = msg;
80                let node_id: String =
81                    from_str::<String>(from_utf8(&move_msg.data).expect("Unable to read msg"))
82                        .expect("Unable to deserialize to string");
83                let mut inner = response_collection.lock().expect("Locking failed");
84                if let Some(waker) = inner.remove(&node_id) {
85                    waker.wake();
86                }
87            }
88        });
89        let _ = (internal_handle.await, external_handle.await);
90        Ok(())
91    }
92}
93
94async fn register_event(
95    event_collection: Arc<Mutex<HashMap<String, Waker>>>,
96    event: Event,
97    nats_client: &nats::Connection,
98) -> Result<(), std::io::Error> {
99    match event {
100        Event::Activity {
101            node_id,
102            activity_id: _activity_id,
103            waker,
104        } => {
105            {
106                let mut inner = event_collection.lock().expect("Locking failed");
107                inner.insert(node_id.clone(), waker.clone());
108            }
109            let _ = nats_client.publish("activity.execute", &node_id)?;
110        }
111    }
112    Ok(())
113}