use std::collections::HashMap;
use std::fmt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Result;
use futures::future::BoxFuture;
use indexmap::IndexMap;
use crate::ContentKind;
use crate::EvaluationPath;
use crate::GuestPath;
use crate::TaskInputs;
use crate::Value;
use crate::http::Location;
use crate::http::Transferer;
use crate::v1::requirements::ContainerSource;
mod apptainer;
mod docker;
mod local;
mod lsf_apptainer;
pub(crate) mod manager;
mod slurm_apptainer;
mod tes;
pub use apptainer::*;
pub use docker::*;
pub use local::*;
pub use lsf_apptainer::*;
pub use slurm_apptainer::*;
pub use tes::*;
/// The guest inputs directory returned by the default implementation of
/// `TaskExecutionBackend::guest_inputs_dir`.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";
/// The name joined onto an attempt directory by `ExecuteTaskRequest::work_dir`.
pub(crate) const WORK_DIR_NAME: &str = "work";
/// The name joined onto an attempt directory by `ExecuteTaskRequest::command_path`.
pub(crate) const COMMAND_FILE_NAME: &str = "command";
/// The name joined onto an attempt directory by `ExecuteTaskRequest::stdout_path`.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";
/// The name joined onto an attempt directory by `ExecuteTaskRequest::stderr_path`.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";
/// NOTE(review): not referenced anywhere in this file; presumably an initial capacity hint for a
/// collection of names used by a submodule. Confirm against its users.
const INITIAL_EXPECTED_NAMES: usize = 1000;
/// An input handed to a task execution backend.
///
/// Combines the input's [`ContentKind`] and [`EvaluationPath`] with an optional [`GuestPath`] and
/// an optional [`Location`] that may be attached after construction.
#[derive(Debug, Clone)]
pub(crate) struct Input {
/// The content kind of the input.
kind: ContentKind,
/// The evaluation path of the input.
path: EvaluationPath,
/// The guest path of the input, if one was supplied at construction.
guest_path: Option<GuestPath>,
/// The location recorded through `set_location`; `None` until then.
///
/// When present, `local_path` returns it in preference to the local form of `path`.
location: Option<Location>,
}
impl Input {
    /// Builds an input from its content kind, evaluation path and optional guest path.
    ///
    /// No location is attached initially; use [`Input::set_location`] to record one.
    pub fn new(kind: ContentKind, path: EvaluationPath, guest_path: Option<GuestPath>) -> Self {
        let location = None;
        Self {
            kind,
            path,
            guest_path,
            location,
        }
    }

    /// Returns the content kind of the input.
    pub fn kind(&self) -> ContentKind {
        self.kind
    }

    /// Returns the evaluation path the input was created with.
    pub fn path(&self) -> &EvaluationPath {
        &self.path
    }

    /// Returns the guest path of the input, or `None` if it has none.
    pub fn guest_path(&self) -> Option<&GuestPath> {
        self.guest_path.as_ref()
    }

    /// Returns a local path for the input, if one is available.
    ///
    /// A location recorded via [`Input::set_location`] takes precedence; otherwise the result is
    /// whatever the evaluation path's `as_local` yields.
    pub fn local_path(&self) -> Option<&Path> {
        match self.location.as_deref() {
            Some(located) => Some(located),
            None => self.path.as_local(),
        }
    }

    /// Records `location` for the input, replacing any previously recorded one.
    pub fn set_location(&mut self, location: Location) {
        self.location = Some(location);
    }
}
/// An ordered list of container sources paired with the outcome of attempting each one.
///
/// Each entry holds either the successful value of type `T` or the error for that source.
/// Entries are kept in the order they were pushed.
pub struct PullResults<T>(Vec<(ContainerSource, anyhow::Result<T>)>);
impl<T> Default for PullResults<T> {
    /// Creates an empty set of results.
    ///
    /// Implemented by hand so that `T` is not required to implement `Default`.
    fn default() -> Self {
        let entries = Vec::new();
        Self(entries)
    }
}
impl<T> PullResults<T> {
    /// Appends the outcome recorded for `source`.
    pub fn push(&mut self, source: ContainerSource, result: anyhow::Result<T>) {
        let entry = (source, result);
        self.0.push(entry);
    }

