captain_workflow_manager/lib.rs
//! This library helps you run pipelines of jobs that depend on each other.
//! Its modularity allows jobs to be run either locally, on a cluster, or on other remote computing resources.
//!
//! Consider a set of jobs as illustrated below, where two kinds of source files are used to generate
//! two kinds of intermediate files, which are in turn used to generate a final result.
//!
//! ```text
//!                       ┌─────────────────┐
//!  ┌─────────────────┐  │ Source File B:  │
//!  │ Source File A:  │  │  - Param1       │
//!  │  - Param1       │  │  - Param2       │
//!  └─────────┬───────┘  └───────┬─────────┘
//!            │                  │
//!            │                  │
//!            │       ┌──────────┴──────────────┐
//!            │       │                         │
//!            │       │                         │
//!            │       │                         │
//!            ▼       ▼                         ▼
//!  ┌──────────────────────┐      ┌──────────────────────┐
//!  │ Intermediate File A: │      │ Intermediate File B: │
//!  │  - Param1            │      │  - Param1            │
//!  │  - Param2            │      │  - Param2            │
//!  └────────────────┬─────┘      └──────┬───────────────┘
//!                   │                   │
//!                   └─────────┬─────────┘
//!                             │
//!                             │
//!                             ▼
//!                    ┌───────────────┐
//!                    │ Final Result: │
//!                    │  - Param1     │
//!                    │  - Param2     │
//!                    └───────────────┘
//! ```
//!
//! Each kind of file has one or more "parameters".
//!
//! This dependency structure could be represented by the following `Job` enum, assuming that `Param1` is an integer and `Param2` a string:
//!
//! ```rust
//! enum Job {
//!     SourceFileA { param1: u16 },
//!     SourceFileB { param1: u16, param2: &'static str },
//!     IntermediateFileA { param1: u16, param2: &'static str },
//!     IntermediateFileB { param1: u16, param2: &'static str },
//!     FinalResult { param1: u16, param2: &'static str },
//! }
//! ```
//!
//! To manage this set of jobs using *captain*, one would first implement the [`JobUnit`] trait on it, and then run it using the job [`Scheduler`].
//! The back-end the jobs are run on is chosen by selecting an [`ExecutorBuilder`].
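//!
//! For instance, a `JobUnit` implementation for this `Job` enum could look like
//! the following sketch (the output paths and the `my-tool` command are
//! illustrative placeholders, not part of this crate; the enum would also need
//! to derive `Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`, and `Hash`):
//!
//! ```ignore
//! use std::path::PathBuf;
//!
//! impl captain_workflow_manager::JobUnit for Job {
//!     fn deps(&self) -> Vec<Self> {
//!         match *self {
//!             // Source files have no dependencies.
//!             Job::SourceFileA { .. } | Job::SourceFileB { .. } => vec![],
//!             Job::IntermediateFileA { param1, param2 } => vec![
//!                 Job::SourceFileA { param1 },
//!                 Job::SourceFileB { param1, param2 },
//!             ],
//!             Job::IntermediateFileB { param1, param2 } => {
//!                 vec![Job::SourceFileB { param1, param2 }]
//!             }
//!             Job::FinalResult { param1, param2 } => vec![
//!                 Job::IntermediateFileA { param1, param2 },
//!                 Job::IntermediateFileB { param1, param2 },
//!             ],
//!         }
//!     }
//!
//!     fn cmd(&self) -> Option<String> {
//!         match self {
//!             // Source files are not generated by any command:
//!             // they must already be present on disk.
//!             Job::SourceFileA { .. } | Job::SourceFileB { .. } => None,
//!             _ => Some(format!("my-tool --out {}", self.out_file().display())),
//!         }
//!     }
//!
//!     fn out_file(&self) -> PathBuf {
//!         // One output file per job unit, named after the job and its parameters.
//!         match self {
//!             Job::SourceFileA { param1 } => format!("data/source_a_{param1}").into(),
//!             Job::SourceFileB { param1, param2 } => format!("data/source_b_{param1}_{param2}").into(),
//!             Job::IntermediateFileA { param1, param2 } => format!("data/inter_a_{param1}_{param2}").into(),
//!             Job::IntermediateFileB { param1, param2 } => format!("data/inter_b_{param1}_{param2}").into(),
//!             Job::FinalResult { param1, param2 } => format!("data/final_{param1}_{param2}").into(),
//!         }
//!     }
//!
//!     fn log_file(&self) -> Option<PathBuf> {
//!         None
//!     }
//!
//!     fn name(&self) -> &'static str {
//!         match self {
//!             Job::SourceFileA { .. } => "SourceFileA",
//!             Job::SourceFileB { .. } => "SourceFileB",
//!             Job::IntermediateFileA { .. } => "IntermediateFileA",
//!             Job::IntermediateFileB { .. } => "IntermediateFileB",
//!             Job::FinalResult { .. } => "FinalResult",
//!         }
//!     }
//! }
//! ```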
use std::{any::Any, collections::{HashMap, HashSet}, fmt::{self, Debug}, hash::Hash, io::{self, Write}, path::PathBuf};

use executor::ExecutorBuilder;
use log::trace;
use thiserror::Error;

use crossbeam_channel::SendError;
use petgraph::prelude::*;

use crate::executor::{Executor, ExecutorResult};

pub mod executor;
/// A trait that must be implemented by the type representing job units.
///
/// A single type is used to represent all the possible job units and their dependencies,
/// so most of the checks are performed at run-time.
pub trait JobUnit: Debug + Any + Sized + Copy + Hash + Eq + Send {
    /// Generates a vector containing all dependencies of `self`
    fn deps(&self) -> Vec<Self>;
    /// Returns the command to run to generate the output file of the job
    ///
    /// If no command is specified (i.e. this function returns [`None`]),
    /// the output file must be present on disk.
    fn cmd(&self) -> Option<String>;
    /// Path of the output file
    fn out_file(&self) -> PathBuf;
    /// Path of the log file of the job, if any
    fn log_file(&self) -> Option<PathBuf>;
    /// Name of the job
    fn name(&self) -> &'static str;

    /// A niceness score akin to Unix niceness, except that here the higher the score, the higher the priority.
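    ///
    /// Defaults to `0`. As a sketch (assuming the `Job` enum from the
    /// crate-level docs), an implementation could prioritize final results
    /// over everything else:
    ///
    /// ```ignore
    /// fn nice(&self) -> u16 {
    ///     match self {
    ///         Job::FinalResult { .. } => 10,
    ///         _ => 0,
    ///     }
    /// }
    /// ```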
    fn nice(&self) -> u16 {
        0
    }
}

/// Wrapper whose [`Debug`] implementation prints a [`JobUnit`] together with its
/// derived attributes (dependencies, output file, command, and log file).
struct ExpandedDescr<'a, J: JobUnit>(&'a J);

impl<'a, J: JobUnit> Debug for ExpandedDescr<'a, J> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner = self.0;
        let mut dbg_struct = f.debug_struct(inner.name());
        dbg_struct
            .field("dependencies", &inner.deps())
            .field("output file", &inner.out_file());
        match &inner.cmd() {
            Some(cmd) => dbg_struct.field("command", cmd),
            x @ None => dbg_struct.field("command", &x),
        };
        match &inner.log_file() {
            Some(log) => dbg_struct.field("log", log),
            x @ None => dbg_struct.field("log", &x),
        };
        dbg_struct.finish()
    }
}

/// A job to be run by an [`Executor`]
#[derive(Debug)]
pub struct ToRun<J: JobUnit> {
    job_idx: NodeIndex,
    job_unit: J,
}

impl<J: JobUnit> ToRun<J> {
    /// Returns the graph node index of the job to run.
    pub fn job_idx(&self) -> NodeIndex {
        self.job_idx
    }

    /// Returns a description of the job to run.
    pub fn job_unit(&self) -> &J {
        &self.job_unit
    }
}

/// The entry point of the crate, implementing the job scheduling logic
///
/// Example
/// ------
///
/// ```no_run
/// use captain_workflow_manager::{executor, Scheduler};
/// # use std::path::PathBuf;
/// # #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// # struct J {}
/// # impl captain_workflow_manager::JobUnit for J {
/// #     fn deps(&self) -> Vec<Self> { vec![] }
/// #     fn cmd(&self) -> Option<String> { None }
/// #     fn out_file(&self) -> PathBuf { PathBuf::new() }
/// #     fn log_file(&self) -> Option<PathBuf> { None }
/// #     fn name(&self) -> &'static str { "" }
/// # }
/// # let some_jobs = unimplemented!();
/// let executor = executor::SlurmExecutorBuilder { max_jobs: 10_000 };
/// let scheduler = Scheduler::new(executor);
/// scheduler.run(some_jobs)?;
/// # Ok::<(), captain_workflow_manager::Error<J>>(())
/// ```
pub struct Scheduler<ExB: ExecutorBuilder> {
    ex_builder: ExB,
}

/// The error type of this crate
#[derive(Debug, Error)]
pub enum Error<J: JobUnit> {
    #[error("error sending job to worker")]
    ErrorSendingJob(#[from] SendError<J>),
    #[error("system I/O error")]
    IoError(#[from] io::Error),
    #[error("error in subcommand: {msg}")]
    SubCommandError { msg: String },
    #[error("output file of command-less job unit is absent: {}", .file.display())]
    MissingCommandLessOutputFile { file: PathBuf },
}

impl<ExB: ExecutorBuilder> Scheduler<ExB> {
    /// Creates a new scheduler object with the provided [`ExecutorBuilder`]
    ///
    /// Executor builders can be found in the [`executor`] module, or custom ones can be implemented.
    pub fn new(ex_builder: ExB) -> Self {
        Scheduler { ex_builder }
    }

    /// Runs each job in `final_job_units` and its transitive dependencies.
    ///
    /// A job won't be run if its output file is already present
    /// (currently, no finer dependency-freshness management is attempted).
    /// Moreover, dependencies of such jobs won't be run either.
    pub fn run<J: JobUnit>(self, final_job_units: Vec<J>) -> Result<(), Error<J>> {
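        // Build the dependency graph by depth-first traversal from the
        // requested final jobs. Edges point from a job to each of its
        // dependencies, so a node without outgoing edges is ready to run.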
        let mut graph = petgraph::stable_graph::StableDiGraph::new();
        let mut stack = final_job_units;
        let mut already_processed = HashSet::new();
        let mut indexes = HashMap::new();
        while let Some(job_unit) = stack.pop() {
            let newly_seen = already_processed.insert(job_unit);
            if !newly_seen {
                continue;
            }
            if job_unit.out_file().exists() {
                continue;
            }
            let &mut ju_index = indexes
                .entry(job_unit)
                .or_insert_with_key(|j| graph.add_node(Some(*j)));
            let deps = job_unit.deps();
            for dep in deps {
                if dep.out_file().exists() {
                    continue;
                }
                if !already_processed.contains(&dep) {
                    stack.push(dep);
                }
                let &mut dep_index = indexes
                    .entry(dep)
                    .or_insert_with_key(|j| graph.add_node(Some(*j)));
                debug_assert!(!graph.contains_edge(ju_index, dep_index));
                graph.add_edge(ju_index, dep_index, ());
            }
        }
        let (executor, to_run_channel_sndr, done_channel_sndr, done_channel_rcvr) =
            self.ex_builder.init();

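        // Attach every dependency-free job to a synthetic start node (the only
        // node carrying no job unit); reporting that node as done below
        // kick-starts the scheduling loop.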
        let no_deps_nodes: Vec<_> = graph
            .node_indices()
            .filter(|&idx| graph.neighbors_directed(idx, Outgoing).next().is_none())
            .collect();
        let start_idx = graph.add_node(None);
        for job_idx in no_deps_nodes {
            graph.add_edge(job_idx, start_idx, ());
        }
        let start_count: usize = graph.node_count();
        let mut done_count: usize = 0;
        let mut failed_count: usize = 0;
        done_channel_sndr
            .send(ExecutorResult {
                job_idx: start_idx,
                result: Ok(()),
            })
            .unwrap();

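        // Main scheduling loop: each result received on the done channel
        // unlocks the jobs that were waiting on the corresponding node.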
        let done_iter = done_channel_rcvr.into_iter();
        trace!("Entering done_iter main loop.");
        for done in done_iter {
            trace!("Iterating done_iter main loop.");
            let j_idx = done.job_idx;
            if let Err(msg) = done.result {
                trace!("Job errored.");
                let job_unit = graph.node_weight(j_idx).unwrap().unwrap();
                {
                    let stderr = io::stderr();
                    let mut lock = stderr.lock();
                    writeln!(
                        lock,
                        "There was an error running the following job: {:#?} interpreted as {:#?}\nMessage was: {}",
                        job_unit, ExpandedDescr(&job_unit), msg
                    )
                    .unwrap();

                    // A failure invalidates every job that transitively depends
                    // on the failed one: walk the Incoming edges and drop those
                    // jobs from the graph.
                    let mut stack = vec![j_idx];
                    while let Some(idx) = stack.pop() {
                        stack.extend(graph.neighbors_directed(idx, Incoming));
                        writeln!(
                            lock,
                            "Because a dependency failed, deleting job: {:?}\n",
                            graph.node_weight(idx).unwrap()
                        )
                        .unwrap();
                        graph.remove_node(idx);
                        failed_count += 1;
                    }
                }
                // If the failure cascade emptied the graph, no job is left
                // running and no further result will ever arrive: stop waiting.
                if graph.node_count() == 0 {
                    break;
                }
                continue;
            }
            trace!("Job was successful.");

            let was_required_by: Vec<_> = graph.neighbors_directed(j_idx, Incoming).collect();

            graph.remove_node(j_idx);
            done_count += 1;
            assert_eq!(done_count + failed_count + graph.node_count(), start_count);
            trace!("Writing progress.");
            writeln!(
                io::stdout().lock(),
                "{} done ({:5.1}%), {} failed ({:5.1}%), {} remaining ({:5.1}%)\n",
                done_count,
                100. * (done_count as f64) / (start_count as f64),
                failed_count,
                100. * (failed_count as f64) / (start_count as f64),
                graph.node_count(),
                100. * (graph.node_count() as f64) / (start_count as f64)
            )
            .unwrap();
            if graph.node_count() == 0 {
                break;
            }
            trace!("Submitting dependent jobs.");
            for job_idx in was_required_by {
                // A dependent job becomes ready once its last dependency is gone.
                if graph.neighbors_directed(job_idx, Outgoing).count() == 0 {
                    let j_u = graph.node_weight(job_idx).unwrap().unwrap();
                    let out = j_u.out_file();
                    if out.exists() {
                        writeln!(
                            io::stdout().lock(),
                            "Output already present, skipping job: {:?}\n",
                            j_u
                        )
                        .unwrap();
                        done_channel_sndr
                            .send(ExecutorResult {
                                result: Ok(()),
                                job_idx,
                            })
                            .unwrap();
                    } else {
                        if j_u.cmd().is_none() {
                            return Err(Error::MissingCommandLessOutputFile {
                                file: j_u.out_file(),
                            });
                        }
                        to_run_channel_sndr
                            .send(ToRun {
                                job_idx,
                                job_unit: j_u,
                            })
                            .unwrap();
                    }
                }
            }
        }
        drop(to_run_channel_sndr);
        executor.join();
        Ok(())
    }
}