// wdl_engine/backend/lsf_apptainer.rs
1#![allow(clippy::missing_docs_in_private_items)]
2
3//! Experimental LSF + Apptainer (aka Singularity) task execution backend.
4//!
5//! This experimental backend submits each task as an LSF job which invokes
6//! Apptainer to provide the appropriate container environment for the WDL
7//! command to execute.
8//!
9//! Due to the proprietary nature of LSF, and limited ability to install
10//! Apptainer locally or in CI, this is currently tested by hand; expect (and
11//! report) bugs! In follow-up work, we hope to build a limited test suite based
12//! on mocking CLI invocations and/or golden testing of generated
13//! `bsub`/`apptainer` scripts.
14
15use std::collections::HashMap;
16use std::fmt::Write as _;
17use std::path::PathBuf;
18use std::process::Stdio;
19use std::sync::Arc;
20
21use anyhow::Context as _;
22use anyhow::anyhow;
23use anyhow::bail;
24use crankshaft::events::Event;
25use images::sif_for_container;
26use nonempty::NonEmpty;
27use tokio::fs::File;
28use tokio::fs::{self};
29use tokio::io::AsyncBufReadExt;
30use tokio::io::BufReader;
31use tokio::process::Command;
32use tokio::sync::broadcast;
33use tokio_util::sync::CancellationToken;
34use tracing::error;
35use tracing::trace;
36use tracing::warn;
37
38use super::COMMAND_FILE_NAME;
39use super::TaskExecutionBackend;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use super::WORK_DIR_NAME;
44use crate::PrimitiveValue;
45use crate::STDERR_FILE_NAME;
46use crate::STDOUT_FILE_NAME;
47use crate::TaskExecutionResult;
48use crate::Value;
49use crate::config::Config;
50use crate::path::EvaluationPath;
51use crate::v1;
52
/// Helpers for obtaining Apptainer `.sif` images for requested containers
/// (see [`sif_for_container`]).
mod images;

/// The name of the file where the Apptainer command invocation will be written.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";

