captain_workflow_manager/executor/
thread_local.rs1use crate::ExpandedDescr;
2
3use super::*;
/// Configuration for an executor that runs jobs on threads of the local machine.
pub struct ThreadLocalExecutorBuilder {
    /// How many worker threads `init` spawns.
    pub n_workers: u16,
}
7
8impl ExecutorBuilder for ThreadLocalExecutorBuilder {
9 type Executor = ThreadLocalExecutor;
10
11 fn init<J: JobUnit>(
12 self,
13 ) -> (
14 Self::Executor,
15 Sender<ToRun<J>>,
16 Sender<ExecutorResult>,
17 Receiver<ExecutorResult>,
18 ) {
19 let to_run_chan = crossbeam_channel::bounded::<ToRun<J>>(2 * (self.n_workers as usize));
20 let done_chan = crossbeam_channel::unbounded();
21 let mut worker_threads = Vec::new();
22 for i in 0..self.n_workers {
23 let rcvr = to_run_chan.1.clone();
24 let sndr = done_chan.0.clone();
25 let handle = thread::Builder::new()
26 .name(format!("worlflow_manager_worker_{}", i))
27 .spawn(move || {
28 for to_run in rcvr.into_iter() {
29 let j_idx = to_run.job_idx;
30
31 let stdout = io::stdout();
32 {
33 let mut lock = stdout.lock();
34 writeln!(lock, "Starting job: {:?}", to_run.job_unit).unwrap();
35 write!(lock, "Starting job: {:#?} interpreted as: {:#?} ", to_run.job_unit, ExpandedDescr(to_run.job_unit())).unwrap();
36 writeln!(lock, "\n").unwrap();
37 }
38 super::create_out_dir(&to_run.job_unit);
39 let done_running = Command::new("bash")
40 .arg("-c")
41 .arg(to_run.job_unit.cmd().unwrap())
42 .output()
43 .unwrap();
44 if done_running.status.success() {
45 {
46 let mut lock = stdout.lock();
47 writeln!(lock, "Finished job: {:?}\n", to_run.job_unit).unwrap();
48 }
49 sndr.send(ExecutorResult::new_ok(j_idx)).unwrap()
50 } else {
51 sndr.send(ExecutorResult {
52 job_idx: j_idx,
53 result: Err(format!(
54 "Process exited with status {:?} and stderr:\n{}",
55 done_running.status.code(),
56 String::from_utf8_lossy(&done_running.stderr)
57 )),
58 })
59 .unwrap();
60 }
61 }
62 })
63 .unwrap();
64 worker_threads.push(handle);
65 }
66 (
67 ThreadLocalExecutor {
68 threads: worker_threads,
69 },
70 to_run_chan.0,
71 done_chan.0,
72 done_chan.1,
73 )
74 }
75}
76
/// Executor backed by local worker threads; it owns their join handles.
pub struct ThreadLocalExecutor {
    /// Join handles of the worker threads, in the order they were spawned.
    threads: Vec<JoinHandle<()>>,
}
80
81impl Executor for ThreadLocalExecutor {
82 fn join(self)-> Vec<std::thread::Result<()>> {
83 self.threads.into_iter().map(|jh| jh.join()).collect()
84 }
85}