    /// Returns the first entry, in insertion order, whose outcome is `Ok`.
    ///
    /// Yields `None` when there are no entries or every entry is an error.
    pub fn successful_container(&self) -> Option<(&ContainerSource, &T)> {
        self.0.iter().find_map(|(source, outcome)| match outcome {
            Ok(value) => Some((source, value)),
            Err(_) => None,
        })
    }

    /// Iterates, in insertion order, over every entry whose outcome is an error.
    pub fn failures(&self) -> impl Iterator<Item = (&ContainerSource, &anyhow::Error)> {
        self.0.iter().filter_map(|(source, outcome)| match outcome {
            Err(error) => Some((source, error)),
            Ok(_) => None,
        })
    }
}
impl<T> fmt::Display for PullResults<T> {
    /// Writes a fixed header followed by one bullet line per failed entry.
    ///
    /// Each bullet shows the source and its error, both in alternate (`{:#}`) form. Successful
    /// entries are not listed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("all container image candidates failed to pull:")?;
        self.failures()
            .try_for_each(|(source, error)| write!(f, "\n - `{source:#}`: {error:#}"))
    }
}
/// The constraints a backend reports for executing a task.
///
/// Produced by `TaskExecutionBackend::constraints` and passed back to the backend through
/// `ExecuteTaskRequest::constraints`.
#[derive(Debug)]
pub struct TaskExecutionConstraints {
/// The container sources for the task, or `None` when there are none.
///
/// NOTE(review): presumably an ordered list of candidates to try in turn; confirm.
pub container: Option<Vec<ContainerSource>>,
/// The CPU constraint (fractional values are representable).
///
/// NOTE(review): presumably a number of cores; confirm the unit.
pub cpu: f64,
/// The memory constraint.
///
/// NOTE(review): presumably in bytes; confirm the unit.
pub memory: u64,
/// The GPU constraint, as a list of strings.
///
/// NOTE(review): the meaning of each string is not visible here; confirm.
pub gpu: Vec<String>,
/// The FPGA constraint, as a list of strings.
///
/// NOTE(review): the meaning of each string is not visible here; confirm.
pub fpga: Vec<String>,
/// The disk constraints, as an insertion-ordered map from a string key to a signed size.
///
/// NOTE(review): presumably mount point to size; confirm the key meaning and the unit.
pub disks: IndexMap<String, i64>,
}
/// A borrowed description of a single task execution, passed to
/// `TaskExecutionBackend::execute`.
///
/// Every field borrows for `'a`; the request owns none of its data.
#[derive(Debug)]
pub struct ExecuteTaskRequest<'a> {
/// The identifier of the task execution.
pub id: &'a str,
/// The command of the task.
///
/// NOTE(review): presumably the fully evaluated command script text; confirm.
pub command: &'a str,
/// The inputs of the task.
pub inputs: &'a TaskInputs,
/// The backend inputs of the task (see [`Input`]).
pub backend_inputs: &'a [Input],
/// The task's requirements, keyed by name.
pub requirements: &'a HashMap<String, Value>,
/// The task's hints, keyed by name.
pub hints: &'a HashMap<String, Value>,
/// The environment variables for the task, in insertion order.
pub env: &'a IndexMap<String, String>,
/// The execution constraints for the task.
pub constraints: &'a TaskExecutionConstraints,
/// The attempt directory; the command, working directory, stdout and stderr paths are all
/// formed by joining a fixed name onto it.
pub attempt_dir: &'a Path,
/// The temporary directory for the task.
///
/// NOTE(review): how backends use this directory is not visible here; confirm.
pub temp_dir: &'a Path,
}
impl<'a> ExecuteTaskRequest<'a> {
    /// Joins `name` onto the attempt directory.
    fn in_attempt_dir(&self, name: &str) -> PathBuf {
        self.attempt_dir.join(name)
    }

    /// Returns the path of the command file inside the attempt directory.
    pub fn command_path(&self) -> PathBuf {
        self.in_attempt_dir(COMMAND_FILE_NAME)
    }

    /// Returns the path of the working directory inside the attempt directory.
    pub fn work_dir(&self) -> PathBuf {
        self.in_attempt_dir(WORK_DIR_NAME)
    }

    /// Returns the path of the stdout file inside the attempt directory.
    pub fn stdout_path(&self) -> PathBuf {
        self.in_attempt_dir(STDOUT_FILE_NAME)
    }

