use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use crankshaft::config::backend;
use crankshaft::engine::Task;
use crankshaft::engine::service::name::GeneratorIterator;
use crankshaft::engine::service::name::UniqueAlphanumeric;
use crankshaft::engine::service::runner::Backend;
use crankshaft::engine::service::runner::backend::TaskRunError;
use crankshaft::engine::service::runner::backend::docker;
use crankshaft::engine::task::Execution;
use crankshaft::engine::task::Input;
use crankshaft::engine::task::Output;
use crankshaft::engine::task::Resources;
use crankshaft::engine::task::input::Contents;
use crankshaft::engine::task::input::Type as InputType;
use crankshaft::engine::task::output::Type as OutputType;
use futures::FutureExt;
use futures::future::BoxFuture;
use nonempty::NonEmpty;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use url::Url;
use super::PullResults;
use super::TaskExecutionBackend;
use super::TaskExecutionConstraints;
use super::TaskExecutionResult;
use crate::CancellationContext;
use crate::EvaluationPath;
use crate::Events;
use crate::ONE_GIBIBYTE;
use crate::PrimitiveValue;
use crate::TaskInputs;
use crate::Value;
use crate::backend::ExecuteTaskRequest;
use crate::backend::INITIAL_EXPECTED_NAMES;
use crate::backend::manager::TaskManager;
use crate::config::Config;
use crate::config::TaskResourceLimitBehavior;
use crate::http::Transferer;
use crate::v1::DEFAULT_DISK_MOUNT_POINT;
use crate::v1::hints;
use crate::v1::requirements;
use crate::v1::requirements::ContainerSource;
/// Guest path at which the task's host work directory is mounted.
const GUEST_WORK_DIR: &str = "/mnt/task/work";
/// Guest path at which the task's command script is mounted (read-only).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";
/// Guest path to which the task's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";
/// Guest path to which the task's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
/// CPU reservation for the post-run ownership cleanup task (Unix only).
#[cfg(unix)]
const CLEANUP_TASK_CPU: f64 = 0.1;
/// Memory reservation (4 MiB) for the post-run ownership cleanup task (Unix only).
#[cfg(unix)]
const CLEANUP_TASK_MEMORY: u64 = 4096 * 1024;
/// A single task submission to the Docker backend.
struct DockerTask<'a> {
    /// The engine configuration (provides e.g. the task shell).
    config: Arc<Config>,
    /// The request describing the task to execute.
    request: ExecuteTaskRequest<'a>,
    /// The container source the task will run in.
    container: ContainerSource,
    /// The underlying crankshaft Docker backend.
    backend: Arc<docker::Backend>,
    /// The generated, unique name for the task.
    name: String,
    /// The CPU limit for the task, if any.
    max_cpu: Option<f64>,
    /// The memory limit for the task (in bytes), if any.
    max_memory: Option<u64>,
    /// The number of GPUs requested by the task, if any.
    gpu: Option<u64>,
    /// The cancellation context used to cancel the run.
    cancellation: CancellationContext,
}
impl<'a> DockerTask<'a> {
    /// Runs the task in a Docker container and waits for it to complete.
    ///
    /// Returns `Ok(None)` when the run is canceled; otherwise returns the
    /// task's exit code, work directory, and stdout/stderr file values.
    async fn run(self) -> Result<Option<TaskExecutionResult>> {
        // Create the host-side work directory that gets mounted into the
        // container at `GUEST_WORK_DIR`.
        let work_dir = self.request.work_dir();
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;
        #[cfg(unix)]
        {
            use std::fs::Permissions;
            use std::fs::set_permissions;
            use std::os::unix::fs::PermissionsExt;
            // Make the work directory owner- and group-writable (0o770);
            // presumably so the container process (which may run as a
            // different uid/gid) can write to it — see `CleanupTask`, which
            // chowns the results back afterwards.
            set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
                format!(
                    "failed to set permissions for work directory `{path}`",
                    path = work_dir.display()
                )
            })?;
        }
        // Write the command script to disk so it can be mounted read-only
        // into the container at `GUEST_COMMAND_PATH`.
        let command_path = self.request.command_path();
        fs::write(&command_path, self.request.command).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;
        // Mount every localized input read-only at its guest path; the two
        // extra slots are for the work directory and command mounts below.
        let mut inputs = Vec::with_capacity(self.request.backend_inputs.len() + 2);
        for input in self.request.backend_inputs.iter() {
            let guest_path = input.guest_path().expect("input should have guest path");
            let local_path = input.local_path().expect("input should be localized");
            // Docker would create a missing host path as an empty directory;
            // fail early with a clear error instead.
            if !local_path.exists() {
                bail!(
                    "cannot mount input `{path}` as it does not exist",
                    path = local_path.display()
                );
            }
            inputs.push(
                Input::builder()
                    .path(guest_path.as_str())
                    .contents(Contents::Path(local_path.into()))
                    .ty(input.kind())
                    .read_only(true)
                    .build(),
            );
        }
        // The work directory is the only writable mount.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );
        // Map the guest stdout/stderr paths to host files via `file://` URLs.
        let stdout_path = self.request.stdout_path();
        let stderr_path = self.request.stderr_path();
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];
        // Create a volume for each requested disk mount point other than the
        // default one (which is covered by the work directory mount).
        let volumes = self
            .request
            .constraints
            .disks
            .keys()
            .filter_map(|mp| {
                if mp == DEFAULT_DISK_MOUNT_POINT {
                    None
                } else {
                    Some(mp.clone())
                }
            })
            .collect::<Vec<_>>();
        if !volumes.is_empty() {
            debug!(
                "disk size constraints cannot be enforced by the Docker backend; mount points \
                 will be created but sizes will not be limited"
            );
        }
        // Build the crankshaft task: a single execution that runs the
        // configured shell on the mounted command script.
        let task = Task::builder()
            .name(&self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(self.container.to_string())
                    .program(&self.config.task.shell)
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env(self.request.env.clone())
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.request.constraints.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // crankshaft expects RAM in GiB; constraints are in bytes.
                    .ram(self.request.constraints.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .maybe_gpu(self.gpu)
                    .build(),
            )
            .volumes(volumes)
            .build();
        // A cancellation surfaces as `Ok(None)` so the caller can distinguish
        // it from a failure.
        let statuses = match self.backend.run(task, self.cancellation.second())?.await {
            Ok(statuses) => statuses,
            Err(TaskRunError::Canceled) => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        // The task was built with exactly one execution above.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();
        Ok(Some(TaskExecutionResult {
            container: Some(self.container),
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::from_local_path(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        }))
    }
}
/// A follow-up task that restores host ownership of a task's work directory
/// after the task container (which may have run as a different user) exits.
#[cfg(unix)]
struct CleanupTask {
    /// The generated, unique name for the cleanup task.
    name: String,
    /// The work directory whose ownership will be changed.
    work_dir: EvaluationPath,
    /// The underlying crankshaft Docker backend.
    backend: Arc<docker::Backend>,
    /// The cancellation context used to cancel the run.
    cancellation: CancellationContext,
}
#[cfg(unix)]
impl CleanupTask {
async fn run(self) -> Result<Option<()>> {
use crankshaft::engine::service::runner::backend::TaskRunError;
use tracing::debug;
let work_dir = self.work_dir.as_local().expect("path should be local");
assert!(work_dir.is_absolute(), "work directory should be absolute");
let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
let ownership = format!("{uid}:{gid}");
let task = Task::builder()
.name(&self.name)
.executions(NonEmpty::new(
Execution::builder()
.image("alpine:latest")
.program("chown")
.args([
"-R".to_string(),
ownership.clone(),
GUEST_WORK_DIR.to_string(),
])
.build(),
))
.inputs([Input::builder()
.path(GUEST_WORK_DIR)
.contents(Contents::Path(work_dir.to_path_buf()))
.ty(InputType::Directory)
.read_only(false)
.build()])
.resources(
Resources::builder()
.cpu(CLEANUP_TASK_CPU)
.ram(CLEANUP_TASK_MEMORY as f64 / ONE_GIBIBYTE)
.build(),
)
.build();
debug!(
"running cleanup task `{name}` to change ownership of `{path}` to `{ownership}`",
name = self.name,
path = work_dir.display(),
);
match self
.backend
.run(task, self.cancellation.second())
.context("failed to submit cleanup task")?
.await
{
Ok(statuses) => {
let status = statuses.first();
if status.success() {
Ok(Some(()))
} else {
bail!(
"failed to chown task work directory `{path}`",
path = work_dir.display()
);
}
}
Err(TaskRunError::Canceled) => Ok(None),
Err(e) => Err(e).context("failed to run cleanup task"),
}
}
}
/// Attempts to pull the first usable Docker image from a list of candidate
/// container sources.
///
/// Unsupported source kinds (library, ORAS, local SIF files, unknown) are
/// recorded as errors without attempting a pull. Returns `None` when pulling
/// is canceled; otherwise returns the accumulated results, which contain the
/// first successfully pulled image (if any) and per-candidate errors.
async fn pull_first_available_docker_image(
    docker: &crankshaft::docker::Docker,
    candidates: &[ContainerSource],
    token: CancellationToken,
) -> Option<PullResults<ContainerSource>> {
    let mut results = PullResults::default();
    for candidate in candidates {
        // Reject source kinds the Docker backend cannot handle before
        // attempting a pull.
        match candidate {
            ContainerSource::Docker(_) => {}
            ContainerSource::Library(_) | ContainerSource::Oras(_) => {
                let err = anyhow::anyhow!(
                    "Docker backend does not support `{candidate:#}`; use a Docker registry image \
                     instead"
                );
                warn!("{err:#}");
                results.push(candidate.clone(), Err(err));
                continue;
            }
            ContainerSource::SifFile(_) => {
                let err = anyhow::anyhow!(
                    "Docker backend does not support local SIF file `{candidate:#}`; use a Docker \
                     registry image instead"
                );
                warn!("{err:#}");
                results.push(candidate.clone(), Err(err));
                continue;
            }
            ContainerSource::Unknown(_) => {
                let err = anyhow::anyhow!(
                    "Docker backend does not support unknown container source `{candidate:#}`"
                );
                warn!("{err:#}");
                results.push(candidate.clone(), Err(err));
                continue;
            }
        }
        debug!("attempting to pull container image `{candidate:#}`");
        match docker
            .ensure_image(&candidate.to_string(), token.clone())
            .await
        {
            // Stop at the first image that pulls successfully.
            Ok(Some(())) => {
                debug!("successfully pulled container image `{candidate:#}`");
                results.push(candidate.clone(), Ok(candidate.clone()));
                return Some(results);
            }
            // `None` from `ensure_image` indicates cancellation.
            Ok(None) => return None,
            Err(e) => {
                let err =
                    anyhow::anyhow!(e).context(format!("failed to pull image `{candidate:#}`"));
                // The context above already names the image, so log the error
                // chain directly (consistent with the other arms) rather than
                // repeating the image name in the warning.
                warn!("{err:#}");
                results.push(candidate.clone(), Err(err));
            }
        }
    }
    Some(results)
}
/// A task execution backend that runs tasks in local Docker containers.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// The cancellation context for submitted tasks.
    cancellation: CancellationContext,
    /// The maximum CPUs available to any single task.
    max_cpu: f64,
    /// The maximum memory (in bytes) available to any single task.
    max_memory: u64,
    /// The task manager that schedules task runs against the available
    /// resources.
    manager: TaskManager,
    /// Shared generator of unique alphanumeric task-name suffixes.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
impl DockerBackend {
    /// Creates a new Docker backend from the given configuration.
    ///
    /// Initializes the crankshaft Docker backend, queries it for available
    /// resources, and constructs a task manager sized to those resources.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured backend is not Docker or if the
    /// crankshaft Docker backend fails to initialize.
    pub async fn new(
        config: Arc<Config>,
        events: Events,
        cancellation: CancellationContext,
    ) -> Result<Self> {
        info!("initializing Docker backend");
        // The name generator is shared with crankshaft so that container
        // names it creates are unique across the whole backend.
        let names = Arc::new(Mutex::new(GeneratorIterator::new(
            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
            INITIAL_EXPECTED_NAMES,
        )));
        let backend_config = config.backend()?;
        let backend_config = backend_config
            .as_docker()
            .context("configured backend is not Docker")?;
        let backend = docker::Backend::initialize_default_with(
            backend::docker::Config::builder()
                .cleanup(backend_config.cleanup)
                .build(),
            names.clone(),
            events.crankshaft().clone(),
        )
        .await
        .context("failed to initialize Docker backend")?;
        let resources = *backend.resources();
        let cpu = resources.cpu() as f64;
        let max_cpu = resources.max_cpu() as f64;
        let memory = resources.memory();
        let max_memory = resources.max_memory();
        // When the backend reports a service (remote scheduler), local
        // resource tracking is unnecessary; otherwise limit concurrent tasks
        // to the locally available CPU and memory.
        let manager = if resources.use_service() {
            TaskManager::new_unlimited(max_cpu, max_memory)
        } else {
            TaskManager::new(
                cpu,
                max_cpu,
                memory,
                max_memory,
                events,
                cancellation.clone(),
            )
        };
        Ok(Self {
            config,
            inner: Arc::new(backend),
            cancellation,
            max_cpu,
            max_memory,
            manager,
            names,
        })
    }
}
impl TaskExecutionBackend for DockerBackend {
fn constraints(
&self,
inputs: &TaskInputs,
requirements: &HashMap<String, Value>,
hints: &HashMap<String, Value>,
) -> Result<TaskExecutionConstraints> {
let containers = requirements::container(inputs, requirements, &self.config.task.container);
let mut cpu = requirements::cpu(inputs, requirements);
if self.max_cpu < cpu {
let env_specific = if self.config.suppress_env_specific_output {
String::new()
} else {
format!(
", but the execution backend has a maximum of {max_cpu}",
max_cpu = self.max_cpu,
)
};
match self.config.task.cpu_limit_behavior {
TaskResourceLimitBehavior::TryWithMax => {
warn!(
"task requires at least {cpu} CPU{s}{env_specific}",
s = if cpu == 1.0 { "" } else { "s" },
);
cpu = self.max_cpu;
}
TaskResourceLimitBehavior::Deny => {
bail!(
"task requires at least {cpu} CPU{s}{env_specific}",
s = if cpu == 1.0 { "" } else { "s" },
);
}
}
}
let mut memory = requirements::memory(inputs, requirements)? as u64;
if self.max_memory < memory as u64 {
let env_specific = if self.config.suppress_env_specific_output {
String::new()
} else {
format!(
", but the execution backend has a maximum of {max_memory} GiB",
max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
)
};
match self.config.task.memory_limit_behavior {
TaskResourceLimitBehavior::TryWithMax => {
warn!(
"task requires at least {memory} GiB of memory{env_specific}",
memory = memory as f64 / ONE_GIBIBYTE,
);
memory = self.max_memory;
}
TaskResourceLimitBehavior::Deny => {
bail!(
"task requires at least {memory} GiB of memory{env_specific}",
memory = memory as f64 / ONE_GIBIBYTE,
);
}
}
}
let gpu = requirements::gpu(inputs, requirements, hints)
.map(|count| (0..count).map(|i| format!("nvidia-gpu-{i}")).collect())
.unwrap_or_default();
let disks = requirements::disks(inputs, requirements, hints)?
.into_iter()
.map(|(mount_point, disk)| (mount_point.to_string(), disk.size))
.collect();
Ok(TaskExecutionConstraints {
container: Some(containers),
cpu,
memory,
gpu,
fpga: Default::default(),
disks,
})
}
fn execute<'a>(
&'a self,
_: &'a Arc<dyn Transferer>,
request: ExecuteTaskRequest<'a>,
) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
async move {
let cpu = request.constraints.cpu;
let memory = request.constraints.memory;
let max_cpu =
hints::max_cpu(request.inputs, request.hints).map(|m| m.min(self.max_cpu));
let max_memory = hints::max_memory(request.inputs, request.hints)?
.map(|i| (i as u64).min(self.max_memory));
let gpu = requirements::gpu(request.inputs, request.requirements, request.hints);
let results = match pull_first_available_docker_image(
self.inner.client(),
request
.constraints
.container
.as_ref()
.context("task does not use a container")?,
self.cancellation.second(),
)
.await
{
Some(results) => results,
None => return Ok(None),
};
let (_, container) = results
.successful_container()
.ok_or_else(|| anyhow::anyhow!("{results}"))?;
let container = container.clone();
let name = format!(
"{id}-{generated}",
id = request.id,
generated = self
.names
.lock()
.expect("generator should always acquire")
.next()
.expect("generator should never be exhausted")
);
let task = DockerTask {
config: self.config.clone(),
request,
container,
backend: self.inner.clone(),
name,
max_cpu,
max_memory,
gpu,
cancellation: self.cancellation.clone(),
};
match self.manager.run(cpu, memory, task.run()).await? {
Some(res) => {
#[cfg(unix)]
{
let name = format!(
"docker-chown-{id}",
id = self
.names
.lock()
.expect("generator should always acquire")
.next()
.expect("generator should never be exhausted")
);
let task = CleanupTask {
name,
work_dir: res.work_dir.clone(),
backend: self.inner.clone(),
cancellation: self.cancellation.clone(),
};
if let Err(e) = self
.manager
.run(CLEANUP_TASK_CPU, CLEANUP_TASK_MEMORY, task.run())
.await
{
tracing::error!("Docker backend cleanup failed: {e:#}");
}
}
Ok(Some(res))
}
None => Ok(None),
}
}
.boxed()
}
}