wdl_engine/backend/
lsf_apptainer.rs

1//! Experimental LSF + Apptainer (aka Singularity) task execution backend.
2//!
3//! This experimental backend submits each task as an LSF job which invokes
4//! Apptainer to provide the appropriate container environment for the WDL
5//! command to execute.
6//!
7//! Due to the proprietary nature of LSF, and limited ability to install
8//! Apptainer locally or in CI, this is currently tested by hand; expect (and
9//! report) bugs! In follow-up work, we hope to build a limited test suite based
10//! on mocking CLI invocations and/or golden testing of generated
11//! `bsub`/`apptainer` scripts.
12
13use std::collections::HashMap;
14use std::path::Path;
15use std::process::Stdio;
16use std::sync::Arc;
17
18use anyhow::Context as _;
19use anyhow::anyhow;
20use anyhow::bail;
21use anyhow::ensure;
22use bytesize::ByteSize;
23use crankshaft::events::Event;
24use nonempty::NonEmpty;
25use tokio::fs::File;
26use tokio::fs::{self};
27use tokio::io::AsyncBufReadExt;
28use tokio::io::BufReader;
29use tokio::process::Command;
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32use tracing::debug;
33use tracing::error;
34use tracing::trace;
35use tracing::warn;
36
37use super::ApptainerState;
38use super::TaskExecutionBackend;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use super::apptainer::ApptainerConfig;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::TaskExecutionResult;
46use crate::Value;
47use crate::config::Config;
48use crate::config::TaskResourceLimitBehavior;
49use crate::path::EvaluationPath;
50use crate::v1;
51
/// The name of the file where the Apptainer command invocation will be written.
///
/// The file is written into the attempt directory and is the script that
/// `bsub` ultimately executes on the compute node.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";

/// The root guest path for inputs.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The maximum length of an LSF job name.
///
/// See <https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=o-j>.
const LSF_JOB_NAME_MAX_LENGTH: usize = 4094;
62
/// A request to execute a task on an LSF + Apptainer backend.
#[derive(Debug)]
struct LsfApptainerTaskRequest {
    /// The desired configuration of the backend.
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// The Apptainer state for the backend.
    apptainer_state: Arc<ApptainerState>,
    /// The name of the task, potentially truncated to fit within the LSF job
    /// name length limit.
    name: String,
    /// The task spawn request.
    spawn_request: TaskSpawnRequest,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    required_cpu: f64,
    /// The requested memory reservation for the task.
    required_memory: ByteSize,
    /// The broadcast channel to update interested parties with the status of
    /// executing tasks.
    ///
    /// This backend does not yet take advantage of the full Crankshaft
    /// machinery, but we send rudimentary messages on this channel which helps
    /// with UI presentation.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// The cancellation token for this task execution request.
    cancellation_token: CancellationToken,
}
91
impl TaskManagerRequest for LsfApptainerTaskRequest {
    /// The CPU reservation for this task, used by the task manager for
    /// accounting.
    fn cpu(&self) -> f64 {
        self.required_cpu
    }

    /// The memory reservation for this task in bytes, used by the task
    /// manager for accounting.
    fn memory(&self) -> u64 {
        self.required_memory.as_u64()
    }

