use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::task;
use tokio::time::Instant;
use crate::executor;
use crate::reactor::Event;
/// Commands accepted by the spawner's command channel.
///
/// Both variants carry the string that identifies the job: the spawner
/// registers a job under the string given to `Execute` and looks it up again
/// with the string given to `Cancel`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpawnerMsg {
    /// Start a new job and register it under this string.
    Execute(String),
    /// Signal cancellation to the job registered under this string.
    Cancel(String),
}
/// Count of jobs in flight: incremented for every `Execute` message handled by
/// `Spawner::run` and decremented at the end of the task spawned for that job.
/// It is only written, never read, in the code visible here.
static JOB_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Creates the bounded command channel for a [`Spawner`].
///
/// Returns the `(sender, receiver)` pair of a channel with room for 20 queued
/// [`SpawnerMsg`] values.
pub fn spawner_channel() -> (Sender<SpawnerMsg>, Receiver<SpawnerMsg>) {
    const QUEUE_CAPACITY: usize = 20;
    let (command_tx, command_rx) = tokio::sync::mpsc::channel(QUEUE_CAPACITY);
    (command_tx, command_rx)
}
/// Builds a [`SpawnerMsg::Execute`] request carrying an owned copy of `wf_id`.
pub fn execute_msg(wf_id: &str) -> SpawnerMsg {
    let workflow = String::from(wf_id);
    SpawnerMsg::Execute(workflow)
}
/// Builds a [`SpawnerMsg::Cancel`] request carrying an owned copy of `job_id`.
pub fn cancel_msg(job_id: &str) -> SpawnerMsg {
    let target = job_id.to_owned();
    SpawnerMsg::Cancel(target)
}
/// Receives `SpawnerMsg` commands and starts or cancels jobs in response.
pub struct Spawner {
// Sender of reactor `Event`s; a clone is handed to `executor::execute_handler`
// for every job that is started.
reactor_channel: Sender<Event>,
// Incoming command stream drained by `run`.
rx: Receiver<SpawnerMsg>,
// Registry of running jobs: keyed by the string carried in `Execute`, each value
// is the broadcast sender used to deliver that job's cancellation signal.
jobs: Arc<Mutex<HashMap<String, tokio::sync::broadcast::Sender<()>>>>,
}
impl Spawner {
pub fn new(
rc: tokio::sync::mpsc::Sender<Event>,
listener: tokio::sync::mpsc::Receiver<SpawnerMsg>,
) -> Self {
Spawner {
reactor_channel: rc,
rx: listener,
jobs: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn run(&mut self) -> Result<(), io::Error> {
while let Some(msg) = self.rx.recv().await {
match msg {
SpawnerMsg::Execute(file) => {
JOB_COUNTER.fetch_add(1, Ordering::Acquire);
let rc_clone = self.reactor_channel.clone();
let cloned_jobs = self.jobs.clone();
let (cancellation_tx, mut cancellation_rx) = tokio::sync::broadcast::channel::<()>(100);
{
let mut write_jobs = cloned_jobs.lock().expect("Locking failed");
write_jobs.insert(file.clone(), cancellation_tx.clone());
}
task::spawn(async move {
select!(
_ = executor::execute_handler("parallel.json", rc_clone, cancellation_tx) => {
cloned_jobs.lock().unwrap().remove(&file);
}
temp = cancellation_rx.recv() => {
match temp {
Ok(_) => println!("The job was cancelled"),
Err(_) => println!("The sender was dropped")
}
}
);
JOB_COUNTER.fetch_sub(1, Ordering::Acquire);
});
}
SpawnerMsg::Cancel(job_id) => {
let cloned_jobs = self.jobs.clone();
let mut n = cloned_jobs.lock().expect("Failed to lock");
if let Some(sender) = n.remove(&job_id) {
sender.send(()).unwrap();
}
}
}
}
Ok(())
}
}