captain_workflow_manager/
lib.rs

//! This library helps you run pipelines of jobs that depend on each other.
//! Its modularity allows running jobs either locally, on a cluster, or on other remote computing resources.
//!
//! Consider a set of jobs as illustrated below, where two kinds of source files are used to generate
//! two kinds of intermediate files, which are in turn used to generate a final result.
//!
//! ```text
//!                          ┌─────────────────┐
//! ┌─────────────────┐      │  Source File B: │
//! │  Source File A: │      │    - Param1     │
//! │    - Param1     │      │    - Param2     │
//! └─────────┬───────┘      └───────┬─────────┘
//!           │                      │
//!           │                      │
//!           │           ┌──────────┴───────────────┐
//!           │           │                          │
//!           │           │                          │
//!           │           │                          │
//!           ▼           ▼                          ▼
//!       ┌──────────────────────┐       ┌──────────────────────┐
//!       │ Intermediate File A: │       │ Intermediate File B: │
//!       │   - Param1           │       │   - Param1           │
//!       │   - Param2           │       │   - Param2           │
//!       └────────────────┬─────┘       └──────┬───────────────┘
//!                        │                    │
//!                        └─────────┬──────────┘
//!                                  │
//!                                  │
//!                                  ▼
//!                          ┌───────────────┐
//!                          │ Final Result: │
//!                          │   - Param1    │
//!                          │   - Param2    │
//!                          └───────────────┘
//! ```
//!
//! Each kind of file has one or more "parameters".
//!
//! This dependency could be represented by the following `Job` enum, assuming that `Param1` is an integer and `Param2` a string:
//!
//! ```rust
//! enum Job {
//!     SourceFileA {param1: u16},
//!     SourceFileB {param1: u16, param2: &'static str},
//!     IntermediateFileA {param1: u16, param2: &'static str},
//!     IntermediateFileB {param1: u16, param2: &'static str},
//!     FinalResult {param1: u16, param2: &'static str},
//! }
//! ```
//!
//! To manage this set of jobs using *captain*, one would first implement the [`JobUnit`] trait on it, and then run it using the job [`Scheduler`].
//! The back-end to run it on is chosen by selecting an [`ExecutorBuilder`].
use std::{any::Any, collections::{HashMap, HashSet}, fmt::{self, Debug}, hash::Hash, io::{self, Write}, path::PathBuf};

use crossbeam_channel::SendError;
use log::trace;
use petgraph::prelude::*;
use thiserror::Error;

use executor::ExecutorBuilder;

use crate::executor::{Executor, ExecutorResult};

pub mod executor;
65
/// A trait that must be implemented by the struct representing job units.
///
/// A single struct is used to represent all the possible job units and their dependencies,
/// so most of the checks are performed at run-time.
pub trait JobUnit: Debug + Any + Sized + Copy + Hash + Eq + Send {
    /// Generate a vector containing all (direct) dependencies of `self`.
    ///
    /// Transitive dependencies are discovered by the [`Scheduler`] itself,
    /// which calls `deps` recursively while building its job graph.
    fn deps(&self) -> Vec<Self>;
    /// Returns the command to run to generate the output file of the job.
    ///
    /// If no command is specified (ie. this function returns [`None`]),
    /// the output file must be present on disk.
    fn cmd(&self) -> Option<String>;
    /// Path of the output file.
    ///
    /// If this file exists, the scheduler considers the job (and its
    /// dependencies) already done and skips it.
    fn out_file(&self) -> PathBuf;
    /// Path of the log file of the job, if any
    fn log_file(&self) -> Option<PathBuf>;
    /// Name of the job
    fn name(&self) -> &'static str;

