1use std::collections::HashMap;
9use std::fmt::Write as _;
10use std::path::Path;
11use std::path::PathBuf;
12use std::path::absolute;
13use std::process::Stdio;
14use std::sync::Arc;
15use std::sync::Mutex;
16
17use anyhow::Context as _;
18use anyhow::Result;
19use anyhow::anyhow;
20use anyhow::bail;
21use tokio::process::Command;
22use tokio::sync::OnceCell;
23use tokio_retry2::Retry;
24use tokio_retry2::RetryError;
25use tokio_retry2::strategy::ExponentialBackoff;
26use tokio_util::sync::CancellationToken;
27use tracing::info;
28use tracing::warn;
29
30use crate::Value;
31use crate::backend::ExecuteTaskRequest;
32use crate::config::ApptainerConfig;
33use crate::config::DEFAULT_TASK_SHELL;
34use crate::v1::requirements::ContainerSource;
35
/// Name of the directory, beneath the runtime's root directory, in which
/// pulled images are cached when no explicit image cache directory is given.
const IMAGES_CACHE_DIR: &str = "apptainer-images";

/// Path inside the container at which the task's working directory is
/// bind-mounted; it is also passed to `--pwd` as the initial directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// Path inside the container at which the task's command file is bind-mounted
/// read-only and from which it is executed by the task shell.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// Path inside the container to which the task command's standard output is
/// redirected; it is bind-mounted to the request's host stdout file.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// Path inside the container to which the task command's standard error is
/// redirected; it is bind-mounted to the request's host stderr file.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// Prefix of the variables exported by the generated script to carry the task
/// environment into the container when running Apptainer (presumably relying
/// on Apptainer's `APPTAINERENV_<NAME>` forwarding — confirm for new versions).
const APPTAINER_ENV_PREFIX: &str = "APPTAINERENV";

