use std::collections::HashMap;
use std::fmt::Write as _;
use std::path::Path;
use std::path::PathBuf;
use std::path::absolute;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::Context as _;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use tokio::process::Command;
use tokio::sync::OnceCell;
use tokio_retry2::Retry;
use tokio_retry2::RetryError;
use tokio_retry2::strategy::ExponentialBackoff;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::warn;
use crate::Value;
use crate::backend::ExecuteTaskRequest;
use crate::backend::PullResults;
use crate::config::ApptainerConfig;
use crate::v1::requirements::ContainerSource;
/// Name of the image cache directory created under the runtime's root
/// directory when no explicit image cache directory is supplied.
const IMAGES_CACHE_DIR: &str = "apptainer-images";
/// Path inside the container where the task's work directory is bind-mounted;
/// also used as the container's initial working directory (`--pwd`).
const GUEST_WORK_DIR: &str = "/mnt/task/work";
/// Path inside the container where the task's command file is bind-mounted
/// read-only.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";
/// Path inside the container where the task's stdout file is bind-mounted; the
/// command's stdout is redirected here.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";
/// Path inside the container where the task's stderr file is bind-mounted; the
/// command's stderr is redirected here.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
/// Prefix (joined with `_`) for exported environment variable names in the
/// generated script, used unless the executable name contains `singularity`.
const APPTAINER_ENV_PREFIX: &str = "APPTAINERENV";
/// Prefix (joined with `_`) for exported environment variable names in the
/// generated script, used when the executable name contains `singularity`.
const SINGULARITY_ENV_PREFIX: &str = "SINGULARITYENV";
/// Runtime state for running tasks with Apptainer: where pulled images are
/// cached on disk and which images have been (or are being) pulled.
#[derive(Debug)]
pub struct ApptainerRuntime {
/// Absolute path of the directory under which pulled images are stored.
cache_dir: PathBuf,
/// Per-container-source cells holding the local path of the pulled image.
///
/// The mutex only guards the map itself; each `OnceCell` is initialized at
/// most once with the path produced by a successful pull.
images: Mutex<HashMap<ContainerSource, Arc<OnceCell<PathBuf>>>>,
}
impl ApptainerRuntime {
/// Creates a new runtime.
///
/// Images are cached in `image_cache_dir` when one is given; otherwise in the
/// [`IMAGES_CACHE_DIR`] directory under `root_dir`. The selected path is
/// converted to an absolute path before being stored.
///
/// # Errors
///
/// Returns an error if the cache directory path cannot be made absolute.
pub fn new(root_dir: &Path, image_cache_dir: Option<&Path>) -> Result<Self> {
    let requested = match image_cache_dir {
        Some(dir) => dir.to_path_buf(),
        None => root_dir.join(IMAGES_CACHE_DIR),
    };
    let cache_dir = absolute(&requested).with_context(|| {
        format!(
            "failed to make path `{path}` absolute",
            path = requested.display()
        )
    })?;
    Ok(Self {
        cache_dir,
        images: Default::default(),
    })
}
/// Pulls the first available container image for the task and generates the
/// script that runs the task inside it.
///
/// On success, returns the script text together with the container source
/// whose image was used. Returns `Ok(None)` if pulling was canceled through
/// `token`.
///
/// # Errors
///
/// Returns an error if the task's constraints name no container, if none of
/// the candidate containers could be pulled, or if script generation fails.
pub async fn generate_script(
    &self,
    config: &ApptainerConfig,
    shell: &str,
    request: &ExecuteTaskRequest<'_>,
    token: CancellationToken,
) -> Result<Option<(String, ContainerSource)>> {
    let candidates = request
        .constraints
        .container
        .as_deref()
        .ok_or_else(|| anyhow!("task does not use a container"))?;

    // `None` here means the pull was canceled rather than failed.
    let Some(results) = self
        .pull_first_available_image(&config.executable, candidates, token)
        .await
    else {
        return Ok(None);
    };

    // When no candidate succeeded, the collected results become the error
    // message.
    let (container, image_path) = match results.successful_container() {
        Some((container, path)) => (container.clone(), path.clone()),
        None => return Err(anyhow!("{results}")),
    };

    let script = self
        .generate_apptainer_script(config, shell, &image_path, request)
        .await?;
    Ok(Some((script, container)))
}
/// Builds the bash script that runs the task's command inside the container
/// image at `container_sif`.
///
/// The script exports the task's environment variables under the
/// Apptainer (or Singularity) environment prefix and then invokes
/// `<executable> -v exec` with bind mounts for the task inputs, command file,
/// work directory, scratch `/tmp` and `/var/tmp` directories, and the
/// stdout/stderr files. `--nv` is added when the task's GPU requirement is
/// `true`, followed by any extra arguments from the configuration. The
/// executable's own stdout/stderr are redirected to `apptainer.stdout` and
/// `apptainer.stderr` in the attempt directory.
///
/// As a side effect, the `container_tmp` and `container_var_tmp` directories
/// are created under the request's temp directory.
///
/// # Errors
///
/// Returns an error if a scratch directory cannot be created, or if a backend
/// input has no local path or no guest path.
async fn generate_apptainer_script(
    &self,
    config: &ApptainerConfig,
    shell: &str,
    container_sif: &Path,
    request: &ExecuteTaskRequest<'_>,
) -> Result<String> {
    /// Quotes `value` as a single literal bash word.
    ///
    /// Inside single quotes bash performs no expansion at all, so the only
    /// character needing treatment is the single quote itself: close the
    /// quoted string, emit an escaped quote, and reopen it.
    fn shell_single_quote(value: &str) -> String {
        format!("'{}'", value.replace('\'', r"'\''"))
    }

    let container_tmp_path = request.temp_dir.join("container_tmp");
    tokio::fs::DirBuilder::new()
        .recursive(true)
        .create(&container_tmp_path)
        .await
        .with_context(|| {
            format!(
                "failed to create container /tmp directory at `{path}`",
                path = container_tmp_path.display()
            )
        })?;
    let container_var_tmp_path = request.temp_dir.join("container_var_tmp");
    tokio::fs::DirBuilder::new()
        .recursive(true)
        .create(&container_var_tmp_path)
        .await
        .with_context(|| {
            format!(
                "failed to create container /var/tmp directory at `{path}`",
                path = container_var_tmp_path.display()
            )
        })?;

    // The prefix must match the tool being invoked, so pick it from the
    // configured executable name.
    let env_prefix = if config.executable.contains("singularity") {
        SINGULARITY_ENV_PREFIX
    } else {
        APPTAINER_ENV_PREFIX
    };

    let mut apptainer_command = String::new();
    writeln!(&mut apptainer_command, "#!/usr/bin/env bash")?;
    for (k, v) in request.env.iter() {
        // Values are single-quoted so that `$`, backticks, backslashes, and
        // newlines reach the container verbatim. Rust's `Debug` escaping is
        // not bash quoting: it leaves `$`/backticks live inside double quotes
        // and emits escapes such as `\n` that bash does not interpret.
        let value = shell_single_quote(v);
        writeln!(&mut apptainer_command, "export {env_prefix}_{k}={value}")?;
    }

    // NOTE(review): the host paths below are interpolated into double quotes
    // without escaping; a path containing `"`, `$`, a backtick, or a comma
    // would produce a broken command line. Confirm that callers never produce
    // such paths.
    writeln!(&mut apptainer_command, "{} -v exec \\", config.executable)?;
    writeln!(&mut apptainer_command, "--pwd \"{GUEST_WORK_DIR}\" \\")?;
    writeln!(&mut apptainer_command, "--containall --cleanenv \\")?;
    for input in request.backend_inputs {
        writeln!(
            &mut apptainer_command,
            "--mount type=bind,src=\"{host_path}\",dst=\"{guest_path}\",ro \\",
            host_path = input
                .local_path()
                .ok_or_else(|| anyhow!("input not localized: {input:?}"))?
                .display(),
            guest_path = input
                .guest_path()
                .ok_or_else(|| anyhow!("guest path missing: {input:?}"))?,
        )?;
    }
    writeln!(
        &mut apptainer_command,
        "--mount type=bind,src=\"{}\",dst=\"{GUEST_COMMAND_PATH}\",ro \\",
        request.command_path().display()
    )?;
    writeln!(
        &mut apptainer_command,
        "--mount type=bind,src=\"{}\",dst=\"{GUEST_WORK_DIR}\" \\",
        request.work_dir().display()
    )?;
    writeln!(
        &mut apptainer_command,
        "--mount type=bind,src=\"{}\",dst=\"/tmp\" \\",
        container_tmp_path.display()
    )?;
    writeln!(
        &mut apptainer_command,
        "--mount type=bind,src=\"{}\",dst=\"/var/tmp\" \\",
        container_var_tmp_path.display()
    )?;
    writeln!(
        &mut apptainer_command,
        "--mount type=bind,src=\"{}\",dst=\"{GUEST_STDOUT_PATH}\" \\",
        request.stdout_path().display()
    )?;
    writeln!(
        &mut apptainer_command,
        "--mount type=bind,src=\"{}\",dst=\"{GUEST_STDERR_PATH}\" \\",
        request.stderr_path().display()
    )?;
    if let Some(true) = request
        .requirements
        .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
        .and_then(Value::as_boolean)
    {
        writeln!(&mut apptainer_command, "--nv \\")?;
    }
    // Extra arguments are emitted verbatim (unquoted), one per line.
    for arg in config
        .extra_apptainer_exec_args
        .as_deref()
        .unwrap_or_default()
    {
        writeln!(&mut apptainer_command, "{arg} \\")?;
    }
    writeln!(&mut apptainer_command, "\"{}\" \\", container_sif.display())?;
    // Inside the container, run the command file through the task shell and
    // send its output to the bind-mounted stdout/stderr files.
    writeln!(
        &mut apptainer_command,
        "{shell} -c \"\\\"{GUEST_COMMAND_PATH}\\\" > \\\"{GUEST_STDOUT_PATH}\\\" 2> \
         \\\"{GUEST_STDERR_PATH}\\\"\" \\"
    )?;
    // Output of the container executable itself is captured separately from
    // the task's output.
    let attempt_dir = request.attempt_dir;
    let apptainer_stdout_path = attempt_dir.join("apptainer.stdout");
    let apptainer_stderr_path = attempt_dir.join("apptainer.stderr");
    writeln!(
        &mut apptainer_command,
        "> \"{stdout}\" 2> \"{stderr}\"",
        stdout = apptainer_stdout_path.display(),
        stderr = apptainer_stderr_path.display()
    )?;
    Ok(apptainer_command)
}
/// Ensures the image for `container` is available locally and returns its
/// path.
///
/// * A `SifFile` source is returned as-is without any pull.
/// * An `Unknown` source is an error.
/// * Any other source is stored under the cache directory at
///   `<scheme>/<name split on '/' and ':'>.sif`. If that file already exists
///   it is reused; otherwise it is pulled with `<executable> pull`, retrying
///   with exponential backoff (the strategy is capped at 10 delays).
///
/// The result for each source is kept in a per-source `OnceCell`, so
/// subsequent calls for the same source reuse the recorded path.
///
/// Returns `Ok(None)` if `token` is canceled before the image is available.
///
/// # Errors
///
/// Returns an error for an unknown container source, if the cache
/// subdirectory cannot be created, or if the pull ultimately fails.
pub(crate) async fn pull_image(
    &self,
    executable: &str,
    container: &ContainerSource,
    token: CancellationToken,
) -> Result<Option<PathBuf>> {
    if let ContainerSource::SifFile(path) = container {
        return Ok(Some(path.clone()));
    }
    if let ContainerSource::Unknown(s) = container {
        bail!("unknown container source `{s}`");
    }

    // Hold the std mutex only long enough to fetch or create the cell; it
    // must not be held across an await.
    let once = {
        let mut map = self.images.lock().unwrap();
        map.entry(container.clone())
            .or_insert_with(|| Arc::new(OnceCell::new()))
            .clone()
    };

    let pull = once.get_or_try_init(|| async move {
        // NOTE(review): the unwraps presume every source other than `SifFile`
        // and `Unknown` has both a scheme and a name — confirm against
        // `ContainerSource`.
        let mut path = self.cache_dir.join(container.scheme().unwrap());
        // Each `/`- or `:`-separated piece of the name becomes a path
        // component.
        path.extend(container.name().unwrap().split(['/', ':']));
        // Append rather than replace the extension so that dotted names
        // (such as version tags) are kept intact.
        path.add_extension("sif");

        // Check for a cached image without blocking the async executor; as
        // with `Path::exists`, an error while checking counts as "not
        // present".
        if tokio::fs::try_exists(&path).await.unwrap_or(false) {
            debug!(path = %path.display(), "Apptainer image `{container:#}` already cached; using existing image");
            return Ok(path);
        }

        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await.with_context(|| {
                format!(
                    "failed to create directory `{parent}`",
                    parent = parent.display()
                )
            })?;
        }

        let container = format!("{container:#}");
        let executable = executable.to_string();
        Retry::spawn_notify(
            ExponentialBackoff::from_millis(50)
                .max_delay_millis(60_000)
                .take(10),
            || Self::try_pull_image(&executable, &container, &path),
            {
                let executable = executable.clone();
                move |e: &anyhow::Error, _| {
                    warn!(e = %e, "`{executable} pull` failed");
                }
            },
        )
        .await
        .with_context(|| format!("failed pulling Apptainer image `{container}`"))?;
        debug!(path = %path.display(), "Apptainer image `{container}` pulled successfully");
        Ok(path)
    });