    /// Returns the path of the stderr file inside the attempt directory.
    pub fn stderr_path(&self) -> PathBuf {
        self.in_attempt_dir(STDERR_FILE_NAME)
    }
}
/// The outcome of a task execution, as produced by `TaskExecutionBackend::execute`.
#[derive(Debug)]
pub struct TaskExecutionResult {
/// The container source associated with the execution, or `None` when there is none.
///
/// NOTE(review): presumably the candidate that was actually used to run the task; confirm.
pub container: Option<ContainerSource>,
/// The exit code of the task.
pub exit_code: i32,
/// The working directory of the task, as an evaluation path.
pub work_dir: EvaluationPath,
/// The task's stdout, as a [`Value`].
///
/// NOTE(review): presumably a file value pointing at the captured output; confirm.
pub stdout: Value,
/// The task's stderr, as a [`Value`].
///
/// NOTE(review): presumably a file value pointing at the captured output; confirm.
pub stderr: Value,
}
/// The interface implemented by task execution backends.
///
/// Implementations must be `Send + Sync`.
pub(crate) trait TaskExecutionBackend: Send + Sync {
/// Computes the execution constraints for a task from its inputs, requirements and hints.
///
/// # Errors
///
/// Returns an error when the backend cannot produce constraints; the specific conditions are
/// defined by each implementation.
fn constraints(
&self,
inputs: &TaskInputs,
requirements: &HashMap<String, Value>,
hints: &HashMap<String, Value>,
) -> Result<TaskExecutionConstraints>;
/// Returns the guest inputs directory of the backend, or `None` if it has none.
///
/// The default implementation returns `Some(GUEST_INPUTS_DIR)`.
fn guest_inputs_dir(&self) -> Option<&'static str> {
Some(GUEST_INPUTS_DIR)
}
/// Reports whether the backend needs local inputs.
///
/// The default implementation returns `true`.
///
/// NOTE(review): presumably callers use this to decide whether remote inputs must be
/// localized before execution; confirm against the call sites.
fn needs_local_inputs(&self) -> bool {
true
}
/// Executes the task described by `request`, returning a boxed future for the outcome.
///
/// The future borrows `self` and `transferer` for `'a` and resolves to an optional
/// [`TaskExecutionResult`].
///
/// NOTE(review): the meaning of `Ok(None)` is not visible in this file (possibly a canceled
/// execution); confirm against the implementations.
fn execute<'a>(
&'a self,
transferer: &'a Arc<dyn Transferer>,
request: ExecuteTaskRequest<'a>,
) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a Docker container source for `image`.
    fn docker(image: &str) -> ContainerSource {
        ContainerSource::Docker(image.to_string())
    }

    /// Builds results in which `a:1` failed with "not found" and `b:2` failed with "timeout".
    fn two_failures() -> PullResults<String> {
        let mut results = PullResults::default();
        results.push(docker("a:1"), Err(anyhow::anyhow!("not found")));
        results.push(docker("b:2"), Err(anyhow::anyhow!("timeout")));
        results
    }

    /// An empty result set reports no successful container.
    #[test]
    fn empty_pull_results_has_no_successful_container() {
        let results = PullResults::<String>::default();
        assert!(results.successful_container().is_none());
    }

    /// A single successful entry is returned together with its source.
    #[test]
    fn pull_results_with_success() {
        let source = docker("foo:latest");
        let mut results = PullResults::default();
        results.push(source.clone(), Ok(String::from("resolved")));

        let found = results
            .successful_container()
            .map(|(s, v)| (s.clone(), v.clone()));
        assert_eq!(found, Some((source, String::from("resolved"))));
    }

    /// When every entry failed there is no success and both failures are reported.
    #[test]
    fn pull_results_with_all_failures() {
        let results = two_failures();
        assert!(results.successful_container().is_none());
        assert_eq!(results.failures().count(), 2);
    }

    /// The display output mentions every failed source and its error message.
    #[test]
    fn pull_results_display_lists_failures() {
        let display = two_failures().to_string();
        for needle in ["a:1", "not found", "b:2", "timeout"] {
            assert!(display.contains(needle));
        }
    }

    /// Successful entries are excluded from the failures iterator.
    #[test]
    fn pull_results_failures_skips_successes() {
        let mut results = PullResults::default();
        results.push(docker("a:1"), Err(anyhow::anyhow!("not found")));
        results.push(docker("b:2"), Ok(String::from("resolved")));
        assert_eq!(results.failures().count(), 1);
    }
}