/// The root guest path for inputs.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The guest working directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the command file.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The path to the container's stdout.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The path to the container's stderr.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// The maximum length of an LSF job name.
///
/// See <https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=o-j>.
const LSF_JOB_NAME_MAX_LENGTH: usize = 4094;
77
/// A request to execute a single WDL task via LSF + Apptainer.
///
/// Queued with the backend's [`TaskManager`], which calls
/// [`TaskManagerRequest::run`] when it dispatches the task.
#[derive(Debug)]
struct LsfApptainerTaskRequest {
    /// The backend configuration shared across tasks.
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// The LSF job name (already shortened by `spawn` to fit within
    /// [`LSF_JOB_NAME_MAX_LENGTH`]).
    name: String,
    /// The underlying spawn request (command text, inputs, env, directories).
    spawn_request: TaskSpawnRequest,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The broadcast channel to update interested parties with the status of
    /// executing tasks.
    ///
    /// This backend does not yet take advantage of the full Crankshaft
    /// machinery, but we send rudimentary messages on this channel which helps
    /// with UI presentation.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Token used to cancel the task while `bsub` is in flight.
    cancellation_token: CancellationToken,
}
98
99impl TaskManagerRequest for LsfApptainerTaskRequest {
100    fn cpu(&self) -> f64 {
101        self.cpu
102    }
103
104    fn memory(&self) -> u64 {
105        self.memory
106    }
107
108    async fn run(self) -> anyhow::Result<super::TaskExecutionResult> {
109        let crankshaft_task_id = crankshaft::events::next_task_id();
110
111        let container_sif = sif_for_container(
112            &self.backend_config,
113            &self.container,
114            self.cancellation_token.clone(),
115        )
116        .await?;
117
118        let attempt_dir = self.spawn_request.attempt_dir();
119
120        // Create the host directory that will be mapped to the WDL working directory.
121        let wdl_work_dir = attempt_dir.join(WORK_DIR_NAME);
122        fs::create_dir_all(&wdl_work_dir).await.with_context(|| {
123            format!(
124                "failed to create WDL working directory `{path}`",
125                path = wdl_work_dir.display()
126            )
127        })?;
128
129        // Write the evaluated WDL command section to a host file.
130        let wdl_command_path = attempt_dir.join(COMMAND_FILE_NAME);
131        fs::write(&wdl_command_path, self.spawn_request.command())
132            .await
133            .with_context(|| {
134                format!(
135                    "failed to write WDL command contents to `{path}`",
136                    path = wdl_command_path.display()
137                )
138            })?;
139        #[cfg(unix)]
140        tokio::fs::set_permissions(
141            &wdl_command_path,
142            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
143        )
144        .await?;
145
146        // Create an empty file for the WDL command's stdout.
147        let wdl_stdout_path = attempt_dir.join(STDOUT_FILE_NAME);
148        let _ = File::create(&wdl_stdout_path).await.with_context(|| {
149            format!(
150                "failed to create WDL stdout file `{path}`",
151                path = wdl_stdout_path.display()
152            )
153        })?;
154
155        // Create an empty file for the WDL command's stderr.
156        let wdl_stderr_path = attempt_dir.join(STDERR_FILE_NAME);
157        let _ = File::create(&wdl_stderr_path).await.with_context(|| {
158            format!(
159                "failed to create WDL stderr file `{path}`",
160                path = wdl_stderr_path.display()
161            )
162        })?;
163
164        // Create a temp dir for the container's execution within the attempt dir
165        // hierarchy. On many HPC systems, `/tmp` is mapped to a relatively
166        // small, local scratch disk that can fill up easily. Mapping the
167        // container's `/tmp` and `/var/tmp` paths to the filesystem we're using
168        // for other inputs and outputs prevents this from being a capacity problem,
169        // though potentially at the expense of execution speed if the
170        // non-`/tmp` filesystem is significantly slower.
171        let container_tmp_path = self
172            .spawn_request
173            .temp_dir()
174            .join("container_tmp")
175            .to_path_buf();
176        tokio::fs::DirBuilder::new()
177            .recursive(true)
178            .create(&container_tmp_path)
179            .await
180            .with_context(|| {
181                format!(
182                    "failed to create container /tmp directory at `{path}`",
183                    path = container_tmp_path.display()
184                )
185            })?;
186        let container_var_tmp_path = self
187            .spawn_request
188            .temp_dir()
189            .join("container_var_tmp")
190            .to_path_buf();
191        tokio::fs::DirBuilder::new()
192            .recursive(true)
193            .create(&container_var_tmp_path)
194            .await
195            .with_context(|| {
196                format!(
197                    "failed to create container /var/tmp directory at `{path}`",
198                    path = container_var_tmp_path.display()
199                )
200            })?;
201
202        // Assemble the Apptainer invocation. We'll write out this command to the host
203        // filesystem, and ultimately submit it as the command to run via LSF.
204        let apptainer_command_path = attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
205        let mut apptainer_command = String::new();
206        writeln!(&mut apptainer_command, "#!/bin/env bash")?;
207
208        // Set up any WDL-specified guest environment variables, using the
209        // `APPTAINERENV_` prefix approach (ref:
210        // https://apptainer.org/docs/user/1.3/environment_and_metadata.html#apptainerenv-prefix) to
211        // avoid command line argument limits.
212        for (k, v) in self.spawn_request.env().iter() {
213            writeln!(&mut apptainer_command, "export APPTAINERENV_{k}={v}")?;
214        }
215
216        // Begin writing the `apptainer` command itself. We're using the synchronous
217        // `exec` command which keeps running until the containerized command is
218        // finished.
219        write!(&mut apptainer_command, "apptainer -v exec ")?;
220        write!(&mut apptainer_command, "--cwd {GUEST_WORK_DIR} ")?;
221        // These options make the Apptainer sandbox behave more like default Docker
222        // behavior, e.g. by not auto-mounting the user's home directory and
223        // inheriting all environment variables.
224        write!(&mut apptainer_command, "--containall --cleanenv ")?;
225
226        for input in self.spawn_request.inputs() {
227            write!(
228                &mut apptainer_command,
229                "--mount type=bind,src={host_path},dst={guest_path},ro ",
230                host_path = input
231                    .local_path()
232                    .ok_or_else(|| anyhow!("input not localized: {input:?}"))?
233                    .display(),
234                guest_path = input
235                    .guest_path()
236                    .ok_or_else(|| anyhow!("guest path missing: {input:?}"))?,
237            )?;
238        }
239
240        // Mount the instantiated WDL command as read-only.
241        write!(
242            &mut apptainer_command,
243            "--mount type=bind,src={},dst={GUEST_COMMAND_PATH},ro ",
244            wdl_command_path.display()
245        )?;
246        // Mount the working dir, temp dirs, and stdio files as read/write (no `,ro` on
247        // the end like for the inputs).
248        write!(
249            &mut apptainer_command,
250            "--mount type=bind,src={},dst={GUEST_WORK_DIR} ",
251            wdl_work_dir.display()
252        )?;
253        write!(
254            &mut apptainer_command,
255            "--mount type=bind,src={},dst=/tmp ",
256            container_tmp_path.display()
257        )?;
258        write!(
259            &mut apptainer_command,
260            "--mount type=bind,src={},dst=/var/tmp ",
261            container_var_tmp_path.display()
262        )?;
263        write!(
264            &mut apptainer_command,
265            "--mount type=bind,src={},dst={GUEST_STDOUT_PATH} ",
266            wdl_stdout_path.display()
267        )?;
268        write!(
269            &mut apptainer_command,
270            "--mount type=bind,src={},dst={GUEST_STDERR_PATH} ",
271            wdl_stderr_path.display()
272        )?;
273        // Add the `--nv` argument if a GPU is required by the task.
274        if let Some(true) = self
275            .spawn_request
276            .requirements()
277            .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
278            .and_then(Value::as_boolean)
279        {
280            write!(&mut apptainer_command, "--nv ")?;
281        }
282
283        // Add any user-configured extra arguments.
284        if let Some(args) = &self.backend_config.extra_apptainer_exec_args {
285            for arg in args {
286                write!(&mut apptainer_command, "{arg} ")?;
287            }
288        }
289        // Specify the container sif file as a positional argument.
290        write!(&mut apptainer_command, "{} ", container_sif.display())?;
291        // Provide the instantiated WDL command, with its stdio handles redirected to
292        // their respective guest paths.
293        write!(
294            &mut apptainer_command,
295            "bash -c \"{GUEST_COMMAND_PATH} > {GUEST_STDOUT_PATH} 2> {GUEST_STDERR_PATH}\" "
296        )?;
297        // The path for the Apptainer-level stdout and stderr.
298        let apptainer_stdout_path = attempt_dir.join("apptainer.stdout");
299        let apptainer_stderr_path = attempt_dir.join("apptainer.stderr");
300        // Redirect the output of Apptainer itself to these files. We run Apptainer with
301        // verbosity cranked up, so these should be helpful diagnosing failures.
302        writeln!(
303            &mut apptainer_command,
304            "> {stdout} 2> {stderr}",
305            stdout = apptainer_stdout_path.display(),
306            stderr = apptainer_stderr_path.display()
307        )?;
308
309        fs::write(&apptainer_command_path, apptainer_command)
310            .await
311            .with_context(|| {
312                format!(
313                    "failed to write Apptainer command file `{}`",
314                    apptainer_command_path.display()
315                )
316            })?;
317        #[cfg(unix)]
318        tokio::fs::set_permissions(
319            &apptainer_command_path,
320            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
321        )
322        .await?;
323
324        // The path for the LSF-level stdout and stderr. This primarily contains the job
325        // report, as we redirect Apptainer and WDL output separately.
326        let lsf_stdout_path = attempt_dir.join("lsf.stdout");
327        let lsf_stderr_path = attempt_dir.join("lsf.stderr");
328
329        let mut bsub_command = Command::new("bsub");
330
331        // If an LSF queue has been configured, specify it. Otherwise, the job will end
332        // up on the cluster's default queue.
333        if let Some(queue) = self.backend_config.lsf_queue_for_task(
334            self.spawn_request.requirements(),
335            self.spawn_request.hints(),
336        ) {
337            bsub_command.arg("-q").arg(queue);
338        }
339
340        // If GPUs are required, pass a basic `-gpu` flag to `bsub`. If this is a bare
341        // `requirements { gpu: true }`, we request 1 GPU per host. If there is
342        // also an integer `hints: { gpu: n }`, we request `n` GPUs per host.
343        if let Some(true) = self
344            .spawn_request
345            .requirements()
346            .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
347            .and_then(Value::as_boolean)
348        {
349            match self.spawn_request.hints().get(wdl_ast::v1::TASK_HINT_GPU) {
350                Some(Value::Primitive(PrimitiveValue::Integer(n))) => {
351                    bsub_command.arg("-gpu").arg(format!("num={n}/host"));
352                }
353                Some(Value::Primitive(PrimitiveValue::String(hint))) => {
354                    warn!(
355                        %hint,
356                        "string hints for GPU are not supported; falling back to 1 GPU per host"
357                    );
358                    bsub_command.arg("-gpu").arg("num=1/host");
359                }
360                // Other hint value types should be rejected already, so the remaining valid case is
361                // a GPU requirement with no hints
362                _ => {
363                    bsub_command.arg("-gpu").arg("num=1/host");
364                }
365            }
366        }
367
368        // Add any user-configured extra arguments.
369        if let Some(args) = &self.backend_config.extra_bsub_args {
370            bsub_command.args(args);
371        }
372
373        bsub_command
374            // Pipe stdout and stderr so we can identify when a job begins, and can trace any other
375            // output. This should just be the LSF output like `<<Waiting for dispatch ...>>`.
376            .stdout(Stdio::piped())
377            .stderr(Stdio::piped())
378            // TODO ACF 2025-09-10: make this configurable; hardcode turning off LSF email spam for
379            // now though.
380            .env("LSB_JOB_REPORT_MAIL", "N")
381            // This option makes the `bsub` invocation synchronous, so this command will not exit
382            // until the job is complete.
383            //
384            // If the number of concurrent `bsub` processes becomes a problem, we can switch this to
385            // an asynchronous model where we drop this argument, grab the job ID, and poll for it
386            // using `bjobs`.
387            .arg("-K")
388            // Name the LSF job after the task ID, which has already been shortened to fit into the
389            // LSF requirements.
390            .arg("-J")
391            .arg(&self.name)
392            // Send LSF job stdout and stderr streams to these files. Since we redirect the
393            // Apptainer invocation's stdio to separate files, this will typically amount to the LSF
394            // job report.
395            .arg("-oo")
396            .arg(lsf_stdout_path)
397            .arg("-eo")
398            .arg(lsf_stderr_path)
399            // CPU request is rounded up to the nearest whole CPU
400            .arg("-R")
401            .arg(format!(
402                "affinity[cpu({cpu})]",
403                cpu = self.cpu.ceil() as u64
404            ))
405            // Memory request is specified per job to avoid ambiguity on clusters which may be
406            // configured to interpret memory requests as per-core or per-task. We also use an
407            // explicit KB unit which LSF appears to interpret as base-2 kibibytes.
408            .arg("-R")
409            .arg(format!(
410                "rusage[mem={memory_kb}KB/job]",
411                memory_kb = self.memory / 1024
412            ))
413            .arg(apptainer_command_path);
414
415        let mut bsub_child = bsub_command.spawn()?;
416
417        crankshaft::events::send_event!(
418            self.crankshaft_events,
419            crankshaft::events::Event::TaskCreated {
420                id: crankshaft_task_id,
421                name: self.name.clone(),
422                tes_id: None,
423                token: self.cancellation_token.clone(),
424            },
425        );
426
427        // Take the stdio pipes from the child process and consume them for event
428        // reporting and tracing purposes.
429        //
430        // TODO ACF 2025-09-23: drop the `-K` from `bsub` and poll status instead? Could
431        // be less intensive from a resource perspective vs having a process and
432        // two loops on the head node per task, but we should wait to observe
433        // real-world performance before complicating things.
434        let bsub_stdout = bsub_child
435            .stdout
436            .take()
437            .ok_or_else(|| anyhow!("bsub child stdout missing"))?;
438        let task_name = self.name.clone();
439        tokio::spawn(async move {
440            let mut lines = BufReader::new(bsub_stdout).lines();
441            while let Ok(Some(line)) = lines.next_line().await {
442                trace!(stdout = line, task_name);
443            }
444        });
445        let bsub_stderr = bsub_child
446            .stderr
447            .take()
448            .ok_or_else(|| anyhow!("bsub child stderr missing"))?;
449        let task_name = self.name.clone();
450        let stderr_crankshaft_events = self.crankshaft_events.clone();
451        tokio::spawn(async move {
452            let mut lines = BufReader::new(bsub_stderr).lines();
453            while let Ok(Some(line)) = lines.next_line().await {
454                if line.starts_with("<<Starting") {
455                    crankshaft::events::send_event!(
456                        stderr_crankshaft_events,
457                        crankshaft::events::Event::TaskStarted {
458                            id: crankshaft_task_id
459                        },
460                    );
461                }
462                trace!(stderr = line, task_name);
463            }
464        });
465
466        // Await the result of the `bsub` command, which will only exit on error or once
467        // the containerized command has completed.
468        let bsub_result = tokio::select! {
469            _ = self.cancellation_token.cancelled() => {
470                crankshaft::events::send_event!(
471                    self.crankshaft_events,
472                    crankshaft::events::Event::TaskCanceled {
473                        id: crankshaft_task_id
474                    },
475                );
476                Err(anyhow!("task execution cancelled"))
477            }
478            result = bsub_child.wait() => result.map_err(Into::into),
479        }?;
480
481        crankshaft::events::send_event!(
482            self.crankshaft_events,
483            crankshaft::events::Event::TaskCompleted {
484                id: crankshaft_task_id,
485                exit_statuses: NonEmpty::new(bsub_result),
486            }
487        );
488
489        Ok(TaskExecutionResult {
490            // Under normal circumstances, the exit code of `bsub -K` is the exit code of its
491            // command, and the exit code of `apptainer exec` is likewise the exit code of its
492            // command. One potential subtlety/problem here is that if `bsub` or `apptainer` exit
493            // due to an error before running the WDL command, we could be erroneously ascribing an
494            // exit code to the WDL command.
495            exit_code: bsub_result
496                .code()
497                .ok_or(anyhow!("task did not return an exit code"))?,
498            work_dir: EvaluationPath::Local(wdl_work_dir),
499            stdout: PrimitiveValue::new_file(
500                wdl_stdout_path
501                    .into_os_string()
502                    .into_string()
503                    .expect("path should be UTF-8"),
504            )
505            .into(),
506            stderr: PrimitiveValue::new_file(
507                wdl_stderr_path
508                    .into_os_string()
509                    .into_string()
510                    .expect("path should be UTF-8"),
511            )
512            .into(),
513        })
514    }
515}
516
/// The experimental LSF + Apptainer backend.
///
/// See the module-level documentation for details.
#[derive(Debug)]
pub struct LsfApptainerBackend {
    /// The engine-wide configuration (used e.g. for the default container).
    engine_config: Arc<Config>,
    /// The configuration specific to this backend.
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// The manager that queues and dispatches task requests.
    manager: TaskManager<LsfApptainerTaskRequest>,
    /// The broadcast channel for Crankshaft task lifecycle events, if any.
    crankshaft_events: Option<broadcast::Sender<Event>>,
}
527
528impl LsfApptainerBackend {
529    /// Create a new backend.
530    pub fn new(
531        engine_config: Arc<Config>,
532        backend_config: Arc<LsfApptainerBackendConfig>,
533        crankshaft_events: Option<broadcast::Sender<Event>>,
534    ) -> Self {
535        Self {
536            engine_config,
537            backend_config,
538            // TODO ACF 2025-09-11: the `MAX` values here mean that in addition to not limiting the
539            // overall number of CPU and memory used, we don't limit per-task consumption. There is
540            // potentially a path to pulling queue limits from LSF for these, but for now we just
541            // throw jobs at the cluster.
542            manager: TaskManager::new_unlimited(u64::MAX, u64::MAX),
543            crankshaft_events,
544        }
545    }
546}
547
548impl TaskExecutionBackend for LsfApptainerBackend {
549    fn max_concurrency(&self) -> u64 {
550        self.backend_config.max_scatter_concurrency
551    }
552
553    fn constraints(
554        &self,
555        requirements: &std::collections::HashMap<String, crate::Value>,
556        _hints: &std::collections::HashMap<String, crate::Value>,
557    ) -> anyhow::Result<super::TaskExecutionConstraints> {
558        Ok(super::TaskExecutionConstraints {
559            container: Some(
560                v1::container(requirements, self.engine_config.task.container.as_deref())
561                    .into_owned(),
562            ),
563            // TODO ACF 2025-09-11: populate more meaningful values for these based on the given LSF
564            // queue. Unfortunately, it's not straightforward to ask "what's the most CPUs I can ask
565            // for and still hope to be scheduled?". A reasonable stopgap would be to make this a
566            // config parameter, but the experience would be unfortunate when having to manually
567            // update that if changing queues, or if handling multiple queues for short jobs.
568            cpu: f64::MAX,
569            memory: i64::MAX,
570            gpu: Default::default(),
571            fpga: Default::default(),
572            disks: Default::default(),
573        })
574    }
575
576    fn guest_inputs_dir(&self) -> Option<&'static str> {
577        Some(GUEST_INPUTS_DIR)
578    }
579
580    fn needs_local_inputs(&self) -> bool {
581        true
582    }
583
584    fn spawn(
585        &self,
586        request: TaskSpawnRequest,
587        cancellation_token: CancellationToken,
588    ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<TaskExecutionResult>>> {
589        let (completed_tx, completed_rx) = tokio::sync::oneshot::channel();
590
591        let requirements = request.requirements();
592        let hints = request.hints();
593
594        let container =
595            v1::container(requirements, self.engine_config.task.container.as_deref()).into_owned();
596        let cpu = v1::cpu(requirements);
597        let memory = v1::memory(requirements)? as u64;
598        // TODO ACF 2025-09-11: I don't _think_ LSF offers a hard/soft CPU limit
599        // distinction, but we could potentially use a max as part of the
600        // resource request. That would likely mean using `bsub -n min,max`
601        // syntax as it doesn't seem that `affinity` strings support ranges
602        let _max_cpu = v1::max_cpu(hints);
603        // TODO ACF 2025-09-11: set a hard memory limit with `bsub -M !`?
604        let _max_memory = v1::max_memory(hints)?.map(|i| i as u64);
605
606        // Truncate the request ID to fit in the LSF job name length limit.
607        let request_id = request.id();
608        let name = if request_id.len() > LSF_JOB_NAME_MAX_LENGTH {
609            request_id
610                .chars()
611                .take(LSF_JOB_NAME_MAX_LENGTH)
612                .collect::<String>()
613        } else {
614            request_id.to_string()
615        };
616
617        self.manager.send(
618            LsfApptainerTaskRequest {
619                backend_config: self.backend_config.clone(),
620                spawn_request: request,
621                name,
622                container,
623                cpu,
624                memory,
625                crankshaft_events: self.crankshaft_events.clone(),
626                cancellation_token,
627            },
628            completed_tx,
629        );
630
631        Ok(completed_rx)
632    }
633
634    fn cleanup<'a>(
635        &'a self,
636        _work_dir: &'a EvaluationPath,
637        _token: CancellationToken,
638    ) -> Option<futures::future::BoxFuture<'a, ()>> {
639        // TODO ACF 2025-09-11: determine whether we need cleanup logic here;
640        // Apptainer's security model is fairly different from Docker so
641        // uid/gids on files shouldn't be as much of an issue, and using only
642        // `apptainer exec` means no longer-running containers to tear down
643        None
644    }
645}
646
/// Configuration for the LSF + Apptainer backend.
// TODO ACF 2025-09-12: add queue option for short tasks
//
// TODO ACF 2025-09-23: add a Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfApptainerBackendConfig {
    /// Which queue, if any, to specify when submitting normal jobs to LSF.
    ///
    /// This may be superseded by
    /// [`short_task_lsf_queue`][Self::short_task_lsf_queue],
    /// [`gpu_lsf_queue`][Self::gpu_lsf_queue], or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for corresponding tasks.
    pub default_lsf_queue: Option<String>,
    /// Which queue, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to LSF.
    ///
    /// This may be superseded by [`gpu_lsf_queue`][Self::gpu_lsf_queue] or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for tasks which require
    /// specialized hardware.
    pub short_task_lsf_queue: Option<String>,
    /// Which queue, if any, to specify when submitting [tasks which require a
    /// GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub gpu_lsf_queue: Option<String>,
    /// Which queue, if any, to specify when submitting [tasks which require an
    /// FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub fpga_lsf_queue: Option<String>,
    /// Additional command-line arguments to pass to `bsub` when submitting jobs
    /// to LSF.
    pub extra_bsub_args: Option<Vec<String>>,
    /// The maximum number of scatter subtasks that can be evaluated
    /// concurrently.
    ///
    /// By default, this is 200.
    #[serde(default = "default_max_scatter_concurrency")]
    pub max_scatter_concurrency: u64,
    /// Additional command-line arguments to pass to `apptainer exec` when
    /// executing tasks.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
    /// The directory in which temporary directories will be created containing
    /// Apptainer `.sif` files.
    ///
    /// This should be a location that is accessible by all jobs on the LSF
    /// cluster.
    ///
    /// By default, this is `$HOME/.cache/sprocket-apptainer-images`, or
    /// `/tmp/sprocket-apptainer-images` if the home directory cannot be
    /// determined.
    #[serde(default = "default_apptainer_images_dir")]
    pub apptainer_images_dir: PathBuf,
}
700
/// The default value for
/// [`max_scatter_concurrency`][LsfApptainerBackendConfig::max_scatter_concurrency].
fn default_max_scatter_concurrency() -> u64 {
    200
}
704
705fn default_apptainer_images_dir() -> PathBuf {
706    if let Some(cache) = dirs::cache_dir() {
707        cache.join("sprocket-apptainer-images").to_path_buf()
708    } else {
709        std::env::temp_dir()
710            .join("sprocket-apptainer-images")
711            .to_path_buf()
712    }
713}
714
715impl Default for LsfApptainerBackendConfig {
716    fn default() -> Self {
717        Self {
718            default_lsf_queue: None,
719            short_task_lsf_queue: None,
720            gpu_lsf_queue: None,
721            fpga_lsf_queue: None,
722            extra_bsub_args: None,
723            max_scatter_concurrency: default_max_scatter_concurrency(),
724            apptainer_images_dir: default_apptainer_images_dir(),
725            extra_apptainer_exec_args: None,
726        }
727    }
728}
729
730impl LsfApptainerBackendConfig {
731    /// Validate that the backend is appropriately configured.
732    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
733        if cfg!(not(unix)) {
734            bail!("LSF + Apptainer backend is not supported on non-unix platforms");
735        }
736        if !engine_config.experimental_features_enabled {
737            bail!("LSF + Apptainer backend requires enabling experimental features");
738        }
739
740        // Do what we can to validate options that are dependent on the dynamic
741        // environment. These are a bit fraught, particularly if the behavior of
742        // the external tools changes based on where a job gets dispatched, but
743        // querying from the perspective of the current node allows
744        // us to get better error messages in circumstances typical to a cluster.
745        if let Some(queue) = &self.default_lsf_queue {
746            validate_lsf_queue("default", queue).await?;
747        }
748        if let Some(queue) = &self.short_task_lsf_queue {
749            validate_lsf_queue("short_task", queue).await?;
750        }
751        if let Some(queue) = &self.gpu_lsf_queue {
752            validate_lsf_queue("gpu", queue).await?;
753        }
754        if let Some(queue) = &self.fpga_lsf_queue {
755            validate_lsf_queue("fpga", queue).await?;
756        }
757        Ok(())
758    }
759
760    /// Get the appropriate LSF queue for a task under this configuration.
761    ///
762    /// Specialized hardware requirements are prioritized over other
763    /// characteristics, with FPGA taking precedence over GPU.
764    fn lsf_queue_for_task(
765        &self,
766        requirements: &HashMap<String, Value>,
767        hints: &HashMap<String, Value>,
768    ) -> Option<&str> {
769        // TODO ACF 2025-09-26: what's the relationship between this code and
770        // `TaskExecutionConstraints`? Should this be there instead, or be pulling
771        // values from that instead of directly from `requirements` and `hints`?
772
773        // Specialized hardware gets priority.
774        if let Some(queue) = self.fpga_lsf_queue.as_deref()
775            && let Some(true) = requirements
776                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
777                .and_then(Value::as_boolean)
778        {
779            return Some(queue);
780        }
781
782        if let Some(queue) = self.gpu_lsf_queue.as_deref()
783            && let Some(true) = requirements
784                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
785                .and_then(Value::as_boolean)
786        {
787            return Some(queue);
788        }
789
790        // Then short tasks.
791        if let Some(queue) = self.short_task_lsf_queue.as_deref()
792            && let Some(true) = hints
793                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
794                .and_then(Value::as_boolean)
795        {
796            return Some(queue);
797        }
798
799        // Finally the default queue. If this is `None`, `bsub` gets run without a queue
800        // argument and the cluster's default is used.
801        self.default_lsf_queue.as_deref()
802    }
803}
804
805async fn validate_lsf_queue(name: &str, queue: &str) -> Result<(), anyhow::Error> {
806    match tokio::time::timeout(
807        // 10 seconds is rather arbitrary; `bqueues` ordinarily returns extremely quickly, but we
808        // don't want things to run away on a misconfigured system
809        std::time::Duration::from_secs(10),
810        Command::new("bqueues").arg(queue).output(),
811    )
812    .await
813    {
814        Ok(output) => {
815            let output = output.context("validating LSF queue")?;
816            if !output.status.success() {
817                let stdout = String::from_utf8_lossy(&output.stdout);
818                let stderr = String::from_utf8_lossy(&output.stderr);
819                error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
820                Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
821            } else {
822                Ok(())
823            }
824        }
825        Err(_) => Err(anyhow!(
826            "timed out trying to validate {name}_lsf_queue `{queue}`"
827        )),
828    }
829}