// wdl_engine/backend/slurm_apptainer.rs
//! Experimental Slurm + Apptainer (aka Singularity) task execution backend.
//!
//! This experimental backend submits each task as a Slurm job which invokes
//! Apptainer to provide the appropriate container environment for the WDL
//! command to execute.
//!
//! Due to the difficulty of getting a Slurm test cluster spun up, and limited
//! ability to install Apptainer locally or in CI, this is currently tested by
//! hand; expect (and report) bugs! In follow-up work, we hope to build a
//! limited test suite based on mocking CLI invocations and/or golden testing of
//! generated `srun`/`apptainer` scripts.

13use std::collections::HashMap;
14use std::path::Path;
15use std::process::Stdio;
16use std::sync::Arc;
17
18use anyhow::Context as _;
19use anyhow::anyhow;
20use anyhow::bail;
21use anyhow::ensure;
22use bytesize::ByteSize;
23use crankshaft::events::Event;
24use nonempty::NonEmpty;
25use tokio::fs::File;
26use tokio::fs::{self};
27use tokio::io::AsyncBufReadExt;
28use tokio::io::BufReader;
29use tokio::process::Command;
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32use tracing::debug;
33use tracing::error;
34use tracing::trace;
35use tracing::warn;
36
37use super::ApptainerConfig;
38use super::ApptainerState;
39use super::TaskExecutionBackend;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::TaskExecutionResult;
46use crate::Value;
47use crate::config::Config;
48use crate::config::TaskResourceLimitBehavior;
49use crate::path::EvaluationPath;
50use crate::v1;
51
/// The name of the file where the Apptainer command invocation will be written.
///
/// The file is created in the task attempt directory, made executable, and
/// passed to `sbatch` as the batch script to run.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";

