// captain_workflow_manager/executor/slurm_executor.rs
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc, Mutex,
    },
};

use regex::Regex;

use crate::ExpandedDescr;

use super::*;
14
/// Configuration for building a [`SlurmExecutor`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SlurmExecutorBuilder {
    /// Maximum number of jobs allowed to be queued/running on the cluster
    /// at once; submitting threads park once this cap is reached.
    pub max_jobs: u64,
}
18
19impl ExecutorBuilder for SlurmExecutorBuilder {
20 type Executor = SlurmExecutor;
21
22 fn init<J: JobUnit>(
23 self,
24 ) -> (
25 Self::Executor,
26 Sender<ToRun<J>>,
27 Sender<ExecutorResult>,
28 Receiver<ExecutorResult>,
29 ) {
30 let to_run_chan = crossbeam_channel::unbounded::<ToRun<J>>();
31 let done_chan = crossbeam_channel::unbounded();
32 let submitted_jobs = Arc::new(Mutex::new(HashMap::new()));
33 let done_submitting = Arc::new(AtomicBool::new(false));
34 let currently_submitted = Arc::new(AtomicU64::new(0));
35 let mut submitting_threads = Vec::new();
38 for _i in 0..32 {
39 let submitting_thread = {
40 let rcvr = to_run_chan.1.clone();
41 let done_sndr = done_chan.0.clone();
42 let submitted_jobs_ptr = submitted_jobs.clone();
43 let done_submitting = done_submitting.clone();
44 let currently_submitted = currently_submitted.clone();
45 let max_jobs = self.max_jobs;
46 thread::Builder::new()
47 .name("submitting_thread".into())
48 .spawn(move || {
49 for to_run in rcvr.into_iter() {
50 while currently_submitted.load(Ordering::Acquire) >= max_jobs {
51 thread::park();
52 }
53 let j_idx = to_run.job_idx;
54 let stdout = io::stdout();
55 {
56 let mut lock = stdout.lock();
57 writeln!(lock, "Submitting job: {:?}", to_run.job_unit).unwrap();
58 write!(lock, "Submitting job: {:#?} interpreted as: {:#?} ", to_run.job_unit, ExpandedDescr(to_run.job_unit())).unwrap();
59 writeln!(lock, "\n").unwrap();
60 }
61 super::create_out_dir(&to_run.job_unit);
62 let mut sbatch_cmd = Command::new("sbatch");
63 sbatch_cmd.arg("-J")
64 .arg(to_run.job_unit.name())
65 .arg("--partition=common,dedicated,inbio")
66 .arg("--exclude=maestro-1101")
68 .arg("--qos=fast")
69 .arg(format!("--nice={}",to_run.job_unit.nice()))
70 .arg("--parsable");
71 let sbatch_cmd = match to_run.job_unit.log_file() {
72 Some(p) => {sbatch_cmd.arg("--output").arg(p)}
73 None => {sbatch_cmd.arg("--output=slurm-log-snakemake.out")
74 .arg("--open-mode=append")}
75 }.arg("--wrap")
76 .arg(to_run.job_unit.cmd().unwrap())
77 .output()
78 .unwrap();
79 if sbatch_cmd.status.success() {
80 currently_submitted.fetch_add(1, Ordering::AcqRel);
81 let slurm_job_id: u64 = std::str::from_utf8(&sbatch_cmd.stdout)
82 .unwrap()
83 .trim()
84 .parse()
85 .unwrap();
86 submitted_jobs_ptr
87 .lock()
88 .unwrap()
89 .insert(slurm_job_id, j_idx);
90 } else {
91 done_sndr
92 .send(ExecutorResult {
93 job_idx: j_idx,
94 result: Err(format!(
95 "Could not queue job. Exited with status {:?} and stderr:\n{}",
96 sbatch_cmd.status.code(),
97 String::from_utf8_lossy(&sbatch_cmd.stderr)
98 )),
99 })
100 .unwrap();
101 }
102 }
103 done_submitting.store(true, Ordering::Release);
104 })
105 .unwrap()
106 };
107 submitting_threads.push(submitting_thread);
108 }
109 let status_polling_thread = {
110 let sndr = done_chan.0.clone();
111 let submitting_threads: Vec<_> = submitting_threads
112 .iter()
113 .map(|jh| jh.thread().clone())
114 .collect();
115 let currently_submitted = currently_submitted;
116 thread::Builder::new()
117 .name("status_polling_thread".into())
118 .spawn(move || {
119 let stdout = io::stdout();
120 let re = Regex::new(r"(\d+)(\.[^\|]+)?\|([^\|]+)\|([^\|]+)\|").unwrap();
121 let mut rate_limiter = fence::Fence::from_millis(100);
122 loop {
123 let jobs_slurm_ids: Vec<_> = submitted_jobs
124 .lock()
125 .unwrap()
126 .keys()
127 .map(|j| j.to_string())
128 .collect();
129 if jobs_slurm_ids.is_empty() {
130 if done_submitting.load(Ordering::Acquire) {
131 break;
132 }
133 } else {
134 let ids_to_query = jobs_slurm_ids.join(",");
135 dbg!(&ids_to_query);
136 let sacct_cmd = Command::new("sacct")
137 .arg("-j")
138 .arg(ids_to_query)
139 .arg("--noheader")
140 .arg("--parsable")
141 .arg("--format=JobId,State,Elapsed")
142 .output()
143 .unwrap();
144 if !sacct_cmd.status.success() {
145 panic!()
146 }
147 for l in std::str::from_utf8(&sacct_cmd.stdout).unwrap().lines() {
149 let cap = re.captures(l).unwrap();
150 if cap.get(2).is_some() {
151 continue;
152 }
153 let jobid = cap.get(1).unwrap().as_str();
154 let job_status = cap.get(3).unwrap().as_str();
155 let elapsed = cap.get(4).unwrap().as_str();
156 match job_status {
157 "PENDING" | "CONFIGURING" | "COMPLETING" | "RUNNING"
158 | "SUSPENDED" | "PREEMPTED" => {
159 }
161 "COMPLETED" => {
162 let graph_idx = submitted_jobs
163 .lock()
164 .unwrap()
165 .remove(&jobid.parse().unwrap())
166 .unwrap();
167 {
168 let mut lock = stdout.lock();
169 writeln!(
170 lock,
171 "Finished job: {:?}. Elapsed time: {}\n",
172 jobid, elapsed
173 )
174 .unwrap();
175 }
176 sndr.send(ExecutorResult {
177 job_idx: graph_idx,
178 result: Ok(()),
179 })
180 .unwrap();
181 submitted_jobs
182 .lock()
183 .unwrap()
184 .remove(&jobid.parse().unwrap());
185 currently_submitted.fetch_sub(1, Ordering::Acquire);
186 for t in &submitting_threads {
187 t.unpark();
188 }
189 }
190 s => {
191 let graph_idx = submitted_jobs
192 .lock()
193 .unwrap()
194 .remove(&jobid.parse().unwrap())
195 .unwrap();
196 sndr.send(ExecutorResult {
197 job_idx: graph_idx,
198 result: Err(format!(
199 "Execution failed on the cluster. Slurm jobid: {}. State: {}",
200 jobid,
201 s
202 )),
203 })
204 .unwrap();
205 submitted_jobs
206 .lock()
207 .unwrap()
208 .remove(&jobid.parse().unwrap());
209 }
210 }
211 }
212 }
213 rate_limiter.sleep();
214 }
215 })
216 .unwrap()
217 };
218 (
219 SlurmExecutor {
220 submitting_threads,
221 status_polling_thread,
222 },
223 to_run_chan.0,
224 done_chan.0,
225 done_chan.1,
226 )
227 }
228}
229
/// Handle to the running Slurm executor threads.
///
/// Owns the join handles for the submission worker pool and the `sacct`
/// status-polling thread; consumed by `Executor::join`.
#[derive(Debug)]
pub struct SlurmExecutor {
    // Workers that drain the to-run channel and call `sbatch`.
    submitting_threads: Vec<JoinHandle<()>>,
    // Thread that polls `sacct` and reports job completion/failure.
    status_polling_thread: JoinHandle<()>,
}
234
235impl Executor for SlurmExecutor {
236 fn join(self) -> Vec<std::thread::Result<()>> {
237 let mut res = Vec::with_capacity(self.submitting_threads.len() + 1);
238 res.push(self.status_polling_thread.join());
239 res.extend(self.submitting_threads.into_iter().map(|jh| jh.join()));
240 res
241 }
242}