Skip to main content

wdl_engine/backend/
apptainer.rs

1//! Support for using Apptainer (a.k.a. Singularity) container runtime.
2//!
3//! There are two primary responsibilities of this module: `.sif` image cache
4//! management and command script generation.
5//!
6//! The entrypoint for both of these is [`ApptainerRuntime::generate_script`].
7
8use std::collections::HashMap;
9use std::fmt::Write as _;
10use std::path::Path;
11use std::path::PathBuf;
12use std::path::absolute;
13use std::process::Stdio;
14use std::sync::Arc;
15use std::sync::Mutex;
16
17use anyhow::Context as _;
18use anyhow::Result;
19use anyhow::anyhow;
20use anyhow::bail;
21use tokio::process::Command;
22use tokio::sync::OnceCell;
23use tokio_retry2::Retry;
24use tokio_retry2::RetryError;
25use tokio_retry2::strategy::ExponentialBackoff;
26use tokio_util::sync::CancellationToken;
27use tracing::info;
28use tracing::warn;
29
30use crate::Value;
31use crate::backend::ExecuteTaskRequest;
32use crate::config::ApptainerConfig;
33use crate::config::DEFAULT_TASK_SHELL;
34use crate::v1::requirements::ContainerSource;
35
// Prefixes prepended to task environment variable names in the generated
// script; which one is used depends on whether the configured executable's
// name contains `singularity`.

/// Environment variable prefix used with `apptainer`.
const APPTAINER_ENV_PREFIX: &str = "APPTAINERENV";

/// Environment variable prefix used when the executable is `singularity`.
const SINGULARITY_ENV_PREFIX: &str = "SINGULARITYENV";

// Fixed guest-side locations under `/mnt/task` onto which the generated
// script bind-mounts host files and directories.