/// Prefix used instead of [`APPTAINER_ENV_PREFIX`] when the configured
/// executable looks like Singularity.
const SINGULARITY_ENV_PREFIX: &str = "SINGULARITYENV";
56
/// Runtime that obtains container images as SIF files and generates the bash
/// scripts that run tasks inside them with Apptainer (or Singularity).
#[derive(Debug)]
pub struct ApptainerRuntime {
    /// Absolute directory beneath which pulled images are stored.
    cache_dir: PathBuf,
    /// One initialization cell per container source, so concurrent requests
    /// for the same image share a single pull and a successful result is
    /// memoized for the lifetime of the runtime.
    images: Mutex<HashMap<ContainerSource, Arc<OnceCell<PathBuf>>>>,
}
65
66impl ApptainerRuntime {
67 pub fn new(root_dir: &Path, image_cache_dir: Option<&Path>) -> Result<Self> {
73 let cache_dir = image_cache_dir
74 .map(Path::to_path_buf)
75 .unwrap_or_else(|| root_dir.join(IMAGES_CACHE_DIR));
76
77 Ok(Self {
78 cache_dir: absolute(&cache_dir).with_context(|| {
79 format!(
80 "failed to make path `{path}` absolute",
81 path = cache_dir.display()
82 )
83 })?,
84 images: Default::default(),
85 })
86 }
87
88 pub async fn generate_script(
98 &self,
99 config: &ApptainerConfig,
100 shell: Option<&str>,
101 request: &ExecuteTaskRequest<'_>,
102 token: CancellationToken,
103 ) -> Result<Option<String>> {
104 let path = match self
105 .pull_image(
106 &config.executable,
107 request
108 .constraints
109 .container
110 .as_ref()
111 .ok_or_else(|| anyhow!("task does not use a container"))?,
112 token,
113 )
114 .await?
115 {
116 Some(path) => path,
117 None => return Ok(None),
118 };
119
120 Ok(Some(
121 self.generate_apptainer_script(config, shell, &path, request)
122 .await?,
123 ))
124 }
125
126 async fn generate_apptainer_script(
132 &self,
133 config: &ApptainerConfig,
134 shell: Option<&str>,
135 container_sif: &Path,
136 request: &ExecuteTaskRequest<'_>,
137 ) -> Result<String> {
138 let container_tmp_path = request.temp_dir.join("container_tmp");
146 tokio::fs::DirBuilder::new()
147 .recursive(true)
148 .create(&container_tmp_path)
149 .await
150 .with_context(|| {
151 format!(
152 "failed to create container /tmp directory at `{path}`",
153 path = container_tmp_path.display()
154 )
155 })?;
156 let container_var_tmp_path = request.temp_dir.join("container_var_tmp");
157 tokio::fs::DirBuilder::new()
158 .recursive(true)
159 .create(&container_var_tmp_path)
160 .await
161 .with_context(|| {
162 format!(
163 "failed to create container /var/tmp directory at `{path}`",
164 path = container_var_tmp_path.display()
165 )
166 })?;
167
168 let env_prefix = if config.executable.contains("singularity") {
169 SINGULARITY_ENV_PREFIX
170 } else {
171 APPTAINER_ENV_PREFIX
172 };
173
174 let mut apptainer_command = String::new();
175 writeln!(&mut apptainer_command, "#!/usr/bin/env bash")?;
176 for (k, v) in request.env.iter() {
177 writeln!(&mut apptainer_command, "export {env_prefix}_{k}={v:?}")?;
178 }
179 writeln!(&mut apptainer_command, "{} -v exec \\", config.executable)?;
180 writeln!(&mut apptainer_command, "--pwd \"{GUEST_WORK_DIR}\" \\")?;
181 writeln!(&mut apptainer_command, "--containall --cleanenv \\")?;
182 for input in request.backend_inputs {
183 writeln!(
184 &mut apptainer_command,
185 "--mount type=bind,src=\"{host_path}\",dst=\"{guest_path}\",ro \\",
186 host_path = input
187 .local_path()
188 .ok_or_else(|| anyhow!("input not localized: {input:?}"))?
189 .display(),
190 guest_path = input
191 .guest_path()
192 .ok_or_else(|| anyhow!("guest path missing: {input:?}"))?,
193 )?;
194 }
195 writeln!(
196 &mut apptainer_command,
197 "--mount type=bind,src=\"{}\",dst=\"{GUEST_COMMAND_PATH}\",ro \\",
198 request.command_path().display()
199 )?;
200 writeln!(
201 &mut apptainer_command,
202 "--mount type=bind,src=\"{}\",dst=\"{GUEST_WORK_DIR}\" \\",
203 request.work_dir().display()
204 )?;
205 writeln!(
206 &mut apptainer_command,
207 "--mount type=bind,src=\"{}\",dst=\"/tmp\" \\",
208 container_tmp_path.display()
209 )?;
210 writeln!(
211 &mut apptainer_command,
212 "--mount type=bind,src=\"{}\",dst=\"/var/tmp\" \\",
213 container_var_tmp_path.display()
214 )?;
215 writeln!(
216 &mut apptainer_command,
217 "--mount type=bind,src=\"{}\",dst=\"{GUEST_STDOUT_PATH}\" \\",
218 request.stdout_path().display()
219 )?;
220 writeln!(
221 &mut apptainer_command,
222 "--mount type=bind,src=\"{}\",dst=\"{GUEST_STDERR_PATH}\" \\",
223 request.stderr_path().display()
224 )?;
225
226 if let Some(true) = request
227 .requirements
228 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
229 .and_then(Value::as_boolean)
230 {
231 writeln!(&mut apptainer_command, "--nv \\")?;
232 }
233
234 for arg in config
235 .extra_apptainer_exec_args
236 .as_deref()
237 .unwrap_or_default()
238 {
239 writeln!(&mut apptainer_command, "{arg} \\")?;
240 }
241
242 writeln!(&mut apptainer_command, "\"{}\" \\", container_sif.display())?;
243 writeln!(
244 &mut apptainer_command,
245 "{shell} -c \"\\\"{GUEST_COMMAND_PATH}\\\" > \\\"{GUEST_STDOUT_PATH}\\\" 2> \
246 \\\"{GUEST_STDERR_PATH}\\\"\" \\",
247 shell = shell.unwrap_or(DEFAULT_TASK_SHELL)
248 )?;
249 let attempt_dir = request.attempt_dir;
250 let apptainer_stdout_path = attempt_dir.join("apptainer.stdout");
251 let apptainer_stderr_path = attempt_dir.join("apptainer.stderr");
252 writeln!(
253 &mut apptainer_command,
254 "> \"{stdout}\" 2> \"{stderr}\"",
255 stdout = apptainer_stdout_path.display(),
256 stderr = apptainer_stderr_path.display()
257 )?;
258 Ok(apptainer_command)
259 }
260
261 pub(crate) async fn pull_image(
270 &self,
271 executable: &str,
272 container: &ContainerSource,
273 token: CancellationToken,
274 ) -> Result<Option<PathBuf>> {
275 if let ContainerSource::SifFile(path) = container {
277 return Ok(Some(path.clone()));
278 }
279
280 if let ContainerSource::Unknown(s) = container {
282 bail!("unknown container source `{s}`");
283 }
284
285 let once = {
287 let mut map = self.images.lock().unwrap();
288 map.entry(container.clone())
289 .or_insert_with(|| Arc::new(OnceCell::new()))
290 .clone()
291 };
292
293 let pull = once.get_or_try_init(|| async move {
294 let mut path = self.cache_dir.join(container.scheme().unwrap());
297 for part in container.name().unwrap().split("/") {
298 for part in part.split(':') {
299 path.push(part);
300 }
301 }
302
303 path.add_extension("sif");
304
305 if path.exists() {
306 info!(path = %path.display(), "Apptainer image `{container:#}` already cached; using existing image");
307 return Ok(path);
308 }
309
310 if let Some(parent) = path.parent() {
311 tokio::fs::create_dir_all(parent).await.with_context(|| {
312 format!(
313 "failed to create directory `{parent}`",
314 parent = parent.display()
315 )
316 })?;
317 }
318
319 let container = format!("{container:#}");
320 let executable = executable.to_string();
321
322 Retry::spawn_notify(
323 ExponentialBackoff::from_millis(50)
329 .max_delay_millis(60_000)
330 .take(10),
331 || Self::try_pull_image(&executable, &container, &path),
332 {
333 let executable = executable.clone();
334 move |e: &anyhow::Error, _| {
335 warn!(e = %e, "`{executable} pull` failed");
336 }
337 },
338 )
339 .await
340 .with_context(|| format!("failed pulling Apptainer image `{container}`"))?;
341
342 info!(path = %path.display(), "Apptainer image `{container}` pulled successfully");
343 Ok(path)
344 });
345
346 tokio::select! {
347 _ = token.cancelled() => Ok(None),
348 res = pull => res.map(|p| Some(p.clone())),
349 }
350 }
351
352 async fn try_pull_image(
365 executable: &str,
366 image: &str,
367 path: &Path,
368 ) -> Result<(), RetryError<anyhow::Error>> {
369 info!("spawning `{executable}` to pull image `{image}`");
370
371 let child = Command::new(executable)
372 .stdin(Stdio::null())
373 .stdout(Stdio::piped())
374 .stderr(Stdio::piped())
375 .arg("pull")
376 .arg(path)
377 .arg(image)
378 .spawn()
379 .with_context(|| {
380 format!(
381 "failed to spawn `{executable} pull '{path}' '{image}'`",
382 path = path.display()
383 )
384 })
385 .map_err(RetryError::permanent)?;
387
388 let output = child
389 .wait_with_output()
390 .await
391 .context(format!("failed to wait for `{executable}`"))
392 .map_err(RetryError::permanent)?;
393 if !output.status.success() {
394 let permanent = if let Ok(stderr) = str::from_utf8(&output.stderr) {
395 let mut permanent = false;
396 let needles = ["manifest unknown", "403 (Forbidden)"];
400 for needle in needles {
401 if stderr.contains(needle) {
402 permanent = true;
403 break;
404 }
405 }
406
407 permanent
408 } else {
409 false
410 };
411
412 let e = anyhow!(
413 "`{executable}` failed: {status}: {stderr}",
414 status = output.status,
415 stderr = str::from_utf8(&output.stderr)
416 .unwrap_or("<output not UTF-8>")
417 .trim()
418 );
419 return if permanent {
420 Err(RetryError::permanent(e))
421 } else {
422 Err(RetryError::transient(e))
423 };
424 }
425
426 Ok(())
427 }
428}
429
#[cfg(test)]
mod tests {
    use indexmap::IndexMap;
    use tempfile::TempDir;
    use url::Url;