/// The root guest path for inputs.
///
/// This is the value reported by
/// [`SlurmApptainerBackend::guest_inputs_dir`]; task inputs are addressed
/// under this directory from inside the container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The maximum length of a Slurm job name.
///
/// Job names longer than this are truncated before being passed to
/// `sbatch --job-name`.
// TODO ACF 2025-10-13: I worked this out experimentally on the cluster I happen
// to have access to. I do not know whether this translates to other Slurm
// installations, and cannot find documentation about what this limit should be
// or whether it's configurable.
const SLURM_JOB_NAME_MAX_LENGTH: usize = 1024;
64
65/// A request to execute a task on a Slurm + Apptainer backend.
66#[derive(Debug)]
67struct SlurmApptainerTaskRequest {
68 /// The desired configuration of the backend.
69 backend_config: Arc<SlurmApptainerBackendConfig>,
70 /// The Apptainer state for the backend,
71 apptainer_state: Arc<ApptainerState>,
72 /// The name of the task, potentially truncated to fit within the Slurm job
73 /// name length limit.
74 name: String,
75 /// The task spawn request.
76 spawn_request: TaskSpawnRequest,
77 /// The requested container for the task.
78 container: String,
79 /// The requested CPU reservation for the task.
80 required_cpu: f64,
81 /// The requested memory reservation for the task.
82 required_memory: ByteSize,
83 /// The broadcast channel to update interested parties with the status of
84 /// executing tasks.
85 ///
86 /// This backend does not yet take advantage of the full Crankshaft
87 /// machinery, but we send rudimentary messages on this channel which helps
88 /// with UI presentation.
89 crankshaft_events: Option<broadcast::Sender<Event>>,
90 /// The cancellation token for this task execution request.
91 cancellation_token: CancellationToken,
92}
93
94impl TaskManagerRequest for SlurmApptainerTaskRequest {
95 fn cpu(&self) -> f64 {
96 self.required_cpu
97 }
98
99 fn memory(&self) -> u64 {
100 self.required_memory.as_u64()
101 }
102
103 async fn run(self) -> anyhow::Result<super::TaskExecutionResult> {
104 let crankshaft_task_id = crankshaft::events::next_task_id();
105
106 let attempt_dir = self.spawn_request.attempt_dir();
107
108 // Create the host directory that will be mapped to the WDL working directory.
109 let wdl_work_dir = self.spawn_request.wdl_work_dir_host_path();
110 fs::create_dir_all(&wdl_work_dir).await.with_context(|| {
111 format!(
112 "failed to create WDL working directory `{path}`",
113 path = wdl_work_dir.display()
114 )
115 })?;
116
117 // Create an empty file for the WDL command's stdout.
118 let wdl_stdout_path = self.spawn_request.wdl_stdout_host_path();
119 let _ = File::create(&wdl_stdout_path).await.with_context(|| {
120 format!(
121 "failed to create WDL stdout file `{path}`",
122 path = wdl_stdout_path.display()
123 )
124 })?;
125
126 // Create an empty file for the WDL command's stderr.
127 let wdl_stderr_path = self.spawn_request.wdl_stderr_host_path();
128 let _ = File::create(&wdl_stderr_path).await.with_context(|| {
129 format!(
130 "failed to create WDL stderr file `{path}`",
131 path = wdl_stderr_path.display()
132 )
133 })?;
134
135 // Write the evaluated WDL command section to a host file.
136 let wdl_command_path = self.spawn_request.wdl_command_host_path();
137 fs::write(&wdl_command_path, self.spawn_request.command())
138 .await
139 .with_context(|| {
140 format!(
141 "failed to write WDL command contents to `{path}`",
142 path = wdl_command_path.display()
143 )
144 })?;
145 #[cfg(unix)]
146 tokio::fs::set_permissions(
147 &wdl_command_path,
148 <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
149 )
150 .await?;
151
152 let apptainer_command = self
153 .apptainer_state
154 .prepare_apptainer_command(
155 &self.container,
156 self.cancellation_token.clone(),
157 &self.spawn_request,
158 )
159 .await?;
160
161 let apptainer_command_path = attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
162 fs::write(&apptainer_command_path, apptainer_command)
163 .await
164 .with_context(|| {
165 format!(
166 "failed to write Apptainer command file `{}`",
167 apptainer_command_path.display()
168 )
169 })?;
170 #[cfg(unix)]
171 tokio::fs::set_permissions(
172 &apptainer_command_path,
173 <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
174 )
175 .await?;
176
177 // The path for the Slurm-level stdout and stderr. This primarily contains the
178 // job report, as we redirect Apptainer and WDL output separately.
179 let slurm_stdout_path = attempt_dir.join("slurm.stdout");
180 let slurm_stderr_path = attempt_dir.join("slurm.stderr");
181
182 let mut sbatch_command = Command::new("sbatch");
183
184 // If a Slurm partition has been configured, specify it. Otherwise, the job will
185 // end up on the cluster's default partition.
186 if let Some(partition) = self.backend_config.slurm_partition_for_task(
187 self.spawn_request.requirements(),
188 self.spawn_request.hints(),
189 ) {
190 sbatch_command.arg("--partition").arg(partition.name());
191 }
192
193 // If GPUs are required, use the gpu helper to determine the count and pass
194 // it to `sbatch` via `--gpus-per-task`.
195 if let Some(gpu_count) = v1::gpu(
196 self.spawn_request.requirements(),
197 self.spawn_request.hints(),
198 ) {
199 sbatch_command.arg(format!("--gpus-per-task={gpu_count}"));
200 }
201
202 // Add any user-configured extra arguments.
203 if let Some(args) = &self.backend_config.extra_sbatch_args {
204 sbatch_command.args(args);
205 }
206
207 sbatch_command
208 // Use verbose output that we can check later on
209 .arg("-v")
210 // Keep `sbatch` running until the job terminates
211 .arg("--wait")
212 // Pipe stdout and stderr so we can identify when a job begins, and can trace any other
213 // output. This should just be the `sbatch` verbose output on stderr.
214 .stdout(Stdio::piped())
215 .stderr(Stdio::piped())
216 // Name the Slurm job after the task ID, which has already been shortened to fit into
217 // the Slurm requirements.
218 .arg("--job-name")
219 .arg(&self.name)
220 // Send Slurm job stdout and stderr streams to these files. Since we redirect the
221 // Apptainer invocation's stdio to separate files, this will typically not contain
222 // anything, but can be useful for debugging if the scripts get modified.
223 .arg("-o")
224 .arg(slurm_stdout_path)
225 .arg("-e")
226 .arg(slurm_stderr_path)
227 // An explicit task count is required for some options
228 .arg("--ntasks=1")
229 // CPU request is rounded up to the nearest whole CPU
230 .arg(format!(
231 "--cpus-per-task={}",
232 self.required_cpu.ceil() as u64
233 ))
234 // Memory request is specified per node in mebibytes; we round the request up to the
235 // next mebibyte.
236 //
237 // Note that the Slurm documentation says "megabyte" (i.e., the base-10 unit), but the
238 // other explanations of the unit suffixes in the first-party documentation show the use
239 // of base-2 units, and multiple third-party sources available through online searches
240 // back the base-2 interpretation, for example:
241 //
242 // https://info.nrao.edu/computing/guide/cluster-processing/appendix/memory-options
243 // https://wcmscu.atlassian.net/wiki/spaces/WIKI/pages/327731/Using+Slurm
244 .arg(format!(
245 "--mem={}M",
246 (self.required_memory.as_u64() as f64 / bytesize::MIB as f64).ceil() as u64
247 ))
248 .arg(apptainer_command_path);
249
250 debug!(?sbatch_command, "spawning `sbatch` command");
251
252 let mut sbatch_child = sbatch_command.spawn()?;
253
254 crankshaft::events::send_event!(
255 self.crankshaft_events,
256 crankshaft::events::Event::TaskCreated {
257 id: crankshaft_task_id,
258 name: self.name.clone(),
259 tes_id: None,
260 token: self.cancellation_token.clone(),
261 },
262 );
263
264 // Take the stdio pipes from the child process and consume them for event
265 // reporting and tracing purposes.
266 //
267 // TODO ACF 2025-10-13: generate `sbatch`-compatible scripts instead and use a
268 // polling mechanism to watch for job status changes? `squeue` can emit
269 // json suitable for this.
270 let sbatch_stdout = sbatch_child
271 .stdout
272 .take()
273 .ok_or_else(|| anyhow!("sbatch child stdout missing"))?;
274 let task_name = self.name.clone();
275 let stdout_crankshaft_events = self.crankshaft_events.clone();
276 tokio::spawn(async move {
277 let mut lines = BufReader::new(sbatch_stdout).lines();
278 while let Ok(Some(line)) = lines.next_line().await {
279 // TODO ACF 2025-10-14: `sbatch --wait` even on high verbosity doesn't tell us
280 // when a job has actually started, only when it's been
281 // submitted. Unless we can figure out a way to get that info
282 // out directly, we'll have to set up a separate task to
283 // poll job statuses. For the moment, this is potentially misleading about what
284 // work has actually begun computation.
285 if line.starts_with("Submitted batch job") {
286 crankshaft::events::send_event!(
287 stdout_crankshaft_events,
288 crankshaft::events::Event::TaskStarted {
289 id: crankshaft_task_id
290 },
291 );
292 }
293 trace!(stdout = line, task_name);
294 }
295 });
296 let sbatch_stderr = sbatch_child
297 .stderr
298 .take()
299 .ok_or_else(|| anyhow!("sbatch child stderr missing"))?;
300 let task_name = self.name.clone();
301 tokio::spawn(async move {
302 let mut lines = BufReader::new(sbatch_stderr).lines();
303 while let Ok(Some(line)) = lines.next_line().await {
304 trace!(stderr = line, task_name);
305 }
306 });
307
308 // Await the result of the `sbatch` command, which will only exit on error or
309 // once the containerized command has completed.
310 let sbatch_result = tokio::select! {
311 _ = self.cancellation_token.cancelled() => {
312 crankshaft::events::send_event!(
313 self.crankshaft_events,
314 crankshaft::events::Event::TaskCanceled {
315 id: crankshaft_task_id
316 },
317 );
318 Err(anyhow!("task execution cancelled"))
319 }
320 result = sbatch_child.wait() => result.map_err(Into::into),
321 }?;
322
323 crankshaft::events::send_event!(
324 self.crankshaft_events,
325 crankshaft::events::Event::TaskCompleted {
326 id: crankshaft_task_id,
327 exit_statuses: NonEmpty::new(sbatch_result),
328 }
329 );
330
331 Ok(TaskExecutionResult {
332 // Under normal circumstances, the exit code of `sbatch --wait` is the exit code of its
333 // command, and the exit code of `apptainer exec` is likewise the exit code of its
334 // command. One potential subtlety/problem here is that if `sbatch` or `apptainer` exit
335 // due to an error before running the WDL command, we could be erroneously ascribing an
336 // exit code to the WDL command.
337 exit_code: sbatch_result
338 .code()
339 .ok_or(anyhow!("task did not return an exit code"))?,
340 work_dir: EvaluationPath::Local(wdl_work_dir),
341 stdout: PrimitiveValue::new_file(
342 wdl_stdout_path
343 .into_os_string()
344 .into_string()
345 .expect("path should be UTF-8"),
346 )
347 .into(),
348 stderr: PrimitiveValue::new_file(
349 wdl_stderr_path
350 .into_os_string()
351 .into_string()
352 .expect("path should be UTF-8"),
353 )
354 .into(),
355 })
356 }
357}
358
/// The experimental Slurm + Apptainer backend.
///
/// See the module-level documentation for details.
#[derive(Debug)]
pub struct SlurmApptainerBackend {
    /// The configuration of the overall engine being executed.
    engine_config: Arc<Config>,
    /// The configuration of this backend.
    backend_config: Arc<SlurmApptainerBackendConfig>,
    /// The task manager for the backend.
    manager: TaskManager<SlurmApptainerTaskRequest>,
    /// Sender for crankshaft events.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Apptainer state.
    ///
    /// Shared (via `Arc`) with each task request this backend spawns.
    apptainer_state: Arc<ApptainerState>,
}
375
376impl SlurmApptainerBackend {
377 /// Create a new backend.
378 pub fn new(
379 run_root_dir: &Path,
380 engine_config: Arc<Config>,
381 backend_config: Arc<SlurmApptainerBackendConfig>,
382 crankshaft_events: Option<broadcast::Sender<Event>>,
383 ) -> Self {
384 let apptainer_state =
385 ApptainerState::new(&backend_config.apptainer_config, run_root_dir).into();
386 Self {
387 engine_config,
388 backend_config,
389 // TODO ACF 2025-10-13: the `MAX` values here mean that in addition to not limiting the
390 // overall number of CPU and memory used, we don't limit per-task consumption. There is
391 // potentially a path to pulling partition limits from Slurm for these, but for now we
392 // just throw jobs at the cluster.
393 manager: TaskManager::new_unlimited(u64::MAX, u64::MAX),
394 crankshaft_events,
395 apptainer_state,
396 }
397 }
398}
399
400impl TaskExecutionBackend for SlurmApptainerBackend {
401 fn max_concurrency(&self) -> u64 {
402 self.backend_config.max_scatter_concurrency
403 }
404
405 fn constraints(
406 &self,
407 requirements: &HashMap<String, Value>,
408 hints: &HashMap<String, crate::Value>,
409 ) -> anyhow::Result<super::TaskExecutionConstraints> {
410 let mut required_cpu = v1::cpu(requirements);
411 let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
412
413 // Determine whether CPU or memory limits are set for this partition, and clamp
414 // or deny them as appropriate if the limits are exceeded
415 //
416 // TODO ACF 2025-10-16: refactor so that we're not duplicating logic here (for
417 // the in-WDL `task` values) and below in `spawn` (for the actual
418 // resource request)
419 if let Some(partition) = self
420 .backend_config
421 .slurm_partition_for_task(requirements, hints)
422 {
423 if let Some(max_cpu) = partition.max_cpu_per_task()
424 && required_cpu > max_cpu as f64
425 {
426 let env_specific = if self.engine_config.suppress_env_specific_output {
427 String::new()
428 } else {
429 format!(", but the execution backend has a maximum of {max_cpu}",)
430 };
431 match self.engine_config.task.cpu_limit_behavior {
432 TaskResourceLimitBehavior::TryWithMax => {
433 warn!(
434 "task requires at least {required_cpu} CPU{s}{env_specific}",
435 s = if required_cpu == 1.0 { "" } else { "s" },
436 );
437 // clamp the reported constraint to what's available
438 required_cpu = max_cpu as f64;
439 }
440 TaskResourceLimitBehavior::Deny => {
441 bail!(
442 "task requires at least {required_cpu} CPU{s}{env_specific}",
443 s = if required_cpu == 1.0 { "" } else { "s" },
444 );
445 }
446 }
447 }
448 if let Some(max_memory) = partition.max_memory_per_task()
449 && required_memory > max_memory
450 {
451 let env_specific = if self.engine_config.suppress_env_specific_output {
452 String::new()
453 } else {
454 format!(
455 ", but the execution backend has a maximum of {max_memory} GiB",
456 max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
457 )
458 };
459 match self.engine_config.task.memory_limit_behavior {
460 TaskResourceLimitBehavior::TryWithMax => {
461 warn!(
462 "task requires at least {required_memory} GiB of memory{env_specific}",
463 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
464 );
465 // clamp the reported constraint to what's available
466 required_memory = max_memory;
467 }
468 TaskResourceLimitBehavior::Deny => {
469 bail!(
470 "task requires at least {required_memory} GiB of memory{env_specific}",
471 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
472 );
473 }
474 }
475 }
476 }
477 Ok(super::TaskExecutionConstraints {
478 container: Some(
479 v1::container(requirements, self.engine_config.task.container.as_deref())
480 .into_owned(),
481 ),
482 // TODO ACF 2025-10-13: populate more meaningful values for these based on the given
483 // Slurm partition.
484 //
485 // sinfo -p <partition> -s --json | jq .sinfo[0].cpus
486 // sinfo -p <partition> -s --json | jq .sinfo[0].memory
487 cpu: required_cpu,
488 memory: required_memory.as_u64().try_into().unwrap_or(i64::MAX),
489 // TODO ACF 2025-10-16: these are almost certainly wrong
490 gpu: Default::default(),
491 fpga: Default::default(),
492 disks: Default::default(),
493 })
494 }
495
496 fn guest_inputs_dir(&self) -> Option<&'static str> {
497 Some(GUEST_INPUTS_DIR)
498 }
499
500 fn needs_local_inputs(&self) -> bool {
501 true
502 }
503
504 fn spawn(
505 &self,
506 request: TaskSpawnRequest,
507 cancellation_token: CancellationToken,
508 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<TaskExecutionResult>>> {
509 let (completed_tx, completed_rx) = tokio::sync::oneshot::channel();
510
511 let requirements = request.requirements();
512 let hints = request.hints();
513
514 let container =
515 v1::container(requirements, self.engine_config.task.container.as_deref()).into_owned();
516
517 let mut required_cpu = v1::cpu(requirements);
518 let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
519
520 // Determine whether CPU or memory limits are set for this partition, and clamp
521 // or deny them as appropriate if the limits are exceeded
522 //
523 // TODO ACF 2025-10-16: refactor so that we're not duplicating logic here (for
524 // the in-WDL `task` values) and below in `spawn` (for the actual
525 // resource request)
526 if let Some(partition) = self
527 .backend_config
528 .slurm_partition_for_task(requirements, hints)
529 {
530 if let Some(max_cpu) = partition.max_cpu_per_task()
531 && required_cpu > max_cpu as f64
532 {
533 let env_specific = if self.engine_config.suppress_env_specific_output {
534 String::new()
535 } else {
536 format!(", but the execution backend has a maximum of {max_cpu}",)
537 };
538 match self.engine_config.task.cpu_limit_behavior {
539 TaskResourceLimitBehavior::TryWithMax => {
540 warn!(
541 "task requires at least {required_cpu} CPU{s}{env_specific}",
542 s = if required_cpu == 1.0 { "" } else { "s" },
543 );
544 // clamp the reported constraint to what's available
545 required_cpu = max_cpu as f64;
546 }
547 TaskResourceLimitBehavior::Deny => {
548 bail!(
549 "task requires at least {required_cpu} CPU{s}{env_specific}",
550 s = if required_cpu == 1.0 { "" } else { "s" },
551 );
552 }
553 }
554 }
555 if let Some(max_memory) = partition.max_memory_per_task()
556 && required_memory > max_memory
557 {
558 let env_specific = if self.engine_config.suppress_env_specific_output {
559 String::new()
560 } else {
561 format!(
562 ", but the execution backend has a maximum of {max_memory} GiB",
563 max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
564 )
565 };
566 match self.engine_config.task.memory_limit_behavior {
567 TaskResourceLimitBehavior::TryWithMax => {
568 warn!(
569 "task requires at least {required_memory} GiB of memory{env_specific}",
570 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
571 );
572 // clamp the reported constraint to what's available
573 required_memory = max_memory;
574 }
575 TaskResourceLimitBehavior::Deny => {
576 bail!(
577 "task requires at least {required_memory} GiB of memory{env_specific}",
578 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
579 );
580 }
581 }
582 }
583 }
584
585 // TODO ACF 2025-10-23: investigate whether Slurm offers hard vs soft limits for
586 // CPU and memory
587 let _max_cpu = v1::max_cpu(hints);
588 let _max_memory = v1::max_memory(hints)?.map(|i| i as u64);
589
590 // Truncate the request ID to fit in the Slurm job name length limit.
591 let request_id = request.id();
592 let name = if request_id.len() > SLURM_JOB_NAME_MAX_LENGTH {
593 request_id
594 .chars()
595 .take(SLURM_JOB_NAME_MAX_LENGTH)
596 .collect::<String>()
597 } else {
598 request_id.to_string()
599 };
600
601 self.manager.send(
602 SlurmApptainerTaskRequest {
603 backend_config: self.backend_config.clone(),
604 apptainer_state: self.apptainer_state.clone(),
605 spawn_request: request,
606 name,
607 container,
608 required_cpu,
609 required_memory,
610 crankshaft_events: self.crankshaft_events.clone(),
611 cancellation_token,
612 },
613 completed_tx,
614 );
615
616 Ok(completed_rx)
617 }
618
619 fn cleanup<'a>(
620 &'a self,
621 _work_dir: &'a EvaluationPath,
622 _token: CancellationToken,
623 ) -> Option<futures::future::BoxFuture<'a, ()>> {
624 // TODO ACF 2025-09-11: determine whether we need cleanup logic here;
625 // Apptainer's security model is fairly different from Docker so
626 // uid/gids on files shouldn't be as much of an issue, and using only
627 // `apptainer exec` means no longer-running containers to tear down
628 None
629 }
630}
631
/// Configuration for a Slurm partition.
///
/// Each partition can optionally have per-task CPU and memory limits set so
/// that tasks which are too large to be scheduled on that partition will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be set manually based on the user's understanding of the
/// cluster configuration.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmPartitionConfig {
    /// The name of the partition; this is the string passed to `sbatch
    /// --partition=<partition_name>`.
    name: String,
    /// The maximum number of CPUs this partition can provision for a single
    /// task.
    max_cpu_per_task: Option<u64>,
    /// The maximum memory this partition can provision for a single task.
    max_memory_per_task: Option<ByteSize>,
}
651
652impl SlurmPartitionConfig {
653 /// Create a [`SlurmPartitionConfig`].
654 pub fn new(
655 name: String,
656 max_cpu_per_task: Option<u64>,
657 max_memory_per_task: Option<ByteSize>,
658 ) -> Self {
659 Self {
660 name,
661 max_cpu_per_task,
662 max_memory_per_task,
663 }
664 }
665
666 /// The name of the partition; this is the string passed to `sbatch
667 /// --partition=<partition_name>`.
668 pub fn name(&self) -> &str {
669 &self.name
670 }
671
672 /// The maximum number of CPUs this partition can provision for a single
673 /// task.
674 pub fn max_cpu_per_task(&self) -> Option<u64> {
675 self.max_cpu_per_task
676 }
677
678 /// The maximum memory this partition can provision for a single task.
679 pub fn max_memory_per_task(&self) -> Option<ByteSize> {
680 self.max_memory_per_task
681 }
682
683 /// Validate that this Slurm partition exists according to the local
684 /// `sinfo`.
685 async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
686 let partition = self.name();
687 ensure!(
688 !partition.is_empty(),
689 "{name}_slurm_partition name cannot be empty"
690 );
691 if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
692 ensure!(
693 max_cpu_per_task > 0,
694 "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
695 );
696 }
697 if let Some(max_memory_per_task) = self.max_memory_per_task() {
698 ensure!(
699 max_memory_per_task.as_u64() > 0,
700 "{name}_slurm_partition `{partition}` must allow at least some memory to be \
701 provisioned"
702 );
703 }
704 match tokio::time::timeout(
705 // 10 seconds is rather arbitrary; `scontrol` ordinarily returns extremely quickly, but
706 // we don't want things to run away on a misconfigured system
707 std::time::Duration::from_secs(10),
708 Command::new("scontrol")
709 .arg("show")
710 .arg("partition")
711 .arg(partition)
712 .output(),
713 )
714 .await
715 {
716 Ok(output) => {
717 let output = output.context("validating Slurm partition")?;
718 if !output.status.success() {
719 let stdout = String::from_utf8_lossy(&output.stdout);
720 let stderr = String::from_utf8_lossy(&output.stderr);
721 error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
722 Err(anyhow!(
723 "failed to validate {name}_slurm_partition `{partition}`"
724 ))
725 } else {
726 Ok(())
727 }
728 }
729 Err(_) => Err(anyhow!(
730 "timed out trying to validate {name}_slurm_partition `{partition}`"
731 )),
732 }
733 }
734}
735
/// Configuration for the Slurm + Apptainer backend.
// TODO ACF 2025-09-23: add a Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmApptainerBackendConfig {
    /// Which partition, if any, to specify when submitting normal jobs to
    /// Slurm.
    ///
    /// This may be superseded by
    /// [`short_task_slurm_partition`][Self::short_task_slurm_partition],
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition], or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for corresponding
    /// tasks.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to Slurm.
    ///
    /// This may be superseded by
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition] or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for tasks which
    /// require specialized hardware.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// a GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// an FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Additional command-line arguments to pass to `sbatch` when submitting
    /// jobs to Slurm.
    ///
    /// These are appended before the engine's own `sbatch` arguments.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// The maximum number of scatter subtasks that can be evaluated
    /// concurrently.
    ///
    /// By default, this is 200.
    #[serde(default = "default_max_scatter_concurrency")]
    pub max_scatter_concurrency: u64,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where Slurm dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
