wdl_engine/backend/
slurm_apptainer.rs

//! Experimental Slurm + Apptainer (aka Singularity) task execution backend.
//!
//! This experimental backend submits each task as a Slurm job which invokes
//! Apptainer to provide the appropriate container environment for the WDL
//! command to execute.
//!
//! Due to the difficulty of getting a Slurm test cluster spun up, and limited
//! ability to install Apptainer locally or in CI, this is currently tested by
//! hand; expect (and report) bugs! In follow-up work, we hope to build a
//! limited test suite based on mocking CLI invocations and/or golden testing of
//! generated `srun`/`apptainer` scripts.
12
13use std::collections::HashMap;
14use std::path::Path;
15use std::process::Stdio;
16use std::sync::Arc;
17
18use anyhow::Context as _;
19use anyhow::anyhow;
20use anyhow::bail;
21use anyhow::ensure;
22use bytesize::ByteSize;
23use crankshaft::events::Event;
24use nonempty::NonEmpty;
25use tokio::fs::File;
26use tokio::fs::{self};
27use tokio::io::AsyncBufReadExt;
28use tokio::io::BufReader;
29use tokio::process::Command;
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32use tracing::debug;
33use tracing::error;
34use tracing::trace;
35use tracing::warn;
36
37use super::ApptainerConfig;
38use super::ApptainerState;
39use super::TaskExecutionBackend;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::TaskExecutionResult;
46use crate::Value;
47use crate::config::Config;
48use crate::config::TaskResourceLimitBehavior;
49use crate::path::EvaluationPath;
50use crate::v1;
51
/// The name of the file where the Apptainer command invocation will be written.
///
/// The file is created in the task attempt directory and is the script passed
/// to `sbatch` for execution.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";

/// The root guest path for inputs.
///
/// This is the directory inside the container under which task inputs are made
/// available (see `guest_inputs_dir` on the backend).
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The maximum length of a Slurm job name.
// TODO ACF 2025-10-13: I worked this out experimentally on the cluster I happen
// to have access to. I do not know whether this translates to other Slurm
// installations, and cannot find documentation about what this limit should be
// or whether it's configurable.
const SLURM_JOB_NAME_MAX_LENGTH: usize = 1024;
64
/// A request to execute a task on a Slurm + Apptainer backend.
///
/// Instances are constructed by [`SlurmApptainerBackend::spawn`] and consumed
/// by the task manager, which eventually invokes
/// [`TaskManagerRequest::run`] on them.
#[derive(Debug)]
struct SlurmApptainerTaskRequest {
    /// The desired configuration of the backend.
    backend_config: Arc<SlurmApptainerBackendConfig>,
    /// The Apptainer state for the backend.
    apptainer_state: Arc<ApptainerState>,
    /// The name of the task, potentially truncated to fit within the Slurm job
    /// name length limit.
    name: String,
    /// The task spawn request.
    spawn_request: TaskSpawnRequest,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    ///
    /// May be fractional; it is rounded up to whole CPUs when passed to
    /// `sbatch`.
    required_cpu: f64,
    /// The requested memory reservation for the task.
    required_memory: ByteSize,
    /// The broadcast channel to update interested parties with the status of
    /// executing tasks.
    ///
    /// This backend does not yet take advantage of the full Crankshaft
    /// machinery, but we send rudimentary messages on this channel which helps
    /// with UI presentation.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// The cancellation token for this task execution request.
    cancellation_token: CancellationToken,
}
93
impl TaskManagerRequest for SlurmApptainerTaskRequest {
    /// The CPU reservation the task manager should account for this task.
    fn cpu(&self) -> f64 {
        self.required_cpu
    }

    /// The memory reservation, in bytes, the task manager should account for
    /// this task.
    fn memory(&self) -> u64 {
        self.required_memory.as_u64()
    }

