captain_workflow_manager/executor.rs
1//! Module containing the traits needed to be a job executor as well as provided job executors
2use std::{
3 io,
4 io::Write,
5 process::Command,
6 thread::{self, JoinHandle},
7};
8
9use crossbeam_channel::{Receiver, Sender};
10use petgraph::prelude::NodeIndex;
11
12pub mod dry_run_executor;
13pub use dry_run_executor::DryRunExecutorBuilder;
14pub mod slurm_executor;
15pub use slurm_executor::SlurmExecutorBuilder;
16pub mod thread_local;
17pub use thread_local::ThreadLocalExecutorBuilder;
18
19
20use crate::{JobUnit, ToRun};
21
22type JobResult = Result<(), String>;
23
24/// The result of running the job on the executor
25#[derive(Debug)]
26pub struct ExecutorResult {
27 pub(crate) job_idx: NodeIndex,
28 pub(crate) result: JobResult,
29}
30
31
32impl ExecutorResult {
33 /// Signals that job number `job_idx` has run succesfully.
34 ///
35 /// `job_idx` must come from [`ToRun::job_idx()`].
36 pub fn new_ok(job_idx: NodeIndex) -> Self {
37 Self {
38 job_idx,
39 result: Ok(()),
40 }
41 }
42
43 /// Signals that job number `job_idx` has failed.
44 ///
45 /// `job_idx` must come from [`ToRun::job_idx()`], and `err` describes
46 /// what went wrong.
47 pub fn new_error(job_idx: NodeIndex, err: String) -> Self {
48 Self {
49 job_idx,
50 result: Err(err)
51 }
52 }
53}
54
55/// Trait used to implement the function creating the communication channels and starting the worker threads.
56pub trait ExecutorBuilder {
57 /// The associated [`Executor`] type.
58 type Executor: Executor;
59
60 /// Creates the communication channel *ToRun* and *Results*
61 ///
62 /// The executor should accept jobs to run on *ToRun*, run them, and send the executor result on *Results*.
63 ///
64 /// Returns
65 /// -------
66 /// 1. The [`Executor`] object,
67 /// 2. The send end of crossbeam communication channel *ToRun*,
68 /// 3. The send end of the crossbeam channel *Results*,
69 /// 4. The receive end of the crossbeam channel *Results*.
70 fn init<J: JobUnit>(
71 self,
72 ) -> (
73 Self::Executor,
74 Sender<ToRun<J>>,
75 Sender<ExecutorResult>,
76 Receiver<ExecutorResult>,
77 );
78}
79
80/// A trait representing a running executor
81pub trait Executor {
82 /// Similar to the standard [`join`](`std::thread::JoinHandle::join`), except that there can be several threads which must all return `()`.
83 fn join(self) -> Vec<std::thread::Result<()>>;
84}
85
86fn create_out_dir<J: JobUnit>(j_u: &J) {
87 std::fs::create_dir_all(j_u.out_file().parent().unwrap()).unwrap();
88 if let Some(p) = j_u.log_file() {
89 std::fs::create_dir_all(p.parent().unwrap()).unwrap();
90 }
91}