    /// Run the task to completion by submitting it to LSF via `bsub -K`.
    ///
    /// This prepares the host-side files (working directory, empty
    /// stdout/stderr files, the evaluated WDL command script, and the
    /// wrapping Apptainer invocation script), submits the job, forwards
    /// `bsub`'s own stdio to tracing/event reporting, and then waits for
    /// either job completion or cancellation.
    async fn run(self) -> anyhow::Result<super::TaskExecutionResult> {
        let crankshaft_task_id = crankshaft::events::next_task_id();

        let attempt_dir = self.spawn_request.attempt_dir();

        // Create the host directory that will be mapped to the WDL working directory.
        let wdl_work_dir = self.spawn_request.wdl_work_dir_host_path();
        fs::create_dir_all(&wdl_work_dir).await.with_context(|| {
            format!(
                "failed to create WDL working directory `{path}`",
                path = wdl_work_dir.display()
            )
        })?;

        // Create an empty file for the WDL command's stdout.
        let wdl_stdout_path = self.spawn_request.wdl_stdout_host_path();
        let _ = File::create(&wdl_stdout_path).await.with_context(|| {
            format!(
                "failed to create WDL stdout file `{path}`",
                path = wdl_stdout_path.display()
            )
        })?;

        // Create an empty file for the WDL command's stderr.
        let wdl_stderr_path = self.spawn_request.wdl_stderr_host_path();
        let _ = File::create(&wdl_stderr_path).await.with_context(|| {
            format!(
                "failed to create WDL stderr file `{path}`",
                path = wdl_stderr_path.display()
            )
        })?;

        // Write the evaluated WDL command section to a host file.
        let wdl_command_path = self.spawn_request.wdl_command_host_path();
        fs::write(&wdl_command_path, self.spawn_request.command())
            .await
            .with_context(|| {
                format!(
                    "failed to write WDL command contents to `{path}`",
                    path = wdl_command_path.display()
                )
            })?;
        // Make the command script executable (rwx for owner and group).
        #[cfg(unix)]
        fs::set_permissions(
            &wdl_command_path,
            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
        )
        .await?;

        let apptainer_command = self
            .apptainer_state
            .prepare_apptainer_command(
                &self.container,
                self.cancellation_token.clone(),
                &self.spawn_request,
            )
            .await?;

        let apptainer_command_path = attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
        fs::write(&apptainer_command_path, apptainer_command)
            .await
            .with_context(|| {
                format!(
                    "failed to write Apptainer command file `{}`",
                    apptainer_command_path.display()
                )
            })?;
        // Make the Apptainer invocation script executable (rwx for owner and
        // group); this is the script passed to `bsub` below.
        #[cfg(unix)]
        tokio::fs::set_permissions(
            &apptainer_command_path,
            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
        )
        .await?;

        // The path for the LSF-level stdout and stderr. This primarily contains the job
        // report, as we redirect Apptainer and WDL output separately.
        let lsf_stdout_path = attempt_dir.join("lsf.stdout");
        let lsf_stderr_path = attempt_dir.join("lsf.stderr");

        let mut bsub_command = Command::new("bsub");

        // If an LSF queue has been configured, specify it. Otherwise, the job will end
        // up on the cluster's default queue.
        if let Some(queue) = self.backend_config.lsf_queue_for_task(
            self.spawn_request.requirements(),
            self.spawn_request.hints(),
        ) {
            bsub_command.arg("-q").arg(queue.name());
        }

        // If GPUs are required, pass a basic `-gpu` flag to `bsub`.
        if let Some(n_gpu) = v1::gpu(
            self.spawn_request.requirements(),
            self.spawn_request.hints(),
        ) {
            bsub_command.arg("-gpu").arg(format!("num={n_gpu}/host"));
        }

        // Add any user-configured extra arguments.
        if let Some(args) = &self.backend_config.extra_bsub_args {
            bsub_command.args(args);
        }

        bsub_command
            // Pipe stdout and stderr so we can identify when a job begins, and can trace any other
            // output. This should just be the LSF output like `<<Waiting for dispatch ...>>`.
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            // TODO ACF 2025-09-10: make this configurable; hardcode turning off LSF email spam for
            // now though.
            .env("LSB_JOB_REPORT_MAIL", "N")
            // This option makes the `bsub` invocation synchronous, so this command will not exit
            // until the job is complete.
            //
            // If the number of concurrent `bsub` processes becomes a problem, we can switch this to
            // an asynchronous model where we drop this argument, grab the job ID, and poll for it
            // using `bjobs`.
            .arg("-K")
            // Name the LSF job after the task ID, which has already been shortened to fit into the
            // LSF requirements.
            .arg("-J")
            .arg(&self.name)
            // Send LSF job stdout and stderr streams to these files. Since we redirect the
            // Apptainer invocation's stdio to separate files, this will typically amount to the LSF
            // job report.
            .arg("-oo")
            .arg(lsf_stdout_path)
            .arg("-eo")
            .arg(lsf_stderr_path)
            // CPU request is rounded up to the nearest whole CPU
            .arg("-R")
            .arg(format!(
                "affinity[cpu({cpu})]",
                cpu = self.required_cpu.ceil() as u64
            ))
            // Memory request is specified per job to avoid ambiguity on clusters which may be
            // configured to interpret memory requests as per-core or per-task. We also use an
            // explicit KB unit which LSF appears to interpret as base-2 kibibytes.
            .arg("-R")
            .arg(format!(
                "rusage[mem={memory_kb}KB/job]",
                memory_kb = self.required_memory.as_u64() / bytesize::KIB,
            ))
            .arg(apptainer_command_path);

        debug!(?bsub_command, "spawning `bsub` command");

        let mut bsub_child = bsub_command.spawn()?;

        crankshaft::events::send_event!(
            self.crankshaft_events,
            crankshaft::events::Event::TaskCreated {
                id: crankshaft_task_id,
                name: self.name.clone(),
                tes_id: None,
                token: self.cancellation_token.clone(),
            },
        );

        // Take the stdio pipes from the child process and consume them for event
        // reporting and tracing purposes.
        //
        // TODO ACF 2025-09-23: drop the `-K` from `bsub` and poll status instead? Could
        // be less intensive from a resource perspective vs having a process and
        // two loops on the head node per task, but we should wait to observe
        // real-world performance before complicating things.
        let bsub_stdout = bsub_child
            .stdout
            .take()
            .ok_or_else(|| anyhow!("bsub child stdout missing"))?;
        let task_name = self.name.clone();
        tokio::spawn(async move {
            let mut lines = BufReader::new(bsub_stdout).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                trace!(stdout = line, task_name);
            }
        });
        let bsub_stderr = bsub_child
            .stderr
            .take()
            .ok_or_else(|| anyhow!("bsub child stderr missing"))?;
        let task_name = self.name.clone();
        let stderr_crankshaft_events = self.crankshaft_events.clone();
        tokio::spawn(async move {
            let mut lines = BufReader::new(bsub_stderr).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                // A line beginning with `<<Starting` signals that LSF has
                // dispatched the job; report the task as started.
                if line.starts_with("<<Starting") {
                    crankshaft::events::send_event!(
                        stderr_crankshaft_events,
                        crankshaft::events::Event::TaskStarted {
                            id: crankshaft_task_id
                        },
                    );
                }
                trace!(stderr = line, task_name);
            }
        });

        // Await the result of the `bsub` command, which will only exit on error or once
        // the containerized command has completed.
        let bsub_result = tokio::select! {
            _ = self.cancellation_token.cancelled() => {
                crankshaft::events::send_event!(
                    self.crankshaft_events,
                    crankshaft::events::Event::TaskCanceled {
                        id: crankshaft_task_id
                    },
                );
                Err(anyhow!("task execution cancelled"))
            }
            result = bsub_child.wait() => result.map_err(Into::into),
        }?;

        crankshaft::events::send_event!(
            self.crankshaft_events,
            crankshaft::events::Event::TaskCompleted {
                id: crankshaft_task_id,
                exit_statuses: NonEmpty::new(bsub_result),
            }
        );

        Ok(TaskExecutionResult {
            // Under normal circumstances, the exit code of `bsub -K` is the exit code of its
            // command, and the exit code of `apptainer exec` is likewise the exit code of its
            // command. One potential subtlety/problem here is that if `bsub` or `apptainer` exit
            // due to an error before running the WDL command, we could be erroneously ascribing an
            // exit code to the WDL command.
            exit_code: bsub_result
                .code()
                .ok_or(anyhow!("task did not return an exit code"))?,
            work_dir: EvaluationPath::Local(wdl_work_dir),
            stdout: PrimitiveValue::new_file(
                wdl_stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                wdl_stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
349
/// The experimental LSF + Apptainer backend.
///
/// See the module-level documentation for details.
#[derive(Debug)]
pub struct LsfApptainerBackend {
    /// The configuration of the overall engine being executed.
    engine_config: Arc<Config>,
    /// The configuration of this backend.
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// The task manager for the backend.
    manager: TaskManager<LsfApptainerTaskRequest>,
    /// Sender for crankshaft events.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Apptainer state, shared with each task request spawned by this backend.
    apptainer_state: Arc<ApptainerState>,
}
366
367impl LsfApptainerBackend {
368    /// Create a new backend.
369    ///
370    /// The `run_root_dir` argument should be a directory that exists for the
371    /// duration of the entire top-level evaluation. It is used to store
372    /// Apptainer images which should only be created once per container per
373    /// run.
374    pub fn new(
375        run_root_dir: &Path,
376        engine_config: Arc<Config>,
377        backend_config: Arc<LsfApptainerBackendConfig>,
378        crankshaft_events: Option<broadcast::Sender<Event>>,
379    ) -> Self {
380        let apptainer_state =
381            ApptainerState::new(&backend_config.apptainer_config, run_root_dir).into();
382        Self {
383            engine_config,
384            backend_config,
385            // TODO ACF 2025-09-11: the `MAX` values here mean that in addition to not limiting the
386            // overall number of CPU and memory used, we don't limit per-task consumption. There is
387            // potentially a path to pulling queue limits from LSF for these, but for now we just
388            // throw jobs at the cluster.
389            manager: TaskManager::new_unlimited(u64::MAX, u64::MAX),
390            crankshaft_events,
391            apptainer_state,
392        }
393    }
394}
395
396impl TaskExecutionBackend for LsfApptainerBackend {
397    fn max_concurrency(&self) -> u64 {
398        self.backend_config.max_scatter_concurrency
399    }
400
401    fn constraints(
402        &self,
403        requirements: &HashMap<String, Value>,
404        hints: &HashMap<String, Value>,
405    ) -> anyhow::Result<super::TaskExecutionConstraints> {
406        let mut required_cpu = v1::cpu(requirements);
407        let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
408
409        // Determine whether CPU or memory limits are set for this queue, and clamp or
410        // deny them as appropriate if the limits are exceeded
411        //
412        // TODO ACF 2025-10-16: refactor so that we're not duplicating logic here (for
413        // the in-WDL `task` values) and below in `spawn` (for the actual
414        // resource request)
415        if let Some(queue) = self.backend_config.lsf_queue_for_task(requirements, hints) {
416            if let Some(max_cpu) = queue.max_cpu_per_task()
417                && required_cpu > max_cpu as f64
418            {
419                let env_specific = if self.engine_config.suppress_env_specific_output {
420                    String::new()
421                } else {
422                    format!(", but the execution backend has a maximum of {max_cpu}",)
423                };
424                match self.engine_config.task.cpu_limit_behavior {
425                    TaskResourceLimitBehavior::TryWithMax => {
426                        warn!(
427                            "task requires at least {required_cpu} CPU{s}{env_specific}",
428                            s = if required_cpu == 1.0 { "" } else { "s" },
429                        );
430                        // clamp the reported constraint to what's available
431                        required_cpu = max_cpu as f64;
432                    }
433                    TaskResourceLimitBehavior::Deny => {
434                        bail!(
435                            "task requires at least {required_cpu} CPU{s}{env_specific}",
436                            s = if required_cpu == 1.0 { "" } else { "s" },
437                        );
438                    }
439                }
440            }
441            if let Some(max_memory) = queue.max_memory_per_task()
442                && required_memory > max_memory
443            {
444                let env_specific = if self.engine_config.suppress_env_specific_output {
445                    String::new()
446                } else {
447                    format!(
448                        ", but the execution backend has a maximum of {max_memory} GiB",
449                        max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
450                    )
451                };
452                match self.engine_config.task.memory_limit_behavior {
453                    TaskResourceLimitBehavior::TryWithMax => {
454                        warn!(
455                            "task requires at least {required_memory} GiB of memory{env_specific}",
456                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
457                        );
458                        // clamp the reported constraint to what's available
459                        required_memory = max_memory;
460                    }
461                    TaskResourceLimitBehavior::Deny => {
462                        bail!(
463                            "task requires at least {required_memory} GiB of memory{env_specific}",
464                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
465                        );
466                    }
467                }
468            }
469        }
470        Ok(super::TaskExecutionConstraints {
471            container: Some(
472                v1::container(requirements, self.engine_config.task.container.as_deref())
473                    .into_owned(),
474            ),
475            cpu: required_cpu,
476            memory: required_memory.as_u64().try_into().unwrap_or(i64::MAX),
477            // TODO ACF 2025-10-16: these are almost certainly wrong
478            gpu: Default::default(),
479            fpga: Default::default(),
480            disks: Default::default(),
481        })
482    }
483
484    fn guest_inputs_dir(&self) -> Option<&'static str> {
485        Some(GUEST_INPUTS_DIR)
486    }
487
488    fn needs_local_inputs(&self) -> bool {
489        true
490    }
491
492    fn spawn(
493        &self,
494        request: TaskSpawnRequest,
495        cancellation_token: CancellationToken,
496    ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<TaskExecutionResult>>> {
497        let (completed_tx, completed_rx) = tokio::sync::oneshot::channel();
498
499        let requirements = request.requirements();
500        let hints = request.hints();
501
502        let container =
503            v1::container(requirements, self.engine_config.task.container.as_deref()).into_owned();
504
505        let mut required_cpu = v1::cpu(requirements);
506        let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
507
508        // Determine whether CPU or memory limits are set for this queue, and clamp or
509        // deny them as appropriate if the limits are exceeded
510        if let Some(queue) = self.backend_config.lsf_queue_for_task(requirements, hints) {
511            if let Some(max_cpu) = queue.max_cpu_per_task()
512                && required_cpu > max_cpu as f64
513            {
514                let env_specific = if self.engine_config.suppress_env_specific_output {
515                    String::new()
516                } else {
517                    format!(", but the execution backend has a maximum of {max_cpu}",)
518                };
519                match self.engine_config.task.cpu_limit_behavior {
520                    TaskResourceLimitBehavior::TryWithMax => {
521                        warn!(
522                            "task requires at least {required_cpu} CPU{s}{env_specific}",
523                            s = if required_cpu == 1.0 { "" } else { "s" },
524                        );
525                        // clamp the reported constraint to what's available
526                        required_cpu = max_cpu as f64;
527                    }
528                    TaskResourceLimitBehavior::Deny => {
529                        bail!(
530                            "task requires at least {required_cpu} CPU{s}{env_specific}",
531                            s = if required_cpu == 1.0 { "" } else { "s" },
532                        );
533                    }
534                }
535            }
536            if let Some(max_memory) = queue.max_memory_per_task()
537                && required_memory > max_memory
538            {
539                let env_specific = if self.engine_config.suppress_env_specific_output {
540                    String::new()
541                } else {
542                    format!(
543                        ", but the execution backend has a maximum of {max_memory} GiB",
544                        max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
545                    )
546                };
547                match self.engine_config.task.memory_limit_behavior {
548                    TaskResourceLimitBehavior::TryWithMax => {
549                        warn!(
550                            "task requires at least {required_memory} GiB of memory{env_specific}",
551                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
552                        );
553                        // clamp the reported constraint to what's available
554                        required_memory = max_memory;
555                    }
556                    TaskResourceLimitBehavior::Deny => {
557                        bail!(
558                            "task requires at least {required_memory} GiB of memory{env_specific}",
559                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
560                        );
561                    }
562                }
563            }
564        }
565
566        // TODO ACF 2025-09-11: I don't _think_ LSF offers a hard/soft CPU limit
567        // distinction, but we could potentially use a max as part of the
568        // resource request. That would likely mean using `bsub -n min,max`
569        // syntax as it doesn't seem that `affinity` strings support ranges
570        let _max_cpu = v1::max_cpu(hints);
571        // TODO ACF 2025-09-11: set a hard memory limit with `bsub -M !`?
572        let _max_memory = v1::max_memory(hints)?.map(|i| i as u64);
573
574        // Truncate the request ID to fit in the LSF job name length limit.
575        let request_id = request.id();
576        let name = if request_id.len() > LSF_JOB_NAME_MAX_LENGTH {
577            request_id
578                .chars()
579                .take(LSF_JOB_NAME_MAX_LENGTH)
580                .collect::<String>()
581        } else {
582            request_id.to_string()
583        };
584
585        self.manager.send(
586            LsfApptainerTaskRequest {
587                backend_config: self.backend_config.clone(),
588                apptainer_state: self.apptainer_state.clone(),
589                spawn_request: request,
590                name,
591                container,
592                required_cpu,
593                required_memory,
594                crankshaft_events: self.crankshaft_events.clone(),
595                cancellation_token,
596            },
597            completed_tx,
598        );
599
600        Ok(completed_rx)
601    }
602
603    fn cleanup<'a>(
604        &'a self,
605        _work_dir: &'a EvaluationPath,
606        _token: CancellationToken,
607    ) -> Option<futures::future::BoxFuture<'a, ()>> {
608        // TODO ACF 2025-09-11: determine whether we need cleanup logic here;
609        // Apptainer's security model is fairly different from Docker so
610        // uid/gids on files shouldn't be as much of an issue, and using only
611        // `apptainer exec` means no longer-running containers to tear down
612        None
613    }
614}
615
/// Configuration for an LSF queue.
///
/// Each queue can optionally have per-task CPU and memory limits set so that
/// tasks which are too large to be scheduled on that queue will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be manually set based on the user's understanding of the
/// cluster configuration.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfQueueConfig {
    /// The name of the queue; this is the string passed to `bsub -q
    /// <queue_name>`.
    name: String,
    /// The maximum number of CPUs this queue can provision for a single task.
    max_cpu_per_task: Option<u64>,
    /// The maximum memory this queue can provision for a single task.
    max_memory_per_task: Option<ByteSize>,
}
634
635impl LsfQueueConfig {
636    /// Create an [`LsfQueueConfig`].
637    pub fn new(
638        name: String,
639        max_cpu_per_task: Option<u64>,
640        max_memory_per_task: Option<ByteSize>,
641    ) -> Self {
642        Self {
643            name,
644            max_cpu_per_task,
645            max_memory_per_task,
646        }
647    }
648
649    /// The name of the queue; this is the string passed to `bsub -q
650    /// <queue_name>`.
651    pub fn name(&self) -> &str {
652        &self.name
653    }
654
655    /// The maximum number of CPUs this queue can provision for a single task.
656    pub fn max_cpu_per_task(&self) -> Option<u64> {
657        self.max_cpu_per_task
658    }
659
660    /// The maximum memory this queue can provision for a single task.
661    pub fn max_memory_per_task(&self) -> Option<ByteSize> {
662        self.max_memory_per_task
663    }
664
665    /// Validate that this LSF queue exists according to the local `bqueues`.
666    async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
667        let queue = self.name();
668        ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
669        if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
670            ensure!(
671                max_cpu_per_task > 0,
672                "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
673            );
674        }
675        if let Some(max_memory_per_task) = self.max_memory_per_task() {
676            ensure!(
677                max_memory_per_task.as_u64() > 0,
678                "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
679            );
680        }
681        match tokio::time::timeout(
682            // 10 seconds is rather arbitrary; `bqueues` ordinarily returns extremely quickly, but
683            // we don't want things to run away on a misconfigured system
684            std::time::Duration::from_secs(10),
685            Command::new("bqueues").arg(queue).output(),
686        )
687        .await
688        {
689            Ok(output) => {
690                let output = output.context("validating LSF queue")?;
691                if !output.status.success() {
692                    let stdout = String::from_utf8_lossy(&output.stdout);
693                    let stderr = String::from_utf8_lossy(&output.stderr);
694                    error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
695                    Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
696                } else {
697                    Ok(())
698                }
699            }
700            Err(_) => Err(anyhow!(
701                "timed out trying to validate {name}_lsf_queue `{queue}`"
702            )),
703        }
704    }
705}
706
/// Configuration for the LSF + Apptainer backend.
// TODO ACF 2025-09-12: add queue option for short tasks
//
// TODO ACF 2025-09-23: add an Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfApptainerBackendConfig {
    /// Which queue, if any, to specify when submitting normal jobs to LSF.
    ///
    /// This may be superseded by
    /// [`short_task_lsf_queue`][Self::short_task_lsf_queue],
    /// [`gpu_lsf_queue`][Self::gpu_lsf_queue], or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for corresponding tasks.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to LSF.
    ///
    /// This may be superseded by [`gpu_lsf_queue`][Self::gpu_lsf_queue] or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for tasks which require
    /// specialized hardware.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [tasks which require a
    /// GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [tasks which require an
    /// FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Additional command-line arguments to pass to `bsub` when submitting jobs
    /// to LSF.
    pub extra_bsub_args: Option<Vec<String>>,
    /// The maximum number of scatter subtasks that can be evaluated
    /// concurrently.
    ///
    /// By default, this is 200.
    #[serde(default = "default_max_scatter_concurrency")]
    pub max_scatter_concurrency: u64,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where LSF dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