/// Guest working directory (also passed to `--pwd`).
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// Guest location of the task's command file.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// Guest location the task's standard output is redirected to.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// Guest location the task's standard error is redirected to.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// Subdirectory of the runtime root used for `.sif` images when no explicit
/// image cache directory is configured.
const IMAGES_CACHE_DIR: &str = "apptainer-images";
56
57/// Represents the Apptainer container runtime.
58#[derive(Debug)]
59pub struct ApptainerRuntime {
60    /// The cache directory for `.sif` images.
61    cache_dir: PathBuf,
62    /// The map of container source to `.sif` path.
63    images: Mutex<HashMap<ContainerSource, Arc<OnceCell<PathBuf>>>>,
64}
65
66impl ApptainerRuntime {
67    /// Creates a new [`ApptainerRuntime`] with the specified root directory.
68    ///
69    /// If `image_cache_dir` is provided, it is used as the directory for
70    /// caching `.sif` images. Otherwise, a default subdirectory is created
71    /// within the given root.
72    pub fn new(root_dir: &Path, image_cache_dir: Option<&Path>) -> Result<Self> {
73        let cache_dir = image_cache_dir
74            .map(Path::to_path_buf)
75            .unwrap_or_else(|| root_dir.join(IMAGES_CACHE_DIR));
76
77        Ok(Self {
78            cache_dir: absolute(&cache_dir).with_context(|| {
79                format!(
80                    "failed to make path `{path}` absolute",
81                    path = cache_dir.display()
82                )
83            })?,
84            images: Default::default(),
85        })
86    }
87
88    /// Generates the script to run the given task using the Apptainer runtime.
89    ///
90    /// # Shared filesystem assumptions
91    ///
92    /// The returned script should be run in an environment that shares a
93    /// filesystem with the environment where this method is invoked, except
94    /// for node-specific mounts like `/tmp` and `/var`. This assumption
95    /// typically holds on HPC systems with shared filesystems like Lustre or
96    /// GPFS.
97    pub async fn generate_script(
98        &self,
99        config: &ApptainerConfig,
100        shell: Option<&str>,
101        request: &ExecuteTaskRequest<'_>,
102        token: CancellationToken,
103    ) -> Result<Option<String>> {
104        let path = match self
105            .pull_image(
106                &config.executable,
107                request
108                    .constraints
109                    .container
110                    .as_ref()
111                    .ok_or_else(|| anyhow!("task does not use a container"))?,
112                token,
113            )
114            .await?
115        {
116            Some(path) => path,
117            None => return Ok(None),
118        };
119
120        Ok(Some(
121            self.generate_apptainer_script(config, shell, &path, request)
122                .await?,
123        ))
124    }
125
126    /// Generate the script, given a container path that's already assumed to be
127    /// populated.
128    ///
129    /// This is a separate method in order to facilitate testing, and should not
130    /// be called from outside this module.
131    async fn generate_apptainer_script(
132        &self,
133        config: &ApptainerConfig,
134        shell: Option<&str>,
135        container_sif: &Path,
136        request: &ExecuteTaskRequest<'_>,
137    ) -> Result<String> {
138        // Create a temp dir for the container's execution within the attempt dir
139        // hierarchy. On many HPC systems, `/tmp` is mapped to a relatively
140        // small, local scratch disk that can fill up easily. Mapping the
141        // container's `/tmp` and `/var/tmp` paths to the filesystem we're using
142        // for other inputs and outputs prevents this from being a capacity problem,
143        // though potentially at the expense of execution speed if the
144        // non-`/tmp` filesystem is significantly slower.
145        let container_tmp_path = request.temp_dir.join("container_tmp");
146        tokio::fs::DirBuilder::new()
147            .recursive(true)
148            .create(&container_tmp_path)
149            .await
150            .with_context(|| {
151                format!(
152                    "failed to create container /tmp directory at `{path}`",
153                    path = container_tmp_path.display()
154                )
155            })?;
156        let container_var_tmp_path = request.temp_dir.join("container_var_tmp");
157        tokio::fs::DirBuilder::new()
158            .recursive(true)
159            .create(&container_var_tmp_path)
160            .await
161            .with_context(|| {
162                format!(
163                    "failed to create container /var/tmp directory at `{path}`",
164                    path = container_var_tmp_path.display()
165                )
166            })?;
167
168        let env_prefix = if config.executable.contains("singularity") {
169            SINGULARITY_ENV_PREFIX
170        } else {
171            APPTAINER_ENV_PREFIX
172        };
173
174        let mut apptainer_command = String::new();
175        writeln!(&mut apptainer_command, "#!/usr/bin/env bash")?;
176        for (k, v) in request.env.iter() {
177            writeln!(&mut apptainer_command, "export {env_prefix}_{k}={v:?}")?;
178        }
179        writeln!(&mut apptainer_command, "{} -v exec \\", config.executable)?;
180        writeln!(&mut apptainer_command, "--pwd \"{GUEST_WORK_DIR}\" \\")?;
181        writeln!(&mut apptainer_command, "--containall --cleanenv \\")?;
182        for input in request.backend_inputs {
183            writeln!(
184                &mut apptainer_command,
185                "--mount type=bind,src=\"{host_path}\",dst=\"{guest_path}\",ro \\",
186                host_path = input
187                    .local_path()
188                    .ok_or_else(|| anyhow!("input not localized: {input:?}"))?
189                    .display(),
190                guest_path = input
191                    .guest_path()
192                    .ok_or_else(|| anyhow!("guest path missing: {input:?}"))?,
193            )?;
194        }
195        writeln!(
196            &mut apptainer_command,
197            "--mount type=bind,src=\"{}\",dst=\"{GUEST_COMMAND_PATH}\",ro \\",
198            request.command_path().display()
199        )?;
200        writeln!(
201            &mut apptainer_command,
202            "--mount type=bind,src=\"{}\",dst=\"{GUEST_WORK_DIR}\" \\",
203            request.work_dir().display()
204        )?;
205        writeln!(
206            &mut apptainer_command,
207            "--mount type=bind,src=\"{}\",dst=\"/tmp\" \\",
208            container_tmp_path.display()
209        )?;
210        writeln!(
211            &mut apptainer_command,
212            "--mount type=bind,src=\"{}\",dst=\"/var/tmp\" \\",
213            container_var_tmp_path.display()
214        )?;
215        writeln!(
216            &mut apptainer_command,
217            "--mount type=bind,src=\"{}\",dst=\"{GUEST_STDOUT_PATH}\" \\",
218            request.stdout_path().display()
219        )?;
220        writeln!(
221            &mut apptainer_command,
222            "--mount type=bind,src=\"{}\",dst=\"{GUEST_STDERR_PATH}\" \\",
223            request.stderr_path().display()
224        )?;
225
226        if let Some(true) = request
227            .requirements
228            .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
229            .and_then(Value::as_boolean)
230        {
231            writeln!(&mut apptainer_command, "--nv \\")?;
232        }
233
234        for arg in config
235            .extra_apptainer_exec_args
236            .as_deref()
237            .unwrap_or_default()
238        {
239            writeln!(&mut apptainer_command, "{arg} \\")?;
240        }
241
242        writeln!(&mut apptainer_command, "\"{}\" \\", container_sif.display())?;
243        writeln!(
244            &mut apptainer_command,
245            "{shell} -c \"\\\"{GUEST_COMMAND_PATH}\\\" > \\\"{GUEST_STDOUT_PATH}\\\" 2> \
246             \\\"{GUEST_STDERR_PATH}\\\"\" \\",
247            shell = shell.unwrap_or(DEFAULT_TASK_SHELL)
248        )?;
249        let attempt_dir = request.attempt_dir;
250        let apptainer_stdout_path = attempt_dir.join("apptainer.stdout");
251        let apptainer_stderr_path = attempt_dir.join("apptainer.stderr");
252        writeln!(
253            &mut apptainer_command,
254            "> \"{stdout}\" 2> \"{stderr}\"",
255            stdout = apptainer_stdout_path.display(),
256            stderr = apptainer_stderr_path.display()
257        )?;
258        Ok(apptainer_command)
259    }
260
261    /// Pulls the image for the given container source and returns the path to
262    /// the image file (SIF).
263    ///
264    /// If the container source is already a SIF file, the given source path is
265    /// returned.
266    ///
267    /// If the image has already been pulled, the pull is skipped and the path
268    /// to the previous location is returned.
269    pub(crate) async fn pull_image(
270        &self,
271        executable: &str,
272        container: &ContainerSource,
273        token: CancellationToken,
274    ) -> Result<Option<PathBuf>> {
275        // For local SIF files, return the path directly.
276        if let ContainerSource::SifFile(path) = container {
277            return Ok(Some(path.clone()));
278        }
279
280        // For unknown container sources, error early.
281        if let ContainerSource::Unknown(s) = container {
282            bail!("unknown container source `{s}`");
283        }
284
285        // For registry-based images, pull and cache.
286        let once = {
287            let mut map = self.images.lock().unwrap();
288            map.entry(container.clone())
289                .or_insert_with(|| Arc::new(OnceCell::new()))
290                .clone()
291        };
292
293        let pull = once.get_or_try_init(|| async move {
294            // SAFETY: the next two `unwrap` calls are safe because the source can't be a
295            // file or an unknown source at this point
296            let mut path = self.cache_dir.join(container.scheme().unwrap());
297            for part in container.name().unwrap().split("/") {
298                for part in part.split(':') {
299                    path.push(part);
300                }
301            }
302
303            path.add_extension("sif");
304
305            if path.exists() {
306                info!(path = %path.display(), "Apptainer image `{container:#}` already cached; using existing image");
307                return Ok(path);
308            }
309
310            if let Some(parent) = path.parent() {
311                tokio::fs::create_dir_all(parent).await.with_context(|| {
312                    format!(
313                        "failed to create directory `{parent}`",
314                        parent = parent.display()
315                    )
316                })?;
317            }
318
319            let container = format!("{container:#}");
320            let executable = executable.to_string();
321
322            Retry::spawn_notify(
323                // TODO ACF 2025-09-22: configure the retry behavior based on actual experience
324                // with flakiness of the container registries. This is a
325                // finger-in-the-wind guess at some reasonable parameters that
326                // shouldn't lead to us making our own problems worse by
327                // overwhelming registries with repeated retries.
328                ExponentialBackoff::from_millis(50)
329                    .max_delay_millis(60_000)
330                    .take(10),
331                || Self::try_pull_image(&executable, &container, &path),
332                {
333                    let executable = executable.clone();
334                    move |e: &anyhow::Error, _| {
335                        warn!(e = %e, "`{executable} pull` failed");
336                    }
337                },
338            )
339            .await
340            .with_context(|| format!("failed pulling Apptainer image `{container}`"))?;
341
342            info!(path = %path.display(), "Apptainer image `{container}` pulled successfully");
343            Ok(path)
344        });
345
346        tokio::select! {
347            _ = token.cancelled() => Ok(None),
348            res = pull => res.map(|p| Some(p.clone())),
349        }
350    }
351
352    /// Tries to pull an image.  
353    ///
354    /// The tricky thing about this function is determining whether a failure is
355    /// transient or permanent. When in doubt, choose transient; the downside is
356    /// a permanent failure may take longer to finally bring down an
357    /// execution, but this is better for a long-running task than letting a
358    /// transient failure bring it down before a retry.
359    ///
360    /// `apptainer pull` doesn't have a well-defined interface for us to tell
361    /// whether a failure is transient, but as we gain experience recognizing
362    /// its output patterns, we can enhance the fidelity of the error
363    /// handling.
364    async fn try_pull_image(
365        executable: &str,
366        image: &str,
367        path: &Path,
368    ) -> Result<(), RetryError<anyhow::Error>> {
369        info!("spawning `{executable}` to pull image `{image}`");
370
371        let child = Command::new(executable)
372            .stdin(Stdio::null())
373            .stdout(Stdio::piped())
374            .stderr(Stdio::piped())
375            .arg("pull")
376            .arg(path)
377            .arg(image)
378            .spawn()
379            .with_context(|| {
380                format!(
381                    "failed to spawn `{executable} pull '{path}' '{image}'`",
382                    path = path.display()
383                )
384            })
385            // If the system can't handle spawning a process, we're better off failing quickly
386            .map_err(RetryError::permanent)?;
387
388        let output = child
389            .wait_with_output()
390            .await
391            .context(format!("failed to wait for `{executable}`"))
392            .map_err(RetryError::permanent)?;
393        if !output.status.success() {
394            let permanent = if let Ok(stderr) = str::from_utf8(&output.stderr) {
395                let mut permanent = false;
396                // A collection of strings observed in `apptainer pull` stderr in unrecoverable
397                // conditions. Finding one of these in the output marks the attempt as a
398                // permanent failure.
399                let needles = ["manifest unknown", "403 (Forbidden)"];
400                for needle in needles {
401                    if stderr.contains(needle) {
402                        permanent = true;
403                        break;
404                    }
405                }
406
407                permanent
408            } else {
409                false
410            };
411
412            let e = anyhow!(
413                "`{executable}` failed: {status}: {stderr}",
414                status = output.status,
415                stderr = str::from_utf8(&output.stderr)
416                    .unwrap_or("<output not UTF-8>")
417                    .trim()
418            );
419            return if permanent {
420                Err(RetryError::permanent(e))
421            } else {
422                Err(RetryError::transient(e))
423            };
424        }
425
426        Ok(())
427    }
428}
429
#[cfg(test)]
mod tests {
    use indexmap::IndexMap;
    use tempfile::TempDir;
    use url::Url;

