// execution_engine/executor.rs
//! The logic for executing a job
//!
//! When a job is being executed, logically it is separated into two different areas:
//! 1. retrieving the next node to be run
//! 2. running the current node(s)
//!
//! ## Get Next Node
//! The [`Job`](crate::workflow::Job) struct has a method defined on it called [`Job.next_node()`](crate::workflow::Job::next_node)
//! which will return the next node or nodes. This works in an event-based pattern whereby each
//! node, on completion, will send a message to the `Job` struct informing it that the node has
//! completed.
//!
//! The [`Message`](crate::workflow::Message) that is sent carries a status to let the job know whether the node completed
//! successfully. On successful completion, the executor will call `job.next_node(msg.pointer)` to
//! fetch the next node(s) to be executed.
//!
//! ## Run the current node(s)
//! As each node is returned, it needs to be run. Every node implements the [`Node`](crate::node::Node) type,
//! which means that to run it we can simply call the `run` method. Each node is run in a new task
//! so we can run multiple nodes in parallel. This is how parallel and exclusive branches are accommodated.
//!
//! ```ignore
//! for node in job.next_node(...) {
//!     task::spawn(node.run()) // executes in the background, returning immediately
//! }
//! ```
27
28use std::io;
29
30use tokio::select;
31use tokio::sync::{
32 broadcast,
33 mpsc::{Receiver, Sender},
34};
35use tokio::time::Instant;
36use tokio::{fs, task};
37
38use crate::reactor::Event;
39use crate::workflow;
40use crate::workflow::{Job, Message, NodeStatus};
41
42/// The execute_handler function takes a workflow ID, gets the workflow and creates a Job struct
43/// from that workflow. Once we have a Job struct, the job is driven to completion by the [`run`](self::run) function.
44pub async fn execute_handler(
45 file: &str,
46 rc_clone: Sender<Event>,
47 cancellation_tx: broadcast::Sender<()>,
48) -> Result<(), io::Error> {
49 let start = Instant::now();
50 let wf_json = fs::read_to_string(file).await?;
51 let wf: workflow::Workflow = serde_json::from_str(&wf_json)?;
52 let (job, job_rx) = Job::new(&wf, &rc_clone);
53 println!("{:#?}", job);
54 run(job, job_rx, cancellation_tx).await; // todo: return result here so we can bubble up the result
55 println!("Time taken to run Job: {:#?}", start.elapsed());
56 Ok(())
57}
58
59/// Take ownership of a job and the jobs receiver and drive the job to completion by getting
60/// each node and running them as they can be run.
61///
62/// Each node is responsible for notifying the job that it can move forward
63/// The next node function will need to take a pointer to the current node that has finished
64/// So it knows where to resume the job from
65pub async fn run(job: Job, mut rx: Receiver<Message>, cancellation_tx: broadcast::Sender<()>) {
66 if let Some(next_node) = job.next_node(None) {
67 // gets start node at very beginning
68 let _ = next_node.get(0).expect("Missing Start Node").run().await; // Waiting for start node to complete
69 }
70
71 while let Some(msg) = rx.recv().await {
72 match msg.status {
73 NodeStatus::Failed => { /* The job must now return with status of failed */ }
74 NodeStatus::Success => match job.next_node(Some(msg.pointer)) {
75 Some(nodes) => {
76 for node in nodes {
77 let mut cancellation_sub = cancellation_tx.clone().subscribe();
78 let node_clone = node.clone();
79 task::spawn(async move {
80 select! {
81 _ = node_clone.run() => {},
82 _ = cancellation_sub.recv() => {
83 println!("Node cancelled");
84 }
85 }
86 });
87 }
88 }
89 None => {
90 //drop(job);
91 break; // This is because we don't drop the Job when End is returned so we need to manually break out of this loop
92 // We could manually drop Job here which would have the same effect
93 }
94 },
95 }
96 }
97 //todo!() // what is there to do here? I'm not sure?!?
98}