execution_engine/
lib.rs

1//! Execution Engine MkII
2//!
3//! ---
4//!
5//! The EE is split into 2 main parts
6//! 1. [Executor](crate::executor) - This is responsible for driving the workflows to completion and should contain
7//!    all the objects required for each workflow to be executed, think of this as a runtime.
8//! 2. [Reactor](crate::reactor) - The reactor is responsible for notifying the executor when a future can make
9//!    progress, this is done via the Waker API.
10//!
11//! When a workflow is sent to the EE, the flow is as follows:
//! 1. Spawn a new task which will perform all of the work associated with executing a workflow to
//!    completion
14//! 2. Deserialize the workflow into a Job, the Job type should describe the entity as accurately
15//! as possible
16//! 3. Drive the workflow forward, this uses an event based stream to do so
17//!
18//! When a workflow reaches a point where it cannot make progress (e.g. waiting for Bots to Lock or
19//! waiting for an Activity to complete) it should yield execution using the underlying mechanics
20//! of Rust's async/await.
21//!
22//!
23pub(crate) use std::io;
24
25pub(crate) use tokio::task;
26
27use crate::reactor::{Event, Reactor};
28pub mod executor;
29pub mod node;
30pub mod reactor;
31pub mod spawner;
32pub mod workflow;
33
34/// Sets up the runtime and starts the message broker listeners
35///
36/// The `start` function creates the structures used by the Runtime and starts the Reactor, Spawner and MsgBroker consumers.
37///
38/// 1. The [Reactor](crate::reactor) - This includes the channel to send messages to the reactor and
39/// the Reactor struct itself. The Reactor is then run in a separate task and we `await` on the task's `Handle`.
40/// 2. The [Spawner](crate::spawner) - First, a channel to communicate to the Spawner is created, then
41/// a new Spawner struct is created, passing in the `Receiver` for the channel. The Spawner is run in
42/// a background task and we `await` on the task's `Handle`.
43/// 3. The [NATs](https://docs.nats.io/) consumer - This creates a NATs consumer listening for messages
44/// on the `jobs.execute` and `jobs.cancel` topics.
45pub async fn start() -> Result<(), io::Error> {
46    // Create reactor channel
47    let (reactor_tx, reactor_rx) = tokio::sync::mpsc::channel::<Event>(20);
48    // Create Reactor
49    let reactor = Reactor::new();
50    let reactor_handle = task::spawn(async move {
51        let _ = reactor.run(reactor_rx).await;
52    });
53    // create spawner channel
54    let (spawn_tx, spawn_rx) = spawner::spawner_channel();
55    // Create Spawner
56    let mut spawner = spawner::Spawner::new(reactor_tx.clone(), spawn_rx);
57    let spawner_handle = task::spawn(async move {
58        spawner
59            .run()
60            .await
61            .expect("Something went critically wrong");
62    });
63
64    /*
65
66    let loop_no = 10;
67    let start = std::time::Instant::now();
68
69    for _ in 0..loop_no {
70        let _ = spawn_tx.send(spawner::execute_msg("workflow.json")).await;
71    }
72    println!(
73        "{:#?}------------------#################",
74        start.elapsed() / loop_no / 3
75    );
76    println!(
77        "{:#?}------------------#################",
78        start.elapsed() / loop_no
79    );
80    println!("{:#?}------------------#################", start.elapsed());
81    */
82
83    // Connect to NATs server
84    let server = nats::connect("127.0.0.1:4222")?;
85    // Set up subscribers
86    let mut _execution_subscriber = server.subscribe("jobs.execute")?;
87    let mut _cancellation_subscriber = server.subscribe("jobs.cancel")?;
88
89    // let _ = spawn_tx.send(spawner::execute_msg(&uuid::Uuid::new_v4().to_string())).await;
90    // tokio::time::sleep(Duration::from_secs(10)).await;
91    // let _ = spawn_tx.send(spawner::execute_msg(&uuid::Uuid::new_v4().to_string())).await;
92    for _ in 0..1 {
93        //tokio::time::sleep(Duration::from_millis(1000)).await;
94        println!("Spawned");
95        spawn_tx
96            .send(spawner::execute_msg(&uuid::Uuid::new_v4().to_string()))
97            .await;
98    }
99
100    //spawn_tx.send(spawner::cancel_msg("workflow.json")).await;
101    // Await the handles to reactor and spawner to make sure all tasks run to completion
102    let _ = (reactor_handle.await, spawner_handle.await);
103    Ok(())
104}