    use super::*;
    use crate::ONE_GIBIBYTE;
    use crate::TaskInputs;
    use crate::backend::ExecuteTaskRequest;
    use crate::backend::TaskExecutionConstraints;

    /// Generates the Apptainer script for a small example task whose files all
    /// live under `root`, using the default Apptainer configuration.
    ///
    /// The container is a `file://` URL to a `.sif` path that is never created.
    /// It is expected to parse as a local SIF source, so no image is pulled.
    async fn generate_example_script(root: &Path) -> Result<Option<String>> {
        let mut env = IndexMap::new();
        env.insert("FOO".to_string(), "bar".to_string());
        env.insert("BAZ".to_string(), "\"quux\"".to_string());

        let runtime = ApptainerRuntime::new(&root.join("runs"), None).unwrap();
        let script = runtime
            .generate_script(
                &ApptainerConfig::default(),
                None,
                &ExecuteTaskRequest {
                    id: "example-task",
                    command: "echo hello",
                    inputs: &TaskInputs::default(),
                    backend_inputs: &[],
                    requirements: &Default::default(),
                    hints: &Default::default(),
                    env: &env,
                    constraints: &TaskExecutionConstraints {
                        container: Some(
                            String::from(
                                Url::from_file_path(root.join("non-existent.sif")).unwrap(),
                            )
                            .parse()
                            .unwrap(),
                        ),
                        cpu: 1.0,
                        memory: ONE_GIBIBYTE as u64,
                        gpu: Default::default(),
                        fpga: Default::default(),
                        disks: Default::default(),
                    },
                    attempt_dir: &root.join("0"),
                    temp_dir: &root.join("temp"),
                },
                CancellationToken::new(),
            )
            .await?;
        Ok(script)
    }

    #[tokio::test]
    async fn example_task_generates() {
        let root = TempDir::new().unwrap();

        let _ = generate_example_script(root.path())
            .await
            .inspect_err(|e| eprintln!("{e:#?}"))
            .expect("example task script should generate");
    }

    // `shellcheck` works quite differently on Windows, and since we're not going to
    // run Apptainer on Windows anytime soon, we limit this test to Unixy
    // systems
    #[cfg(unix)]
    #[tokio::test]
    async fn example_task_shellchecks() {
        use tokio::process::Command;

        let root = TempDir::new().unwrap();

        let script = generate_example_script(root.path())
            .await
            .inspect_err(|e| eprintln!("{e:#?}"))
            .expect("example task script should generate")
            .expect("operation should not be canceled");

        let script_file = root.path().join("apptainer_script");
        tokio::fs::write(&script_file, &script)
            .await
            .expect("can write script to disk");

        let shellcheck_status = Command::new("shellcheck")
            .arg("--shell=bash")
            .arg("--severity=style")
            // all the quotes in the generated `--mount` args look suspicious but are okay
            .arg("--exclude=SC2140")
            .arg(&script_file)
            .status()
            .await
            .unwrap();
        assert!(shellcheck_status.success());
    }
}