758
/// The default value for
/// [`max_scatter_concurrency`][LsfApptainerBackendConfig::max_scatter_concurrency],
/// used both by `serde` deserialization and the `Default` impl.
fn default_max_scatter_concurrency() -> u64 {
    200
}
762
763impl Default for LsfApptainerBackendConfig {
764    fn default() -> Self {
765        Self {
766            default_lsf_queue: None,
767            short_task_lsf_queue: None,
768            gpu_lsf_queue: None,
769            fpga_lsf_queue: None,
770            extra_bsub_args: None,
771            max_scatter_concurrency: default_max_scatter_concurrency(),
772            apptainer_config: ApptainerConfig::default(),
773        }
774    }
775}
776
777impl LsfApptainerBackendConfig {
778    /// Validate that the backend is appropriately configured.
779    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
780        if cfg!(not(unix)) {
781            bail!("LSF + Apptainer backend is not supported on non-unix platforms");
782        }
783        if !engine_config.experimental_features_enabled {
784            bail!("LSF + Apptainer backend requires enabling experimental features");
785        }
786
787        // Do what we can to validate options that are dependent on the dynamic
788        // environment. These are a bit fraught, particularly if the behavior of
789        // the external tools changes based on where a job gets dispatched, but
790        // querying from the perspective of the current node allows
791        // us to get better error messages in circumstances typical to a cluster.
792        if let Some(queue) = self.default_lsf_queue.as_ref() {
793            queue.validate("default").await?;
794        }
795        if let Some(queue) = self.short_task_lsf_queue.as_ref() {
796            queue.validate("short_task").await?;
797        }
798        if let Some(queue) = self.gpu_lsf_queue.as_ref() {
799            queue.validate("gpu").await?;
800        }
801        if let Some(queue) = self.fpga_lsf_queue.as_ref() {
802            queue.validate("fpga").await?;
803        }
804
805        self.apptainer_config.validate().await?;
806
807        Ok(())
808    }
809
810    /// Get the appropriate LSF queue for a task under this configuration.
811    ///
812    /// Specialized hardware requirements are prioritized over other
813    /// characteristics, with FPGA taking precedence over GPU.
814    fn lsf_queue_for_task(
815        &self,
816        requirements: &HashMap<String, Value>,
817        hints: &HashMap<String, Value>,
818    ) -> Option<&LsfQueueConfig> {
819        // TODO ACF 2025-09-26: what's the relationship between this code and
820        // `TaskExecutionConstraints`? Should this be there instead, or be pulling
821        // values from that instead of directly from `requirements` and `hints`?
822
823        // Specialized hardware gets priority.
824        if let Some(queue) = self.fpga_lsf_queue.as_ref()
825            && let Some(true) = requirements
826                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
827                .and_then(Value::as_boolean)
828        {
829            return Some(queue);
830        }
831
832        if let Some(queue) = self.gpu_lsf_queue.as_ref()
833            && let Some(true) = requirements
834                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
835                .and_then(Value::as_boolean)
836        {
837            return Some(queue);
838        }
839
840        // Then short tasks.
841        if let Some(queue) = self.short_task_lsf_queue.as_ref()
842            && let Some(true) = hints
843                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
844                .and_then(Value::as_boolean)
845        {
846            return Some(queue);
847        }
848
849        // Finally the default queue. If this is `None`, `bsub` gets run without a queue
850        // argument and the cluster's default is used.
851        self.default_lsf_queue.as_ref()
852    }
853}