    /// Execute the task by submitting it to Slurm via `sbatch --wait` and
    /// waiting for the job to finish.
    ///
    /// In outline:
    ///
    /// 1. Create the host-side working directory and empty stdout/stderr files
    ///    for the WDL command.
    /// 2. Write the evaluated WDL command and the generated Apptainer
    ///    invocation script to host files (marked executable on Unix).
    /// 3. Submit the Apptainer script with `sbatch --wait`, forwarding any
    ///    configured partition, GPU count, and extra arguments.
    /// 4. Consume `sbatch`'s stdout/stderr on background tasks for tracing and
    ///    rudimentary Crankshaft event reporting.
    /// 5. Map the `sbatch` exit status into a [`TaskExecutionResult`].
    async fn run(self) -> anyhow::Result<super::TaskExecutionResult> {
        let crankshaft_task_id = crankshaft::events::next_task_id();

        let attempt_dir = self.spawn_request.attempt_dir();

        // Create the host directory that will be mapped to the WDL working directory.
        let wdl_work_dir = self.spawn_request.wdl_work_dir_host_path();
        fs::create_dir_all(&wdl_work_dir).await.with_context(|| {
            format!(
                "failed to create WDL working directory `{path}`",
                path = wdl_work_dir.display()
            )
        })?;

        // Create an empty file for the WDL command's stdout.
        let wdl_stdout_path = self.spawn_request.wdl_stdout_host_path();
        let _ = File::create(&wdl_stdout_path).await.with_context(|| {
            format!(
                "failed to create WDL stdout file `{path}`",
                path = wdl_stdout_path.display()
            )
        })?;

        // Create an empty file for the WDL command's stderr.
        let wdl_stderr_path = self.spawn_request.wdl_stderr_host_path();
        let _ = File::create(&wdl_stderr_path).await.with_context(|| {
            format!(
                "failed to create WDL stderr file `{path}`",
                path = wdl_stderr_path.display()
            )
        })?;

        // Write the evaluated WDL command section to a host file.
        let wdl_command_path = self.spawn_request.wdl_command_host_path();
        fs::write(&wdl_command_path, self.spawn_request.command())
            .await
            .with_context(|| {
                format!(
                    "failed to write WDL command contents to `{path}`",
                    path = wdl_command_path.display()
                )
            })?;
        // Mark the command script executable (owner/group rwx) so Apptainer
        // can run it.
        #[cfg(unix)]
        tokio::fs::set_permissions(
            &wdl_command_path,
            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
        )
        .await?;

        // Generate the `apptainer exec` invocation for this task's container.
        let apptainer_command = self
            .apptainer_state
            .prepare_apptainer_command(
                &self.container,
                self.cancellation_token.clone(),
                &self.spawn_request,
            )
            .await?;

        let apptainer_command_path = attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
        fs::write(&apptainer_command_path, apptainer_command)
            .await
            .with_context(|| {
                format!(
                    "failed to write Apptainer command file `{}`",
                    apptainer_command_path.display()
                )
            })?;
        // As above, make the script executable; this file is what `sbatch`
        // ultimately runs.
        #[cfg(unix)]
        tokio::fs::set_permissions(
            &apptainer_command_path,
            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
        )
        .await?;

        // The path for the Slurm-level stdout and stderr. This primarily contains the
        // job report, as we redirect Apptainer and WDL output separately.
        let slurm_stdout_path = attempt_dir.join("slurm.stdout");
        let slurm_stderr_path = attempt_dir.join("slurm.stderr");

        let mut sbatch_command = Command::new("sbatch");

        // If a Slurm partition has been configured, specify it. Otherwise, the job will
        // end up on the cluster's default partition.
        if let Some(partition) = self.backend_config.slurm_partition_for_task(
            self.spawn_request.requirements(),
            self.spawn_request.hints(),
        ) {
            sbatch_command.arg("--partition").arg(partition.name());
        }

        // If GPUs are required, use the gpu helper to determine the count and pass
        // it to `sbatch` via `--gpus-per-task`.
        if let Some(gpu_count) = v1::gpu(
            self.spawn_request.requirements(),
            self.spawn_request.hints(),
        ) {
            sbatch_command.arg(format!("--gpus-per-task={gpu_count}"));
        }

        // Add any user-configured extra arguments.
        if let Some(args) = &self.backend_config.extra_sbatch_args {
            sbatch_command.args(args);
        }

        sbatch_command
            // Use verbose output that we can check later on
            .arg("-v")
            // Keep `sbatch` running until the job terminates
            .arg("--wait")
            // Pipe stdout and stderr so we can identify when a job begins, and can trace any other
            // output. This should just be the `sbatch` verbose output on stderr.
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            // Name the Slurm job after the task ID, which has already been shortened to fit into
            // the Slurm requirements.
            .arg("--job-name")
            .arg(&self.name)
            // Send Slurm job stdout and stderr streams to these files. Since we redirect the
            // Apptainer invocation's stdio to separate files, this will typically not contain
            // anything, but can be useful for debugging if the scripts get modified.
            .arg("-o")
            .arg(slurm_stdout_path)
            .arg("-e")
            .arg(slurm_stderr_path)
            // An explicit task count is required for some options
            .arg("--ntasks=1")
            // CPU request is rounded up to the nearest whole CPU
            .arg(format!(
                "--cpus-per-task={}",
                self.required_cpu.ceil() as u64
            ))
            // Memory request is specified per node in mebibytes; we round the request up to the
            // next mebibyte.
            //
            // Note that the Slurm documentation says "megabyte" (i.e., the base-10 unit), but the
            // other explanations of the unit suffixes in the first-party documentation show the use
            // of base-2 units, and multiple third-party sources available through online searches
            // back the base-2 interpretation, for example:
            //
            // https://info.nrao.edu/computing/guide/cluster-processing/appendix/memory-options
            // https://wcmscu.atlassian.net/wiki/spaces/WIKI/pages/327731/Using+Slurm
            .arg(format!(
                "--mem={}M",
                (self.required_memory.as_u64() as f64 / bytesize::MIB as f64).ceil() as u64
            ))
            // The script to execute: the Apptainer invocation written above.
            .arg(apptainer_command_path);

        debug!(?sbatch_command, "spawning `sbatch` command");

        let mut sbatch_child = sbatch_command.spawn()?;

        crankshaft::events::send_event!(
            self.crankshaft_events,
            crankshaft::events::Event::TaskCreated {
                id: crankshaft_task_id,
                name: self.name.clone(),
                tes_id: None,
                token: self.cancellation_token.clone(),
            },
        );

        // Take the stdio pipes from the child process and consume them for event
        // reporting and tracing purposes.
        //
        // TODO ACF 2025-10-13: generate `sbatch`-compatible scripts instead and use a
        // polling mechanism to watch for job status changes? `squeue` can emit
        // json suitable for this.
        let sbatch_stdout = sbatch_child
            .stdout
            .take()
            .ok_or_else(|| anyhow!("sbatch child stdout missing"))?;
        let task_name = self.name.clone();
        let stdout_crankshaft_events = self.crankshaft_events.clone();
        tokio::spawn(async move {
            let mut lines = BufReader::new(sbatch_stdout).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                // TODO ACF 2025-10-14: `sbatch --wait` even on high verbosity doesn't tell us
                // when a job has actually started, only when it's been
                // submitted.  Unless we can figure out a way to get that info
                // out directly, we'll have to set up a separate task to
                // poll job statuses. For the moment, this is potentially misleading about what
                // work has actually begun computation.
                if line.starts_with("Submitted batch job") {
                    crankshaft::events::send_event!(
                        stdout_crankshaft_events,
                        crankshaft::events::Event::TaskStarted {
                            id: crankshaft_task_id
                        },
                    );
                }
                trace!(stdout = line, task_name);
            }
        });
        let sbatch_stderr = sbatch_child
            .stderr
            .take()
            .ok_or_else(|| anyhow!("sbatch child stderr missing"))?;
        let task_name = self.name.clone();
        tokio::spawn(async move {
            let mut lines = BufReader::new(sbatch_stderr).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                trace!(stderr = line, task_name);
            }
        });

        // Await the result of the `sbatch` command, which will only exit on error or
        // once the containerized command has completed.
        let sbatch_result = tokio::select! {
            _ = self.cancellation_token.cancelled() => {
                // NOTE(review): on cancellation we report the task as canceled
                // and bail, but nothing here kills the `sbatch` child or runs
                // `scancel`, so the submitted Slurm job may continue running
                // — confirm whether this is intentional.
                crankshaft::events::send_event!(
                    self.crankshaft_events,
                    crankshaft::events::Event::TaskCanceled {
                        id: crankshaft_task_id
                    },
                );
                Err(anyhow!("task execution cancelled"))
            }
            result = sbatch_child.wait() => result.map_err(Into::into),
        }?;

        crankshaft::events::send_event!(
            self.crankshaft_events,
            crankshaft::events::Event::TaskCompleted {
                id: crankshaft_task_id,
                exit_statuses: NonEmpty::new(sbatch_result),
            }
        );

        Ok(TaskExecutionResult {
            // Under normal circumstances, the exit code of `sbatch --wait` is the exit code of its
            // command, and the exit code of `apptainer exec` is likewise the exit code of its
            // command. One potential subtlety/problem here is that if `sbatch` or `apptainer` exit
            // due to an error before running the WDL command, we could be erroneously ascribing an
            // exit code to the WDL command.
            exit_code: sbatch_result
                .code()
                .ok_or(anyhow!("task did not return an exit code"))?,
            work_dir: EvaluationPath::Local(wdl_work_dir),
            stdout: PrimitiveValue::new_file(
                wdl_stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                wdl_stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
358
/// The experimental Slurm + Apptainer backend.
///
/// See the module-level documentation for details.
#[derive(Debug)]
pub struct SlurmApptainerBackend {
    /// The configuration of the overall engine being executed.
    engine_config: Arc<Config>,
    /// The configuration of this backend.
    backend_config: Arc<SlurmApptainerBackendConfig>,
    /// The task manager for the backend.
    ///
    /// Currently constructed without CPU/memory capacity limits; the cluster
    /// itself arbitrates resource availability.
    manager: TaskManager<SlurmApptainerTaskRequest>,
    /// Sender for crankshaft events, if event reporting is enabled.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Shared Apptainer state used to prepare container invocations for
    /// tasks.
    apptainer_state: Arc<ApptainerState>,
}
375
376impl SlurmApptainerBackend {
377    /// Create a new backend.
378    pub fn new(
379        run_root_dir: &Path,
380        engine_config: Arc<Config>,
381        backend_config: Arc<SlurmApptainerBackendConfig>,
382        crankshaft_events: Option<broadcast::Sender<Event>>,
383    ) -> Self {
384        let apptainer_state =
385            ApptainerState::new(&backend_config.apptainer_config, run_root_dir).into();
386        Self {
387            engine_config,
388            backend_config,
389            // TODO ACF 2025-10-13: the `MAX` values here mean that in addition to not limiting the
390            // overall number of CPU and memory used, we don't limit per-task consumption. There is
391            // potentially a path to pulling partition limits from Slurm for these, but for now we
392            // just throw jobs at the cluster.
393            manager: TaskManager::new_unlimited(u64::MAX, u64::MAX),
394            crankshaft_events,
395            apptainer_state,
396        }
397    }
398}
399
400impl TaskExecutionBackend for SlurmApptainerBackend {
401    fn max_concurrency(&self) -> u64 {
402        self.backend_config.max_scatter_concurrency
403    }
404
405    fn constraints(
406        &self,
407        requirements: &HashMap<String, Value>,
408        hints: &HashMap<String, crate::Value>,
409    ) -> anyhow::Result<super::TaskExecutionConstraints> {
410        let mut required_cpu = v1::cpu(requirements);
411        let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
412
413        // Determine whether CPU or memory limits are set for this partition, and clamp
414        // or deny them as appropriate if the limits are exceeded
415        //
416        // TODO ACF 2025-10-16: refactor so that we're not duplicating logic here (for
417        // the in-WDL `task` values) and below in `spawn` (for the actual
418        // resource request)
419        if let Some(partition) = self
420            .backend_config
421            .slurm_partition_for_task(requirements, hints)
422        {
423            if let Some(max_cpu) = partition.max_cpu_per_task()
424                && required_cpu > max_cpu as f64
425            {
426                let env_specific = if self.engine_config.suppress_env_specific_output {
427                    String::new()
428                } else {
429                    format!(", but the execution backend has a maximum of {max_cpu}",)
430                };
431                match self.engine_config.task.cpu_limit_behavior {
432                    TaskResourceLimitBehavior::TryWithMax => {
433                        warn!(
434                            "task requires at least {required_cpu} CPU{s}{env_specific}",
435                            s = if required_cpu == 1.0 { "" } else { "s" },
436                        );
437                        // clamp the reported constraint to what's available
438                        required_cpu = max_cpu as f64;
439                    }
440                    TaskResourceLimitBehavior::Deny => {
441                        bail!(
442                            "task requires at least {required_cpu} CPU{s}{env_specific}",
443                            s = if required_cpu == 1.0 { "" } else { "s" },
444                        );
445                    }
446                }
447            }
448            if let Some(max_memory) = partition.max_memory_per_task()
449                && required_memory > max_memory
450            {
451                let env_specific = if self.engine_config.suppress_env_specific_output {
452                    String::new()
453                } else {
454                    format!(
455                        ", but the execution backend has a maximum of {max_memory} GiB",
456                        max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
457                    )
458                };
459                match self.engine_config.task.memory_limit_behavior {
460                    TaskResourceLimitBehavior::TryWithMax => {
461                        warn!(
462                            "task requires at least {required_memory} GiB of memory{env_specific}",
463                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
464                        );
465                        // clamp the reported constraint to what's available
466                        required_memory = max_memory;
467                    }
468                    TaskResourceLimitBehavior::Deny => {
469                        bail!(
470                            "task requires at least {required_memory} GiB of memory{env_specific}",
471                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
472                        );
473                    }
474                }
475            }
476        }
477        Ok(super::TaskExecutionConstraints {
478            container: Some(
479                v1::container(requirements, self.engine_config.task.container.as_deref())
480                    .into_owned(),
481            ),
482            // TODO ACF 2025-10-13: populate more meaningful values for these based on the given
483            // Slurm partition.
484            //
485            // sinfo -p <partition> -s --json | jq .sinfo[0].cpus
486            // sinfo -p <partition> -s --json | jq .sinfo[0].memory
487            cpu: required_cpu,
488            memory: required_memory.as_u64().try_into().unwrap_or(i64::MAX),
489            // TODO ACF 2025-10-16: these are almost certainly wrong
490            gpu: Default::default(),
491            fpga: Default::default(),
492            disks: Default::default(),
493        })
494    }
495
496    fn guest_inputs_dir(&self) -> Option<&'static str> {
497        Some(GUEST_INPUTS_DIR)
498    }
499
500    fn needs_local_inputs(&self) -> bool {
501        true
502    }
503
504    fn spawn(
505        &self,
506        request: TaskSpawnRequest,
507        cancellation_token: CancellationToken,
508    ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<TaskExecutionResult>>> {
509        let (completed_tx, completed_rx) = tokio::sync::oneshot::channel();
510
511        let requirements = request.requirements();
512        let hints = request.hints();
513
514        let container =
515            v1::container(requirements, self.engine_config.task.container.as_deref()).into_owned();
516
517        let mut required_cpu = v1::cpu(requirements);
518        let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
519
520        // Determine whether CPU or memory limits are set for this partition, and clamp
521        // or deny them as appropriate if the limits are exceeded
522        //
523        // TODO ACF 2025-10-16: refactor so that we're not duplicating logic here (for
524        // the in-WDL `task` values) and below in `spawn` (for the actual
525        // resource request)
526        if let Some(partition) = self
527            .backend_config
528            .slurm_partition_for_task(requirements, hints)
529        {
530            if let Some(max_cpu) = partition.max_cpu_per_task()
531                && required_cpu > max_cpu as f64
532            {
533                let env_specific = if self.engine_config.suppress_env_specific_output {
534                    String::new()
535                } else {
536                    format!(", but the execution backend has a maximum of {max_cpu}",)
537                };
538                match self.engine_config.task.cpu_limit_behavior {
539                    TaskResourceLimitBehavior::TryWithMax => {
540                        warn!(
541                            "task requires at least {required_cpu} CPU{s}{env_specific}",
542                            s = if required_cpu == 1.0 { "" } else { "s" },
543                        );
544                        // clamp the reported constraint to what's available
545                        required_cpu = max_cpu as f64;
546                    }
547                    TaskResourceLimitBehavior::Deny => {
548                        bail!(
549                            "task requires at least {required_cpu} CPU{s}{env_specific}",
550                            s = if required_cpu == 1.0 { "" } else { "s" },
551                        );
552                    }
553                }
554            }
555            if let Some(max_memory) = partition.max_memory_per_task()
556                && required_memory > max_memory
557            {
558                let env_specific = if self.engine_config.suppress_env_specific_output {
559                    String::new()
560                } else {
561                    format!(
562                        ", but the execution backend has a maximum of {max_memory} GiB",
563                        max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
564                    )
565                };
566                match self.engine_config.task.memory_limit_behavior {
567                    TaskResourceLimitBehavior::TryWithMax => {
568                        warn!(
569                            "task requires at least {required_memory} GiB of memory{env_specific}",
570                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
571                        );
572                        // clamp the reported constraint to what's available
573                        required_memory = max_memory;
574                    }
575                    TaskResourceLimitBehavior::Deny => {
576                        bail!(
577                            "task requires at least {required_memory} GiB of memory{env_specific}",
578                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
579                        );
580                    }
581                }
582            }
583        }
584
585        // TODO ACF 2025-10-23: investigate whether Slurm offers hard vs soft limits for
586        // CPU and memory
587        let _max_cpu = v1::max_cpu(hints);
588        let _max_memory = v1::max_memory(hints)?.map(|i| i as u64);
589
590        // Truncate the request ID to fit in the Slurm job name length limit.
591        let request_id = request.id();
592        let name = if request_id.len() > SLURM_JOB_NAME_MAX_LENGTH {
593            request_id
594                .chars()
595                .take(SLURM_JOB_NAME_MAX_LENGTH)
596                .collect::<String>()
597        } else {
598            request_id.to_string()
599        };
600
601        self.manager.send(
602            SlurmApptainerTaskRequest {
603                backend_config: self.backend_config.clone(),
604                apptainer_state: self.apptainer_state.clone(),
605                spawn_request: request,
606                name,
607                container,
608                required_cpu,
609                required_memory,
610                crankshaft_events: self.crankshaft_events.clone(),
611                cancellation_token,
612            },
613            completed_tx,
614        );
615
616        Ok(completed_rx)
617    }
618
619    fn cleanup<'a>(
620        &'a self,
621        _work_dir: &'a EvaluationPath,
622        _token: CancellationToken,
623    ) -> Option<futures::future::BoxFuture<'a, ()>> {
624        // TODO ACF 2025-09-11: determine whether we need cleanup logic here;
625        // Apptainer's security model is fairly different from Docker so
626        // uid/gids on files shouldn't be as much of an issue, and using only
627        // `apptainer exec` means no longer-running containers to tear down
628        None
629    }
630}
631
/// Configuration for a Slurm partition.
///
/// Each partition can optionally have per-task CPU and memory limits set so
/// that tasks which are too large to be scheduled on that partition will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be set manually based on the user's understanding of the
/// cluster configuration.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmPartitionConfig {
    /// The name of the partition; this is the string passed to `sbatch
    /// --partition=<partition_name>`.
    name: String,
    /// The maximum number of CPUs this partition can provision for a single
    /// task.
    ///
    /// `None` means no per-task CPU limit is enforced by the engine.
    max_cpu_per_task: Option<u64>,
    /// The maximum memory this partition can provision for a single task.
    ///
    /// `None` means no per-task memory limit is enforced by the engine.
    max_memory_per_task: Option<ByteSize>,
}
651
652impl SlurmPartitionConfig {
653    /// Create a [`SlurmPartitionConfig`].
654    pub fn new(
655        name: String,
656        max_cpu_per_task: Option<u64>,
657        max_memory_per_task: Option<ByteSize>,
658    ) -> Self {
659        Self {
660            name,
661            max_cpu_per_task,
662            max_memory_per_task,
663        }
664    }
665
666    /// The name of the partition; this is the string passed to `sbatch
667    /// --partition=<partition_name>`.
668    pub fn name(&self) -> &str {
669        &self.name
670    }
671
672    /// The maximum number of CPUs this partition can provision for a single
673    /// task.
674    pub fn max_cpu_per_task(&self) -> Option<u64> {
675        self.max_cpu_per_task
676    }
677
678    /// The maximum memory this partition can provision for a single task.
679    pub fn max_memory_per_task(&self) -> Option<ByteSize> {
680        self.max_memory_per_task
681    }
682
683    /// Validate that this Slurm partition exists according to the local
684    /// `sinfo`.
685    async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
686        let partition = self.name();
687        ensure!(
688            !partition.is_empty(),
689            "{name}_slurm_partition name cannot be empty"
690        );
691        if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
692            ensure!(
693                max_cpu_per_task > 0,
694                "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
695            );
696        }
697        if let Some(max_memory_per_task) = self.max_memory_per_task() {
698            ensure!(
699                max_memory_per_task.as_u64() > 0,
700                "{name}_slurm_partition `{partition}` must allow at least some memory to be \
701                 provisioned"
702            );
703        }
704        match tokio::time::timeout(
705            // 10 seconds is rather arbitrary; `scontrol` ordinarily returns extremely quickly, but
706            // we don't want things to run away on a misconfigured system
707            std::time::Duration::from_secs(10),
708            Command::new("scontrol")
709                .arg("show")
710                .arg("partition")
711                .arg(partition)
712                .output(),
713        )
714        .await
715        {
716            Ok(output) => {
717                let output = output.context("validating Slurm partition")?;
718                if !output.status.success() {
719                    let stdout = String::from_utf8_lossy(&output.stdout);
720                    let stderr = String::from_utf8_lossy(&output.stderr);
721                    error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
722                    Err(anyhow!(
723                        "failed to validate {name}_slurm_partition `{partition}`"
724                    ))
725                } else {
726                    Ok(())
727                }
728            }
729            Err(_) => Err(anyhow!(
730                "timed out trying to validate {name}_slurm_partition `{partition}`"
731            )),
732        }
733    }
734}
735
/// Configuration for the Slurm + Apptainer backend.
///
/// Partition selection for a task follows a fixed precedence: FPGA, then GPU,
/// then short task, then the default partition (see
/// `slurm_partition_for_task`). Any partition option left as `None` simply
/// falls through to the next candidate.
// TODO ACF 2025-09-23: add an Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmApptainerBackendConfig {
    /// Which partition, if any, to specify when submitting normal jobs to
    /// Slurm.
    ///
    /// This may be superseded by
    /// [`short_task_slurm_partition`][Self::short_task_slurm_partition],
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition], or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for corresponding
    /// tasks.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to Slurm.
    ///
    /// This may be superseded by
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition] or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for tasks which
    /// require specialized hardware.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// a GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// an FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Additional command-line arguments to pass to `sbatch` when submitting
    /// jobs to Slurm.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// The maximum number of scatter subtasks that can be evaluated
    /// concurrently.
    ///
    /// By default, this is 200.
    #[serde(default = "default_max_scatter_concurrency")]
    pub max_scatter_concurrency: u64,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where Slurm dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