    // Cancellation wins the race by dropping the in-flight pull future.
    tokio::select! {
        _ = token.cancelled() => Ok(None),
        res = pull => res.map(|p| Some(p.clone())),
    }
}
/// Tries each candidate container in order and stops at the first one whose
/// image can be pulled.
///
/// Returns `None` if a pull was canceled. Otherwise returns the accumulated
/// results: one failure entry per candidate that could not be pulled,
/// followed by the successful entry if there was one. When every candidate
/// fails (or `candidates` is empty) the results contain no success.
pub(crate) async fn pull_first_available_image(
    &self,
    executable: &str,
    candidates: &[ContainerSource],
    token: CancellationToken,
) -> Option<PullResults<PathBuf>> {
    let mut attempts = PullResults::default();
    for candidate in candidates {
        debug!("attempting to pull container image `{candidate:#}`");
        let outcome = self.pull_image(executable, candidate, token.clone()).await;
        match outcome {
            // Record the failure and move on to the next candidate.
            Err(e) => {
                warn!("failed to pull container image `{candidate:#}`: {e:#}");
                attempts.push(candidate.clone(), Err(e));
            }
            // Canceled: abandon the remaining candidates.
            Ok(None) => return None,
            Ok(Some(path)) => {
                debug!("successfully pulled container image `{candidate:#}`");
                attempts.push(candidate.clone(), Ok(path));
                return Some(attempts);
            }
        }
    }
    Some(attempts)
}
/// Runs `<executable> pull <path> <image>` once and classifies the outcome
/// for the retry loop.
///
/// Failing to spawn or wait for the process is a permanent error. A non-zero
/// exit is permanent when the process's stderr is valid UTF-8 and contains
/// one of the known unrecoverable messages; any other non-zero exit is
/// transient.
async fn try_pull_image(
    executable: &str,
    image: &str,
    path: &Path,
) -> Result<(), RetryError<anyhow::Error>> {
    // stderr fragments for which the failure is reported as permanent.
    const PERMANENT_FAILURES: [&str; 2] = ["manifest unknown", "403 (Forbidden)"];

    debug!("spawning `{executable}` to pull image `{image}`");

    let mut command = Command::new(executable);
    command
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .arg("pull")
        .arg(path)
        .arg(image);

    let child = command
        .spawn()
        .with_context(|| {
            format!(
                "failed to spawn `{executable} pull '{path}' '{image}'`",
                path = path.display()
            )
        })
        .map_err(RetryError::permanent)?;

    let output = child
        .wait_with_output()
        .await
        .with_context(|| format!("failed to wait for `{executable}`"))
        .map_err(RetryError::permanent)?;

    if output.status.success() {
        return Ok(());
    }

    // Decode stderr once; non-UTF-8 output can never match a known message.
    let stderr_text = str::from_utf8(&output.stderr).ok();
    let permanent = stderr_text
        .is_some_and(|text| PERMANENT_FAILURES.iter().any(|needle| text.contains(*needle)));

    let e = anyhow!(
        "`{executable}` failed: {status}: {stderr}",
        status = output.status,
        stderr = stderr_text.unwrap_or("<output not UTF-8>").trim()
    );

    if permanent {
        Err(RetryError::permanent(e))
    } else {
        Err(RetryError::transient(e))
    }
}
}
#[cfg(test)]
mod tests {
    use indexmap::IndexMap;
    use tempfile::TempDir;
    use url::Url;