    use super::*;
    use crate::ONE_GIBIBYTE;
    use crate::TaskInputs;
    use crate::backend::ExecuteTaskRequest;
    use crate::backend::TaskExecutionConstraints;

    /// Asks a fresh runtime (image cache under `root/runs`) to generate the
    /// script for a one-CPU, 1 GiB `echo hello` task.
    ///
    /// The task's container is the `file://` URL of `root/non-existent.sif`.
    /// The environment holds `FOO=bar` and `BAZ="quux"` (with literal double
    /// quotes) to exercise value quoting. Attempt and temp directories are
    /// `root/0` and `root/temp`.
    async fn generate_example_script(root: &Path) -> Result<Option<String>> {
        let env: IndexMap<String, String> = [("FOO", "bar"), ("BAZ", "\"quux\"")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let runtime = ApptainerRuntime::new(&root.join("runs"), None).unwrap();
        runtime
            .generate_script(
                &ApptainerConfig::default(),
                None,
                &ExecuteTaskRequest {
                    id: "example-task",
                    command: "echo hello",
                    inputs: &TaskInputs::default(),
                    backend_inputs: &[],
                    requirements: &Default::default(),
                    hints: &Default::default(),
                    env: &env,
                    constraints: &TaskExecutionConstraints {
                        container: Some(
                            String::from(
                                Url::from_file_path(root.join("non-existent.sif")).unwrap(),
                            )
                            .parse()
                            .unwrap(),
                        ),
                        cpu: 1.0,
                        memory: ONE_GIBIBYTE as u64,
                        gpu: Default::default(),
                        fpga: Default::default(),
                        disks: Default::default(),
                    },
                    attempt_dir: &root.join("0"),
                    temp_dir: &root.join("temp"),
                },
                CancellationToken::new(),
            )
            .await
    }

    #[tokio::test]
    async fn example_task_generates() {
        let root = TempDir::new().unwrap();

        let _ = generate_example_script(root.path())
            .await
            .inspect_err(|e| eprintln!("{e:#?}"))
            .expect("example task script should generate");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn example_task_shellchecks() {
        use tokio::process::Command;

        let root = TempDir::new().unwrap();

        let script = generate_example_script(root.path())
            .await
            .inspect_err(|e| eprintln!("{e:#?}"))
            .expect("example task script should generate")
            .expect("operation should not be canceled");

        let script_file = root.path().join("apptainer_script");
        tokio::fs::write(&script_file, &script)
            .await
            .expect("can write script to disk");

        let status = Command::new("shellcheck")
            .args(["--shell=bash", "--severity=style", "--exclude=SC2140"])
            .arg(&script_file)
            .status()
            .await
            .unwrap();
        assert!(status.success());
    }
}