execution_engine/spawner.rs
1//! Spawns new jobs and handles cancellations
2//!
3//! The job of the Spawner is to receive either a message indicating to execute a workflow or cancel a job.
4//! It executes each new job in a new `Task` allowing for parallelism so multiple jobs can be submitted to the
5//! async executor at once.
6
7use std::collections::HashMap;
8use std::io;
9use std::sync::{Arc, Mutex};
10use std::sync::atomic::{AtomicUsize, Ordering};
11
12use tokio::select;
13use tokio::sync::mpsc::{Receiver, Sender};
14use tokio::sync::oneshot;
15use tokio::task;
16use tokio::time::Instant;
17
18use crate::executor;
19use crate::reactor::Event;
20
/// Different types of message that can be received by the Spawner
///
/// The derives make messages printable with `{:?}`, clonable and comparable, which is
/// convenient for logging and for asserting on messages in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpawnerMsg {
    /// Execute contains the workflow ID to be executed
    Execute(String),
    /// Cancel contains the job ID which needs to be cancelled
    Cancel(String),
}
28
/// Process-wide job counter: incremented when the Spawner handles a `SpawnerMsg::Execute` and
/// decremented when the task spawned for that job finishes its `select!`.
// NOTE(review): nothing in the visible part of this file reads the counter back.
static JOB_COUNTER: AtomicUsize = AtomicUsize::new(0);
30
31/// Create channel required for communication with the Spawner
32pub fn spawner_channel() -> (Sender<SpawnerMsg>, Receiver<SpawnerMsg>) {
33 tokio::sync::mpsc::channel::<SpawnerMsg>(20)
34}
35
36/// Creates `SpawnerMsg::Execute` variant from `&str`
37pub fn execute_msg(wf_id: &str) -> SpawnerMsg {
38 SpawnerMsg::Execute(wf_id.to_string())
39}
40
41/// Creates `SpawnerMsg::Cancel` variant from `&str`
42pub fn cancel_msg(job_id: &str) -> SpawnerMsg {
43 SpawnerMsg::Cancel(job_id.to_string())
44}
45
/// The Spawner struct contains the state required to create and cancel jobs
pub struct Spawner {
    /// The sender half of the reactor channel, this is cloned for each new job
    reactor_channel: Sender<Event>,
    /// The receiver half on which the Spawner receives `SpawnerMsg`s
    rx: Receiver<SpawnerMsg>,
    /// A thread safe register mapping each registered job to the broadcast sender used to cancel
    /// it. Entries are inserted under the string carried by `SpawnerMsg::Execute` and looked up
    /// with the string carried by `SpawnerMsg::Cancel`
    jobs: Arc<Mutex<HashMap<String, tokio::sync::broadcast::Sender<()>>>>,
}
56
57impl Spawner {
58 /// Create a new Spawner from the `Sender` half of the receiver channel and the `Receiver` half
59 /// of the Spawner channel
60 pub fn new(
61 rc: tokio::sync::mpsc::Sender<Event>,
62 listener: tokio::sync::mpsc::Receiver<SpawnerMsg>,
63 ) -> Self {
64 Spawner {
65 reactor_channel: rc,
66 rx: listener,
67 jobs: Arc::new(Mutex::new(HashMap::new())),
68 }
69 }
70 /// Run the spawner
71 ///
72 /// When running, the spawner `await`s messages on `self.rx` and if the messages are:
73 /// 1. [`SpawnerMsg::Execute`](SpawnerMsg::Execute) - it will create the cancellation channel for this job and run the executor
74 /// for this job in a new `task`
75 /// 2. [`SpawnerMsg::Cancel`](SpawnerMsg::Cancel) - it will broadcast the cancellation message to the appropriate job so
76 /// that the job is aborted gracefully
77 pub async fn run(&mut self) -> Result<(), io::Error> {
78 while let Some(msg) = self.rx.recv().await {
79 match msg {
80 SpawnerMsg::Execute(file) => {
81 JOB_COUNTER.fetch_add(1, Ordering::Acquire);
82 let rc_clone = self.reactor_channel.clone();
83 let cloned_jobs = self.jobs.clone();
84 let (cancellation_tx, mut cancellation_rx) = tokio::sync::broadcast::channel::<()>(100);
85
86 {
87 let mut write_jobs = cloned_jobs.lock().expect("Locking failed");
88 write_jobs.insert(file.clone(), cancellation_tx.clone());
89 }
90 task::spawn(async move {
91 select!(
92 _ = executor::execute_handler("parallel.json", rc_clone, cancellation_tx) => {
93 cloned_jobs.lock().unwrap().remove(&file);
94 }
95 temp = cancellation_rx.recv() => {
96 match temp {
97 Ok(_) => println!("The job was cancelled"),
98 Err(_) => println!("The sender was dropped")
99 }
100 }
101 );
102 JOB_COUNTER.fetch_sub(1, Ordering::Acquire);
103 });
104 }
105
106 SpawnerMsg::Cancel(job_id) => {
107 let cloned_jobs = self.jobs.clone();
108 let mut n = cloned_jobs.lock().expect("Failed to lock");
109 if let Some(sender) = n.remove(&job_id) {
110 sender.send(()).unwrap();
111 }
112 }
113 }
114 }
115 Ok(())
116 }
117}