    use super::*;
    use crate::ONE_GIBIBYTE;
    use crate::TaskInputs;
    use crate::backend::ExecuteTaskRequest;
    use crate::backend::TaskExecutionConstraints;
    use crate::config::DEFAULT_TASK_SHELL;

    /// Generates the script for a minimal example task rooted at `root`.
    ///
    /// The task's container is a `file://` URL to a SIF path under `root`
    /// that is never created, and its environment includes a value with
    /// embedded double quotes. Panics if script generation returns an error;
    /// returns `None` if the operation was canceled.
    async fn generate_example_script(root: &TempDir) -> Option<(String, ContainerSource)> {
        let mut env = IndexMap::new();
        env.insert("FOO".to_string(), "bar".to_string());
        env.insert("BAZ".to_string(), "\"quux\"".to_string());

        let runtime = ApptainerRuntime::new(&root.path().join("runs"), None).unwrap();
        // Bind the result to a local so the borrowed temporaries in the
        // request are dropped before `env` and `runtime`.
        let generated = runtime
            .generate_script(
                &ApptainerConfig::default(),
                DEFAULT_TASK_SHELL,
                &ExecuteTaskRequest {
                    id: "example-task",
                    command: "echo hello",
                    inputs: &TaskInputs::default(),
                    backend_inputs: &[],
                    requirements: &Default::default(),
                    hints: &Default::default(),
                    env: &env,
                    constraints: &TaskExecutionConstraints {
                        container: Some(vec![
                            String::from(
                                Url::from_file_path(root.path().join("non-existent.sif")).unwrap(),
                            )
                            .parse()
                            .unwrap(),
                        ]),
                        cpu: 1.0,
                        memory: ONE_GIBIBYTE as u64,
                        gpu: Default::default(),
                        fpga: Default::default(),
                        disks: Default::default(),
                    },
                    attempt_dir: &root.path().join("0"),
                    temp_dir: &root.path().join("temp"),
                },
                CancellationToken::new(),
            )
            .await
            .inspect_err(|e| eprintln!("{e:#?}"))
            .expect("example task script should generate");
        generated
    }

    /// Script generation for the example task completes without error.
    #[tokio::test]
    async fn example_task_generates() {
        let root = TempDir::new().unwrap();
        let _ = generate_example_script(&root).await;
    }

    /// The script generated for the example task passes `shellcheck`.
    #[cfg(unix)]
    #[tokio::test]
    async fn example_task_shellchecks() {
        use tokio::process::Command;

        let root = TempDir::new().unwrap();
        let (script, _) = generate_example_script(&root)
            .await
            .expect("operation should not be canceled");

        let script_file = root.path().join("apptainer_script");
        tokio::fs::write(&script_file, &script)
            .await
            .expect("can write script to disk");

        let shellcheck_status = Command::new("shellcheck")
            .arg("--shell=bash")
            .arg("--severity=style")
            .arg("--exclude=SC2140")
            .arg(&script_file)
            .status()
            .await
            .unwrap();
        assert!(shellcheck_status.success());
    }
}