execution_engine/lib.rs
1//! Execution Engine MkII
2//!
3//! ---
4//!
5//! The EE is split into 2 main parts
6//! 1. [Executor](crate::executor) - This is responsible for driving the workflows to completion and should contain
7//! all the objects required for each workflow to be executed, think of this as a runtime.
8//! 2. [Reactor](crate::reactor) - The reactor is responsible for notifying the executor when a future can make
9//! progress, this is done via the Waker API.
10//!
11//! When a workflow is sent to the EE, the flow is as follows:
12//! 1. Spawn a new task which will perform all of the work associated with executing a wf to
13//! completion
14//! 2. Deserialize the workflow into a Job, the Job type should describe the entity as accurately
15//! as possible
16//! 3. Drive the workflow forward, this uses an event based stream to do so
17//!
18//! When a workflow reaches a point where it cannot make progress (e.g. waiting for Bots to Lock or
19//! waiting for an Activity to complete) it should yield execution using the underlying mechanics
20//! of Rust's async/await.
21//!
22//!
23pub(crate) use std::io;
24
25pub(crate) use tokio::task;
26
27use crate::reactor::{Event, Reactor};
28pub mod executor;
29pub mod node;
30pub mod reactor;
31pub mod spawner;
32pub mod workflow;
33
34/// Sets up the runtime and starts the message broker listeners
35///
36/// The `start` function creates the structures used by the Runtime and starts the Reactor, Spawner and MsgBroker consumers.
37///
38/// 1. The [Reactor](crate::reactor) - This includes the channel to send messages to the reactor and
39/// the Reactor struct itself. The Reactor is then run in a separate task and we `await` on the task's `Handle`.
40/// 2. The [Spawner](crate::spawner) - First, a channel to communicate to the Spawner is created, then
41/// a new Spawner struct is created, passing in the `Receiver` for the channel. The Spawner is run in
42/// a background task and we `await` on the task's `Handle`.
43/// 3. The [NATs](https://docs.nats.io/) consumer - This creates a NATs consumer listening for messages
44/// on the `jobs.execute` and `jobs.cancel` topics.
45pub async fn start() -> Result<(), io::Error> {
46 // Create reactor channel
47 let (reactor_tx, reactor_rx) = tokio::sync::mpsc::channel::<Event>(20);
48 // Create Reactor
49 let reactor = Reactor::new();
50 let reactor_handle = task::spawn(async move {
51 let _ = reactor.run(reactor_rx).await;
52 });
53 // create spawner channel
54 let (spawn_tx, spawn_rx) = spawner::spawner_channel();
55 // Create Spawner
56 let mut spawner = spawner::Spawner::new(reactor_tx.clone(), spawn_rx);
57 let spawner_handle = task::spawn(async move {
58 spawner
59 .run()
60 .await
61 .expect("Something went critically wrong");
62 });
63
64 /*
65
66 let loop_no = 10;
67 let start = std::time::Instant::now();
68
69 for _ in 0..loop_no {
70 let _ = spawn_tx.send(spawner::execute_msg("workflow.json")).await;
71 }
72 println!(
73 "{:#?}------------------#################",
74 start.elapsed() / loop_no / 3
75 );
76 println!(
77 "{:#?}------------------#################",
78 start.elapsed() / loop_no
79 );
80 println!("{:#?}------------------#################", start.elapsed());
81 */
82
83 // Connect to NATs server
84 let server = nats::connect("127.0.0.1:4222")?;
85 // Set up subscribers
86 let mut _execution_subscriber = server.subscribe("jobs.execute")?;
87 let mut _cancellation_subscriber = server.subscribe("jobs.cancel")?;
88
89 // let _ = spawn_tx.send(spawner::execute_msg(&uuid::Uuid::new_v4().to_string())).await;
90 // tokio::time::sleep(Duration::from_secs(10)).await;
91 // let _ = spawn_tx.send(spawner::execute_msg(&uuid::Uuid::new_v4().to_string())).await;
92 for _ in 0..1 {
93 //tokio::time::sleep(Duration::from_millis(1000)).await;
94 println!("Spawned");
95 spawn_tx
96 .send(spawner::execute_msg(&uuid::Uuid::new_v4().to_string()))
97 .await;
98 }
99
100 //spawn_tx.send(spawner::cancel_msg("workflow.json")).await;
101 // Await the handles to reactor and spawner to make sure all tasks run to completion
102 let _ = (reactor_handle.await, spawner_handle.await);
103 Ok(())
104}