execution_engine/
spawner.rs

1//! Spawns new jobs and handles cancellations
2//!
//! The Spawner receives messages that either request the execution of a workflow or the cancellation of a job.
4//! It executes each new job in a new `Task` allowing for parallelism so multiple jobs can be submitted to the
5//! async executor at once.
6
7use std::collections::HashMap;
8use std::io;
9use std::sync::{Arc, Mutex};
10use std::sync::atomic::{AtomicUsize, Ordering};
11
12use tokio::select;
13use tokio::sync::mpsc::{Receiver, Sender};
14use tokio::sync::oneshot;
15use tokio::task;
16use tokio::time::Instant;
17
18use crate::executor;
19use crate::reactor::Event;
20
/// The kinds of message the `Spawner` can receive.
///
/// The enum derives `Debug`, `Clone`, `PartialEq` and `Eq` so messages can be logged, duplicated
/// and compared (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpawnerMsg {
    /// Request to execute a workflow; carries the workflow ID.
    Execute(String),
    /// Request to cancel a job; carries the job ID.
    Cancel(String),
}
28
// Process-wide count of jobs in flight: `Spawner::run` increments it when it handles an `Execute`
// message and the spawned task decrements it once the job finishes or is cancelled.
// The counter is not read anywhere in the visible code.
static JOB_COUNTER: AtomicUsize = AtomicUsize::new(0);
30
31/// Create channel required for communication with the Spawner
32pub fn spawner_channel() -> (Sender<SpawnerMsg>, Receiver<SpawnerMsg>) {
33    tokio::sync::mpsc::channel::<SpawnerMsg>(20)
34}
35
36/// Creates `SpawnerMsg::Execute` variant from `&str`
37pub fn execute_msg(wf_id: &str) -> SpawnerMsg {
38    SpawnerMsg::Execute(wf_id.to_string())
39}
40
41/// Creates `SpawnerMsg::Cancel` variant from `&str`
42pub fn cancel_msg(job_id: &str) -> SpawnerMsg {
43    SpawnerMsg::Cancel(job_id.to_string())
44}
45
/// State the Spawner needs in order to start and cancel jobs.
pub struct Spawner {
    /// Sender half of the reactor channel; it is cloned for every new job.
    reactor_channel: Sender<Event>,
    /// Receiver half of the Spawner channel, on which `SpawnerMsg`s arrive.
    rx: Receiver<SpawnerMsg>,
    /// Shared, mutex-protected register mapping a job key to the broadcast sender used to cancel
    /// that job. It is consulted when the Spawner receives a `SpawnerMsg::Cancel` message.
    ///
    /// NOTE(review): entries are inserted under the string carried by `Execute` (a workflow ID)
    /// but looked up with the string carried by `Cancel` (a job ID) — confirm that these are the
    /// same identifier.
    jobs: Arc<Mutex<HashMap<String, tokio::sync::broadcast::Sender<()>>>>,
}
56
57impl Spawner {
58    /// Create a new Spawner from the `Sender` half of the receiver channel and the `Receiver` half
59    /// of the Spawner channel
60    pub fn new(
61        rc: tokio::sync::mpsc::Sender<Event>,
62        listener: tokio::sync::mpsc::Receiver<SpawnerMsg>,
63    ) -> Self {
64        Spawner {
65            reactor_channel: rc,
66            rx: listener,
67            jobs: Arc::new(Mutex::new(HashMap::new())),
68        }
69    }
70    /// Run the spawner
71    ///
72    /// When running, the spawner `await`s messages on `self.rx` and if the messages are:
73    /// 1. [`SpawnerMsg::Execute`](SpawnerMsg::Execute) - it will create the cancellation channel for this job and run the executor
74    /// for this job in a new `task`
75    /// 2. [`SpawnerMsg::Cancel`](SpawnerMsg::Cancel) - it will broadcast the cancellation message to the appropriate job so
76    /// that the job is aborted gracefully
77    pub async fn run(&mut self) -> Result<(), io::Error> {
78        while let Some(msg) = self.rx.recv().await {
79            match msg {
80                SpawnerMsg::Execute(file) => {
81                    JOB_COUNTER.fetch_add(1, Ordering::Acquire);
82                    let rc_clone = self.reactor_channel.clone();
83                    let cloned_jobs = self.jobs.clone();
84                    let (cancellation_tx, mut cancellation_rx) = tokio::sync::broadcast::channel::<()>(100);
85
86                    {
87                        let mut write_jobs = cloned_jobs.lock().expect("Locking failed");
88                        write_jobs.insert(file.clone(), cancellation_tx.clone());
89                    }
90                    task::spawn(async move {
91                        select!(
92                            _ = executor::execute_handler("parallel.json", rc_clone, cancellation_tx) => {
93                                cloned_jobs.lock().unwrap().remove(&file);
94                            }
95                            temp = cancellation_rx.recv() => {
96                                match temp {
97                                    Ok(_) => println!("The job was cancelled"),
98                                    Err(_) => println!("The sender was dropped")
99                                }
100                            }
101                        );
102                        JOB_COUNTER.fetch_sub(1, Ordering::Acquire);
103                    });
104                }
105
106                SpawnerMsg::Cancel(job_id) => {
107                    let cloned_jobs = self.jobs.clone();
108                    let mut n = cloned_jobs.lock().expect("Failed to lock");
109                    if let Some(sender) = n.remove(&job_id) {
110                        sender.send(()).unwrap();
111                    }
112                }
113            }
114        }
115        Ok(())
116    }
117}