788
/// Returns the default value (200) for
/// [`SlurmApptainerBackendConfig::max_scatter_concurrency`].
///
/// Referenced by the `#[serde(default = ...)]` attribute on that field.
fn default_max_scatter_concurrency() -> u64 {
    200
}
792
793impl Default for SlurmApptainerBackendConfig {
794 fn default() -> Self {
795 Self {
796 default_slurm_partition: None,
797 short_task_slurm_partition: None,
798 gpu_slurm_partition: None,
799 fpga_slurm_partition: None,
800 extra_sbatch_args: None,
801 max_scatter_concurrency: default_max_scatter_concurrency(),
802 apptainer_config: ApptainerConfig::default(),
803 }
804 }
805}
806
807impl SlurmApptainerBackendConfig {
808 /// Validate that the backend is appropriately configured.
809 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
810 if cfg!(not(unix)) {
811 bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
812 }
813 if !engine_config.experimental_features_enabled {
814 bail!("Slurm + Apptainer backend requires enabling experimental features");
815 }
816
817 // Do what we can to validate options that are dependent on the dynamic
818 // environment. These are a bit fraught, particularly if the behavior of
819 // the external tools changes based on where a job gets dispatched, but
820 // querying from the perspective of the current node allows
821 // us to get better error messages in circumstances typical to a cluster.
822 if let Some(partition) = &self.default_slurm_partition {
823 partition.validate("default").await?;
824 }
825 if let Some(partition) = &self.short_task_slurm_partition {
826 partition.validate("short_task").await?;
827 }
828 if let Some(partition) = &self.gpu_slurm_partition {
829 partition.validate("gpu").await?;
830 }
831 if let Some(partition) = &self.fpga_slurm_partition {
832 partition.validate("fpga").await?;
833 }
834
835 self.apptainer_config.validate().await?;
836
837 Ok(())
838 }
839
840 /// Get the appropriate Slurm partition for a task under this configuration.
841 ///
842 /// Specialized hardware requirements are prioritized over other
843 /// characteristics, with FPGA taking precedence over GPU.
844 fn slurm_partition_for_task(
845 &self,
846 requirements: &HashMap<String, Value>,
847 hints: &HashMap<String, Value>,
848 ) -> Option<&SlurmPartitionConfig> {
849 // TODO ACF 2025-09-26: what's the relationship between this code and
850 // `TaskExecutionConstraints`? Should this be there instead, or be pulling
851 // values from that instead of directly from `requirements` and `hints`?
852
853 // Specialized hardware gets priority.
854 if let Some(partition) = self.fpga_slurm_partition.as_ref()
855 && let Some(true) = requirements
856 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
857 .and_then(Value::as_boolean)
858 {
859 return Some(partition);
860 }
861
862 if let Some(partition) = self.gpu_slurm_partition.as_ref()
863 && let Some(true) = requirements
864 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
865 .and_then(Value::as_boolean)
866 {
867 return Some(partition);
868 }
869
870 // Then short tasks.
871 if let Some(partition) = self.short_task_slurm_partition.as_ref()
872 && let Some(true) = hints
873 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
874 .and_then(Value::as_boolean)
875 {
876 return Some(partition);
877 }
878
879 // Finally the default partition. If this is `None`, `sbatch` gets run without a
880 // partition argument and the cluster's default is used.
881 self.default_slurm_partition.as_ref()
882 }
883}