captain_workflow_manager/executor/
thread_local.rs

1use crate::ExpandedDescr;
2
3use super::*;
/// Configuration for an executor that runs jobs on threads of the current process.
///
/// Consumed by `ExecutorBuilder::init`, which spawns the worker threads.
#[derive(Debug, Clone)]
pub struct ThreadLocalExecutorBuilder {
    /// Number of worker threads to spawn. The job queue is sized to twice this value.
    pub n_workers: u16,
}
7
8impl ExecutorBuilder for ThreadLocalExecutorBuilder {
9    type Executor = ThreadLocalExecutor;
10
11    fn init<J: JobUnit>(
12        self,
13    ) -> (
14        Self::Executor,
15        Sender<ToRun<J>>,
16        Sender<ExecutorResult>,
17        Receiver<ExecutorResult>,
18    ) {
19        let to_run_chan = crossbeam_channel::bounded::<ToRun<J>>(2 * (self.n_workers as usize));
20        let done_chan = crossbeam_channel::unbounded();
21        let mut worker_threads = Vec::new();
22        for i in 0..self.n_workers {
23            let rcvr = to_run_chan.1.clone();
24            let sndr = done_chan.0.clone();
25            let handle = thread::Builder::new()
26                .name(format!("worlflow_manager_worker_{}", i))
27                .spawn(move || {
28                    for to_run in rcvr.into_iter() {
29                        let j_idx = to_run.job_idx;
30
31                        let stdout = io::stdout();
32                        {
33                            let mut lock = stdout.lock();
34                            writeln!(lock, "Starting job: {:?}", to_run.job_unit).unwrap();
35                            write!(lock, "Starting job: {:#?} interpreted as: {:#?} ", to_run.job_unit, ExpandedDescr(to_run.job_unit())).unwrap();
36                            writeln!(lock, "\n").unwrap();
37                        }
38                        super::create_out_dir(&to_run.job_unit);
39                        let done_running = Command::new("bash")
40                            .arg("-c")
41                            .arg(to_run.job_unit.cmd().unwrap())
42                            .output()
43                            .unwrap();
44                        if done_running.status.success() {
45                            {
46                                let mut lock = stdout.lock();
47                                writeln!(lock, "Finished job: {:?}\n", to_run.job_unit).unwrap();
48                            }
49                            sndr.send(ExecutorResult::new_ok(j_idx)).unwrap()
50                        } else {
51                            sndr.send(ExecutorResult {
52                                job_idx: j_idx,
53                                result: Err(format!(
54                                    "Process exited with status {:?} and stderr:\n{}",
55                                    done_running.status.code(),
56                                    String::from_utf8_lossy(&done_running.stderr)
57                                )),
58                            })
59                            .unwrap();
60                        }
61                    }
62                })
63                .unwrap();
64            worker_threads.push(handle);
65        }
66        (
67            ThreadLocalExecutor {
68                threads: worker_threads,
69            },
70            to_run_chan.0,
71            done_chan.0,
72            done_chan.1,
73        )
74    }
75}
76
/// Executor backed by worker threads running in the current process.
///
/// Holds the join handles of the workers so they can be waited on via `join`.
#[derive(Debug)]
pub struct ThreadLocalExecutor {
    /// Handles of the spawned worker threads, in spawn order.
    threads: Vec<JoinHandle<()>>,
}
80
81impl Executor for ThreadLocalExecutor {
82    fn join(self)-> Vec<std::thread::Result<()>>  {
83        self.threads.into_iter().map(|jh| jh.join()).collect()
84    }
85}