captain_workflow_manager/executor/slurm_executor.rs

use std::{
    collections::HashMap,
    io::{self, Write},
    process::Command,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
};

use regex::Regex;

use crate::ExpandedDescr;

use super::*;

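/// Builds a [`SlurmExecutor`]: a pool of submitting threads feeding `sbatch`
/// plus a polling thread watching `sacct`. `max_jobs` caps how many jobs may
/// sit in the Slurm queue at any one time.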
pub struct SlurmExecutorBuilder {
    pub max_jobs: u64,
}

impl ExecutorBuilder for SlurmExecutorBuilder {
    type Executor = SlurmExecutor;

    fn init<J: JobUnit>(
        self,
    ) -> (
        Self::Executor,
        Sender<ToRun<J>>,
        Sender<ExecutorResult>,
        Receiver<ExecutorResult>,
    ) {
        let to_run_chan = crossbeam_channel::unbounded::<ToRun<J>>();
        let done_chan = crossbeam_channel::unbounded();
        let submitted_jobs = Arc::new(Mutex::new(HashMap::new()));
        let done_submitting = Arc::new(AtomicBool::new(false));
        // One slot per submitting thread spawned below; the last submitter to
        // exit is the one that sets `done_submitting` (see the end of the loop).
        let live_submitters = Arc::new(AtomicU64::new(32));
        let currently_submitted = Arc::new(AtomicU64::new(0));
        // TODO: max submissions at a time

        let mut submitting_threads = Vec::new();
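        // Spawn a fixed pool of submitting threads; each one pulls jobs from the
        // shared channel and queues them with `sbatch`.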
        for _i in 0..32 {
            let submitting_thread = {
                let rcvr = to_run_chan.1.clone();
                let done_sndr = done_chan.0.clone();
                let submitted_jobs_ptr = submitted_jobs.clone();
                let done_submitting = done_submitting.clone();
                let live_submitters = live_submitters.clone();
                let currently_submitted = currently_submitted.clone();
                let max_jobs = self.max_jobs;
                thread::Builder::new()
                .name("submitting_thread".into())
                .spawn(move || {
                    for to_run in rcvr.into_iter() {
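                        // Back-pressure: park while the number of jobs currently in
                        // the Slurm queue is at the cap; the polling thread unparks
                        // us whenever a job leaves the queue.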
                        while currently_submitted.load(Ordering::Acquire) >= max_jobs {
                            thread::park();
                        }
                        let j_idx = to_run.job_idx;
                        let stdout = io::stdout();
                        {
                            let mut lock = stdout.lock();
                            writeln!(lock, "Submitting job: {:?}", to_run.job_unit).unwrap();
                            writeln!(lock, "Interpreted as: {:#?}\n", ExpandedDescr(&to_run.job_unit)).unwrap();
                        }
                        super::create_out_dir(&to_run.job_unit);
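                        // Build the sbatch invocation. `--parsable` makes sbatch print
                        // the bare job id instead of the usual "Submitted batch job N"
                        // banner, so stdout can be parsed directly below.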
                        let mut sbatch_cmd = Command::new("sbatch");
                        sbatch_cmd.arg("-J")
                            .arg(to_run.job_unit.name())
                            .arg("--partition=common,dedicated,inbio")
                            // .arg("--partition=inbio")
                            .arg("--exclude=maestro-1101")
                            .arg("--qos=fast")
                            .arg(format!("--nice={}", to_run.job_unit.nice()))
                            .arg("--parsable");
                        let sbatch_cmd = match to_run.job_unit.log_file() {
                            Some(p) => sbatch_cmd.arg("--output").arg(p),
                            None => sbatch_cmd
                                .arg("--output=slurm-log-snakemake.out")
                                .arg("--open-mode=append"),
                        }
                        .arg("--wrap")
                        .arg(to_run.job_unit.cmd().unwrap())
                        .output()
                        .unwrap();
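                        // On success, remember the Slurm job id so the polling thread can
                        // track it; on failure, report the error to the caller right away.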
                        if sbatch_cmd.status.success() {
                            currently_submitted.fetch_add(1, Ordering::AcqRel);
                            let slurm_job_id: u64 = std::str::from_utf8(&sbatch_cmd.stdout)
                                .unwrap()
                                .trim()
                                .parse()
                                .unwrap();
                            submitted_jobs_ptr
                                .lock()
                                .unwrap()
                                .insert(slurm_job_id, j_idx);
                        } else {
                            done_sndr
                                .send(ExecutorResult {
                                    job_idx: j_idx,
                                    result: Err(format!(
                                        "Could not queue job. Exited with status {:?} and stderr:\n{}",
                                        sbatch_cmd.status.code(),
                                        String::from_utf8_lossy(&sbatch_cmd.stderr)
                                    )),
                                })
                                .unwrap();
                        }
                    }
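                    // Only the last submitting thread to drain the channel flags that
                    // submission is over; otherwise the polling thread could stop while
                    // another submitter still has a job in flight.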
                    if live_submitters.fetch_sub(1, Ordering::AcqRel) == 1 {
                        done_submitting.store(true, Ordering::Release);
                    }
                })
                .unwrap()
            };
            submitting_threads.push(submitting_thread);
        }
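        // A single polling thread periodically queries `sacct` for every submitted
        // job and turns terminal states into `ExecutorResult`s.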
        let status_polling_thread = {
            let sndr = done_chan.0.clone();
            let submitting_threads: Vec<_> = submitting_threads
                .iter()
                .map(|jh| jh.thread().clone())
                .collect();
            let currently_submitted = currently_submitted;
            thread::Builder::new()
                .name("status_polling_thread".into())
                .spawn(move || {
                    let stdout = io::stdout();
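                    // Matches `sacct --parsable` lines of the form `JobId|State|Elapsed|`.
                    // Group 2 catches job-step suffixes such as `.batch`, which are
                    // skipped below.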
                    let re = Regex::new(r"(\d+)(\.[^\|]+)?\|([^\|]+)\|([^\|]+)\|").unwrap();
                    let mut rate_limiter = fence::Fence::from_millis(100);
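                    // Main polling loop: snapshot the tracked Slurm job ids, ask sacct
                    // for their states, and report any job that has reached a terminal
                    // state.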
                    loop {
                        let jobs_slurm_ids: Vec<_> = submitted_jobs
                            .lock()
                            .unwrap()
                            .keys()
                            .map(|j| j.to_string())
                            .collect();
                        if jobs_slurm_ids.is_empty() {
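                            // Nothing tracked right now: exit once the submitting threads
                            // are all done queueing work.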
                            if done_submitting.load(Ordering::Acquire) {
                                break;
                            }
                        } else {
                            let ids_to_query = jobs_slurm_ids.join(",");
                            dbg!(&ids_to_query);
                            let sacct_cmd = Command::new("sacct")
                                .arg("-j")
                                .arg(ids_to_query)
                                .arg("--noheader")
                                .arg("--parsable")
                                .arg("--format=JobId,State,Elapsed")
                                .output()
                                .unwrap();
                            if !sacct_cmd.status.success() {
                                panic!("sacct failed: {}", String::from_utf8_lossy(&sacct_cmd.stderr));
                            }
                            // Assume every queried job id shows up in the sacct output.
                            for l in std::str::from_utf8(&sacct_cmd.stdout).unwrap().lines() {
                                let cap = re.captures(l).unwrap();
                                if cap.get(2).is_some() {
                                    continue;
                                }
                                let jobid = cap.get(1).unwrap().as_str();
                                let job_status = cap.get(3).unwrap().as_str();
                                let elapsed = cap.get(4).unwrap().as_str();
                                match job_status {
                                    "PENDING" | "CONFIGURING" | "COMPLETING" | "RUNNING"
                                    | "SUSPENDED" | "PREEMPTED" => {
                                        // do nothing, the job is still running
                                    }
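                                    // The job finished successfully: report it and free its
                                    // slot so a parked submitting thread can proceed.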
                                    "COMPLETED" => {
                                        let graph_idx = submitted_jobs
                                            .lock()
                                            .unwrap()
                                            .remove(&jobid.parse().unwrap())
                                            .unwrap();
                                        {
                                            let mut lock = stdout.lock();
                                            writeln!(
                                                lock,
                                                "Finished job: {:?}. Elapsed time: {}\n",
                                                jobid, elapsed
                                            )
                                            .unwrap();
                                        }
                                        sndr.send(ExecutorResult {
                                            job_idx: graph_idx,
                                            result: Ok(()),
                                        })
                                        .unwrap();
                                        currently_submitted.fetch_sub(1, Ordering::AcqRel);
                                        for t in &submitting_threads {
                                            t.unpark();
                                        }
                                    }
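                                    // Any other state (FAILED, CANCELLED, TIMEOUT, ...) is
                                    // treated as a terminal failure.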
                                    s => {
                                        let graph_idx = submitted_jobs
                                            .lock()
                                            .unwrap()
                                            .remove(&jobid.parse().unwrap())
                                            .unwrap();
                                        sndr.send(ExecutorResult {
                                            job_idx: graph_idx,
                                            result: Err(format!(
                                                "Execution failed on the cluster. Slurm jobid: {}. State: {}",
                                                jobid, s
                                            )),
                                        })
                                        .unwrap();
                                        // The job has left the queue, so free its submission slot too.
                                        currently_submitted.fetch_sub(1, Ordering::AcqRel);
                                        for t in &submitting_threads {
                                            t.unpark();
                                        }
                                    }
                                }
                            }
                        }
                        rate_limiter.sleep();
                    }
                })
                .unwrap()
        };
        (
            SlurmExecutor {
                submitting_threads,
                status_polling_thread,
            },
            to_run_chan.0,
            done_chan.0,
            done_chan.1,
        )
    }
}

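/// Handles for the threads spawned by [`SlurmExecutorBuilder::init`].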
pub struct SlurmExecutor {
    submitting_threads: Vec<JoinHandle<()>>,
    status_polling_thread: JoinHandle<()>,
}

impl Executor for SlurmExecutor {
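    /// Waits for the polling thread and every submitting thread to exit,
    /// collecting each thread's join result.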
    fn join(self) -> Vec<std::thread::Result<()>> {
        let mut res = Vec::with_capacity(self.submitting_threads.len() + 1);
        res.push(self.status_polling_thread.join());
        res.extend(self.submitting_threads.into_iter().map(|jh| jh.join()));
        res
    }
}