execution_engine/reactor.rs
1//! Register interest in events and wake futures when the event occurs
2//!
3//!
4//! ## Reactor
5//! The Reactor stores a register of all the events that are currently being listened for.
6//!
//! This has been designed so that it is completely decoupled from the main Execution Engine logic,
//! meaning that you could extract the Reactor to a separate process entirely.
//! By separating this into another process you could update/patch the main EE component while the
//! reactor stores any events that come in, and then jobs could be resumed despite the core EE
//! component being swapped out. Of course, this requires the interface between the EE and the
//! Reactor to remain consistent.
13//!
14//!
15//! ## Events
16//! An event is defined as an external message coming in notifying the EE that something has occurred.
17//! This could be something like:
18//! - The Bot is locked and unable to make progress
19//! - An activity response has come in
20
21use futures::StreamExt;
22use serde_json::from_str;
23use std::collections::HashMap;
24use std::str::from_utf8;
25use std::sync::{Arc, Mutex};
26use std::task::Waker;
27
28use tokio::sync::mpsc::Receiver;
29
/// The set of events that can be handed to the reactor, expressed as a sum type.
///
/// Each variant carries everything needed to register interest in that kind of event.
#[derive(Debug, Clone)]
pub enum Event {
    /// The fields needed to execute an activity.
    Activity {
        /// Identifier of the node the activity belongs to.
        node_id: String,
        /// Identifier of the activity itself.
        activity_id: String,
        /// Handle used to wake the future that is waiting on this activity.
        waker: Waker,
    },
}
40
/// Register of the events that are currently being waited on.
///
/// The register is wrapped in an [`Arc`](std::sync::Arc) so that it can be shared across thread
/// boundaries, and in a [`Mutex`](std::sync::Mutex) so that only one thread can modify it at a
/// time.
///
/// ---
/// Safety: when a job is cancelled prematurely, any events it is still waiting on must be dropped
/// from this register. One way to do this is to implement the drop trait on
/// [`Job`](crate::workflow::Job).
pub struct Reactor {
    /// Map from an event's unique identifier to the [`Waker`] that resumes the `future`
    /// waiting on that event.
    pub events: Arc<Mutex<HashMap<String, Waker>>>,
}
55
56impl Reactor {
57 /// Create a new reactor with an empty events register
58 pub fn new() -> Self {
59 Reactor {
60 events: Default::default(),
61 }
62 }
63
64 /// Connect to the external message broker to listen for events and react to them
65 pub async fn run(self, mut internal_rx: Receiver<Event>) -> Result<(), std::io::Error> {
66 let nats_client = nats::connect("127.0.0.1:4222")?;
67 let mut response_sub = nats_client.subscribe("activity.response")?;
68 let event_collection = self.events.clone();
69 let response_collection = self.events.clone();
70 let client_clone = nats_client.clone();
71 let internal_handle = tokio::task::spawn(async move {
72 while let Some(event) = internal_rx.recv().await {
73 let _ = register_event(event_collection.clone(), event, &client_clone).await;
74 }
75 });
76
77 let external_handle = tokio::task::spawn(async move {
78 while let Some(msg) = response_sub.next() {
79 let move_msg = msg;
80 let node_id: String =
81 from_str::<String>(from_utf8(&move_msg.data).expect("Unable to read msg"))
82 .expect("Unable to deserialize to string");
83 let mut inner = response_collection.lock().expect("Locking failed");
84 if let Some(waker) = inner.remove(&node_id) {
85 waker.wake();
86 }
87 }
88 });
89 let _ = (internal_handle.await, external_handle.await);
90 Ok(())
91 }
92}
93
94async fn register_event(
95 event_collection: Arc<Mutex<HashMap<String, Waker>>>,
96 event: Event,
97 nats_client: &nats::Connection,
98) -> Result<(), std::io::Error> {
99 match event {
100 Event::Activity {
101 node_id,
102 activity_id: _activity_id,
103 waker,
104 } => {
105 {
106 let mut inner = event_collection.lock().expect("Locking failed");
107 inner.insert(node_id.clone(), waker.clone());
108 }
109 let _ = nats_client.publish("activity.execute", &node_id)?;
110 }
111 }
112 Ok(())
113}