use std::io;
use tokio::select;
use tokio::sync::{
broadcast,
mpsc::{Receiver, Sender},
};
use tokio::time::Instant;
use tokio::{fs, task};
use crate::reactor::Event;
use crate::workflow;
use crate::workflow::{Job, Message, NodeStatus};
/// Loads a workflow definition from `file`, builds a [`Job`] from it and runs it.
///
/// The file is read as text and deserialized from JSON into a
/// `workflow::Workflow`. The resulting job is dumped to stdout in pretty debug
/// form, handed to [`run`], and the wall-clock time measured from the start of
/// this function is printed once `run` returns.
///
/// # Errors
///
/// Returns an [`io::Error`] if the file cannot be read or if its contents fail
/// to deserialize (the JSON error is converted by `?`).
pub async fn execute_handler(
    file: &str,
    rc_clone: Sender<Event>,
    cancellation_tx: broadcast::Sender<()>,
) -> Result<(), io::Error> {
    // Started before any I/O so the reported duration covers loading as well.
    let timer = Instant::now();

    let raw_definition = fs::read_to_string(file).await?;
    let definition = serde_json::from_str::<workflow::Workflow>(&raw_definition)?;

    let (job, messages) = Job::new(&definition, &rc_clone);
    println!("{:#?}", job);

    run(job, messages, cancellation_tx).await;

    let elapsed = timer.elapsed();
    println!("Time taken to run Job: {:#?}", elapsed);
    Ok(())
}
/// Drives `job` by reacting to node status messages received on `rx`.
///
/// The entry nodes are requested with `job.next_node(None)`; the first of them
/// is run and awaited inline. Afterwards, every `Success` message is used to
/// look up the follow-up nodes of the node it points at. Each follow-up node is
/// spawned as its own task that races the node's `run()` against a signal on
/// the cancellation channel. The spawned task handles are not kept, so those
/// tasks are detached from this function.
///
/// The loop ends when `job.next_node` returns `None` for a successful node, or
/// when `rx.recv()` yields `None`.
///
/// # Panics
///
/// Panics with "Missing Start Node" if `job.next_node(None)` returns `Some`
/// with no first element.
pub async fn run(job: Job, mut rx: Receiver<Message>, cancellation_tx: broadcast::Sender<()>) {
    if let Some(next_node) = job.next_node(None) {
        // The start node's own result is discarded here.
        // NOTE(review): presumably its outcome is reported through `rx` — confirm.
        let _ = next_node.get(0).expect("Missing Start Node").run().await;
    }

    while let Some(msg) = rx.recv().await {
        match msg.status {
            NodeStatus::Failed => {
                // Surface the failure instead of dropping it silently. Control
                // flow is unchanged: the loop keeps waiting for further messages.
                eprintln!("Node failed");
            }
            NodeStatus::Success => match job.next_node(Some(msg.pointer)) {
                Some(nodes) => {
                    for node in nodes {
                        // `subscribe` only needs `&self`, so the sender is not cloned.
                        let mut cancellation_sub = cancellation_tx.subscribe();
                        let node_clone = node.clone();
                        task::spawn(async move {
                            // Whichever future finishes first wins; the other is dropped.
                            select! {
                                _ = node_clone.run() => {},
                                _ = cancellation_sub.recv() => {
                                    println!("Node cancelled");
                                }
                            }
                        });
                    }
                }
                None => {
                    break;
                }
            },
        }
    }
}