788
/// The default value for
/// [`max_scatter_concurrency`][SlurmApptainerBackendConfig::max_scatter_concurrency].
///
/// Referenced by name from the `#[serde(default = "...")]` attribute, so it
/// must remain a free function.
fn default_max_scatter_concurrency() -> u64 {
    200
}
792
793impl Default for SlurmApptainerBackendConfig {
794    fn default() -> Self {
795        Self {
796            default_slurm_partition: None,
797            short_task_slurm_partition: None,
798            gpu_slurm_partition: None,
799            fpga_slurm_partition: None,
800            extra_sbatch_args: None,
801            max_scatter_concurrency: default_max_scatter_concurrency(),
802            apptainer_config: ApptainerConfig::default(),
803        }
804    }
805}
806
807impl SlurmApptainerBackendConfig {
808    /// Validate that the backend is appropriately configured.
809    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
810        if cfg!(not(unix)) {
811            bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
812        }
813        if !engine_config.experimental_features_enabled {
814            bail!("Slurm + Apptainer backend requires enabling experimental features");
815        }
816
817        // Do what we can to validate options that are dependent on the dynamic
818        // environment. These are a bit fraught, particularly if the behavior of
819        // the external tools changes based on where a job gets dispatched, but
820        // querying from the perspective of the current node allows
821        // us to get better error messages in circumstances typical to a cluster.
822        if let Some(partition) = &self.default_slurm_partition {
823            partition.validate("default").await?;
824        }
825        if let Some(partition) = &self.short_task_slurm_partition {
826            partition.validate("short_task").await?;
827        }
828        if let Some(partition) = &self.gpu_slurm_partition {
829            partition.validate("gpu").await?;
830        }
831        if let Some(partition) = &self.fpga_slurm_partition {
832            partition.validate("fpga").await?;
833        }
834
835        self.apptainer_config.validate().await?;
836
837        Ok(())
838    }
839
840    /// Get the appropriate Slurm partition for a task under this configuration.
841    ///
842    /// Specialized hardware requirements are prioritized over other
843    /// characteristics, with FPGA taking precedence over GPU.
844    fn slurm_partition_for_task(
845        &self,
846        requirements: &HashMap<String, Value>,
847        hints: &HashMap<String, Value>,
848    ) -> Option<&SlurmPartitionConfig> {
849        // TODO ACF 2025-09-26: what's the relationship between this code and
850        // `TaskExecutionConstraints`? Should this be there instead, or be pulling
851        // values from that instead of directly from `requirements` and `hints`?
852
853        // Specialized hardware gets priority.
854        if let Some(partition) = self.fpga_slurm_partition.as_ref()
855            && let Some(true) = requirements
856                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
857                .and_then(Value::as_boolean)
858        {
859            return Some(partition);
860        }
861
862        if let Some(partition) = self.gpu_slurm_partition.as_ref()
863            && let Some(true) = requirements
864                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
865                .and_then(Value::as_boolean)
866        {
867            return Some(partition);
868        }
869
870        // Then short tasks.
871        if let Some(partition) = self.short_task_slurm_partition.as_ref()
872            && let Some(true) = hints
873                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
874                .and_then(Value::as_boolean)
875        {
876            return Some(partition);
877        }
878
879        // Finally the default partition. If this is `None`, `sbatch` gets run without a
880        // partition argument and the cluster's default is used.
881        self.default_slurm_partition.as_ref()
882    }
883}