    /// A nice-ness score akin to Unix niceness: the higher the value, the
    /// higher the priority. Defaults to `0`.
    ///
    /// NOTE(review): as written this is inverted with respect to Unix `nice`,
    /// where a *higher* niceness means a *lower* priority — confirm the
    /// intended semantics with the executors that consume this value.
    fn nice(&self) -> u16 {
        0
    }
}
90
91struct ExpandedDescr<'a, J: JobUnit>(&'a J);
92
93impl<'a, J: JobUnit> Debug for ExpandedDescr<'a, J> {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        let inner = self.0;
96        let mut dbg_struct = f.debug_struct(inner.name());
97        dbg_struct
98        .field("dependencies", &inner.deps())
99        .field("output file",&inner.out_file());
100        match &inner.cmd() {
101            Some(cmd) =>  dbg_struct.field("command",cmd),
102            x@None => dbg_struct.field("command",&x),
103        };
104        match &inner.log_file() {
105            Some(log) => dbg_struct.field("log",log),
106            x@None => dbg_struct.field("log",&x),
107        };
108        dbg_struct.finish()
109    }
110}
111
112/// A job to be run by an Executor
113#[derive(Debug)]
114pub struct ToRun<J: JobUnit> {
115    job_idx: NodeIndex,
116    job_unit: J,
117}
118
119impl<J: JobUnit> ToRun<J> {
120    /// Get the job number of the job to run.
121    pub fn job_idx(&self) -> NodeIndex {
122        self.job_idx
123    }
124
125    /// The job to run description
126    pub fn job_unit(&self) -> &J {
127        &self.job_unit
128    }
129}
130
/// The entry point of the crate, implementing the job scheduling logic.
///
/// Build one with [`Scheduler::new`] and an [`ExecutorBuilder`], then call
/// [`Scheduler::run`] with the final jobs of the pipeline.
///
/// # Example
///
/// ```no_run
/// use captain_workflow_manager::{executor, Scheduler};
/// # use std::path::PathBuf;
/// # #[derive(Debug,Clone, Copy, PartialEq, Eq, Hash)]
/// # struct J {}
/// # impl captain_workflow_manager::JobUnit for J {
/// # fn deps(&self) -> Vec<Self> { vec![]}
/// # fn cmd(&self) -> Option<String> {None}
/// # fn out_file(&self) -> PathBuf { PathBuf::new()}
/// # fn log_file(&self) -> Option<PathBuf> {None}
/// # fn name(&self) -> &'static str {&""}
/// # }
/// # let some_jobs = unimplemented!();
/// let executor = executor::SlurmExecutorBuilder { max_jobs: 10_000 };
/// let scheduler = Scheduler::new(executor);
/// scheduler.run(some_jobs)?;
/// # Ok::<(), captain_workflow_manager::Error<J>>(())
/// ```
pub struct Scheduler<ExB: ExecutorBuilder> {
    // Builder used to spin up the executor back-end when `run` is called.
    ex_builder: ExB,
}
157
/// The Error type of this crate
#[derive(Debug, Error)]
pub enum Error<J: JobUnit> {
    /// A job could not be handed over to a worker channel.
    #[error("error sending job to worker")]
    ErrorSendingJob(#[from] SendError<J>),
    /// An underlying system I/O operation failed.
    #[error("system I/O error")]
    IoError(#[from] io::Error),
    /// A spawned subcommand reported an error; `msg` carries its message.
    #[error("Error in subcommand: {msg}")]
    SubCommandError { msg: String },
    /// A job declared no command (see [`JobUnit::cmd`]) but its output file
    /// is absent from disk, so the pipeline cannot proceed.
    #[error("Command-less job unit file is absent: {file}")]
    MissingCommandLessOutputFile { file: PathBuf },
}
170
171impl<ExB: ExecutorBuilder> Scheduler<ExB> {
172    /// Creates a new scheduler objects, with the provided [`ExecutorBuilder`]
173    /// 
174    /// Executor builders can be found in the [`executor`] module, or custom ones can be implemented.
175    pub fn new(ex_builder: ExB) -> Self {
176        Scheduler { ex_builder }
177    }
178
179    /// Run each job in `final_job_units` and its transitive dependencies.
180    /// 
181    /// A job won't be run if its output file is already present.
182    /// (Currently no finer dependency freshness management is attempted.)
183    /// Moreover, dependencies of such jobs won't be run either.
184    pub fn run<J: JobUnit>(self, final_job_units: Vec<J>) -> Result<(), Error<J>> {
185        let mut graph = petgraph::stable_graph::StableDiGraph::new();
186        let mut stack = final_job_units;
187        let mut already_processed = HashSet::new();
188        let mut indexes = HashMap::new();
189        while let Some(job_unit) = stack.pop() {
190            let newly_seen = already_processed.insert(job_unit);
191            if !newly_seen {
192                continue;
193            }
194            if job_unit.out_file().exists() {
195                continue;
196            }
197            let &mut ju_index = indexes
198                .entry(job_unit)
199                .or_insert_with_key(|j| graph.add_node(Some(*j)));
200            let deps = job_unit.deps();
201            for dep in deps {
202                if dep.out_file().exists() {
203                    continue;
204                }
205                if !already_processed.contains(&dep) {
206                    stack.push(dep);
207                }
208                let &mut dep_index = indexes
209                    .entry(dep)
210                    .or_insert_with_key(|j| graph.add_node(Some(*j)));
211                debug_assert!(!graph.contains_edge(ju_index, dep_index));
212                graph.add_edge(ju_index, dep_index, ());
213            }
214        }
215        let (executor, to_run_channel_sndr, done_channel_sndr, done_channel_rcvr) =
216            self.ex_builder.init();
217
218        let no_deps_nodes: Vec<_> = graph
219            .node_indices()
220            .filter(|&idx| graph.neighbors_directed(idx, Outgoing).next().is_none())
221            .collect();
222        let start_idx = graph.add_node(None);
223        for job_idx in no_deps_nodes {
224            graph.add_edge(job_idx, start_idx, ());
225        }
226        let start_count: usize = graph.node_count();
227        let mut done_count: usize = 0;
228        let mut failed_count: usize = 0;
229        done_channel_sndr
230            .send(ExecutorResult {
231                job_idx: start_idx,
232                result: Ok(()),
233            })
234            .unwrap();
235
236        let done_iter = done_channel_rcvr.into_iter();
237        trace!("Entering done_iter main loop.");
238        for done in done_iter {
239            trace!("Iterating done_iter main loop.");
240            let j_idx = done.job_idx;
241            if let Err(msg) = done.result {
242                trace!("Job errored.");
243                let job_unit = graph.node_weight(j_idx).unwrap().unwrap();
244                {
245                    let stderr = io::stderr();
246                    let mut lock = stderr.lock();
247                    writeln!(
248                        lock,
249                        "There was an error running the following job: {:#?} interpreted as {:#?}\nMessage was: {}",
250                        job_unit, ExpandedDescr(&job_unit), msg
251                    )
252                    .unwrap();
253
254                    let mut stack = vec![j_idx];
255                    while let Some(idx) = stack.pop() {
256                        stack.extend(graph.neighbors_directed(idx, Incoming));
257                        writeln!(
258                            lock,
259                            "Because a dependency failed, deleting job: {:?}\n",
260                            graph.node_weight(idx).unwrap()
261                        )
262                        .unwrap();
263                        graph.remove_node(idx);
264                        failed_count += 1;
265                    }
266                }
267                continue;
268                // return Err(Error::SubCommandError { msg });
269            }
270            trace!("Job was succesful.");
271
272            let was_required_by: Vec<_> = graph.neighbors_directed(j_idx, Incoming).collect();
273
274            graph.remove_node(j_idx);
275            done_count += 1;
276            assert_eq!(done_count + failed_count + graph.node_count(), start_count);
277            // eprintln!("DEBUG:foo");
278            trace!("Writting progress.");
279            writeln!(
280                io::stdout().lock(),
281                "{} done ({:5.1}%), {} failed ({:5.1}%), {} remaining ({:5.1}%)\n",
282                done_count,
283                100. * (done_count as f64) / (start_count as f64),
284                failed_count,
285                100. * (failed_count as f64) / (start_count as f64),
286                graph.node_count(),
287                100. * (graph.node_count() as f64) / (start_count as f64)
288            )
289            .unwrap();
290            if graph.node_count() == 0 {
291                break;
292            }
293            // eprintln!("DEBUG:bar");
294            trace!("Submitting dependent jobs.");
295            for job_idx in was_required_by {
296                if graph.neighbors_directed(job_idx, Outgoing).count() == 0 {
297                    let j_u = graph.node_weight(job_idx).unwrap().unwrap();
298                    let out = j_u.out_file();
299                    if out.exists() {
300                        writeln!(
301                            io::stdout().lock(),
302                            "Output already present, skipping job: {:?}\n",
303                            j_u
304                        )
305                        .unwrap();
306                        done_channel_sndr
307                            .send(ExecutorResult {
308                                result: Ok(()),
309                                job_idx,
310                            })
311                            .unwrap();
312                    } else {
313                        if j_u.cmd().is_none() {
314                            return Err(Error::MissingCommandLessOutputFile {
315                                file: j_u.out_file(),
316                            });
317                        }
318                        // eprintln!("DEBUG:zob");
319                        to_run_channel_sndr
320                            .send(ToRun {
321                                job_idx,
322                                job_unit: j_u,
323                            })
324                            .unwrap();
325                        // eprintln!("DEBUG:kola");
326                    }
327                }
328            }
329        }
330        drop(to_run_channel_sndr);
331        executor.join();
332        Ok(())
333    }
334}