use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs;
use std::fs::File;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use crankshaft::engine::service::name::GeneratorIterator;
use crankshaft::engine::service::name::UniqueAlphanumeric;
use crankshaft::events::Event;
use crankshaft::events::next_task_id;
use crankshaft::events::send_event;
use futures::FutureExt;
use futures::future::BoxFuture;
use nonempty::NonEmpty;
use tokio::process::Command;
use tokio::select;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use super::TaskExecutionBackend;
use super::TaskExecutionConstraints;
use crate::CancellationContext;
use crate::EvaluationPath;
use crate::Events;
use crate::ONE_GIBIBYTE;
use crate::PrimitiveValue;
use crate::SYSTEM;
use crate::TaskInputs;
use crate::Value;
use crate::backend::ExecuteTaskRequest;
use crate::backend::INITIAL_EXPECTED_NAMES;
use crate::backend::TaskExecutionResult;
use crate::backend::manager::TaskManager;
use crate::config::Config;
use crate::config::TaskResourceLimitBehavior;
use crate::convert_unit_string;
use crate::http::Transferer;
use crate::v1::requirements;
/// A single task that is executed as a child shell process on the local host.
///
/// Bundles everything `LocalTask::run` needs to spawn the process, report
/// progress events, and react to cancellation.
struct LocalTask<'a> {
/// The engine configuration; `run` reads `task.shell` from it to pick the
/// program used to execute the command file.
config: Arc<Config>,
/// The execution request; `run` takes the working directory, command text,
/// command/stdout/stderr paths, and environment variables from it.
request: ExecuteTaskRequest<'a>,
/// The name of the task, used in log messages and in the `TaskCreated` event.
name: String,
/// The optional event sender handed to `send_event!` for task lifecycle
/// events.
events: Option<broadcast::Sender<Event>>,
/// The cancellation context; `run` stops waiting for the process once the
/// token returned by `second()` is cancelled.
cancellation: CancellationContext,
}
impl<'a> LocalTask<'a> {
async fn run(self) -> Result<Option<TaskExecutionResult>> {
let id = next_task_id();
let work_dir = self.request.work_dir();
let stdout_path = self.request.stdout_path();
let stderr_path = self.request.stderr_path();
let run = async {
fs::create_dir_all(&work_dir).with_context(|| {
format!(
"failed to create directory `{path}`",
path = work_dir.display()
)
})?;
let command_path = self.request.command_path();
fs::write(&command_path, self.request.command).with_context(|| {
format!(
"failed to write command contents to `{path}`",
path = command_path.display()
)
})?;
let stdout = File::create(&stdout_path).with_context(|| {
format!(
"failed to create stdout file `{path}`",
path = stdout_path.display()
)
})?;
let stderr = File::create(&stderr_path).with_context(|| {
format!(
"failed to create stderr file `{path}`",
path = stderr_path.display()
)
})?;
let mut command = Command::new(&self.config.task.shell);
command
.current_dir(&work_dir)
.arg(command_path)
.stdin(Stdio::null())
.stdout(stdout)
.stderr(stderr)
.envs(
self.request
.env
.iter()
.map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
)
.kill_on_drop(true);
#[cfg(windows)]
if let Ok(path) = std::env::var("PATH") {
command.env("PATH", path);
}
let mut child = command.spawn().context("failed to spawn shell")?;
send_event!(self.events, Event::TaskStarted { id });
let id = child.id().expect("should have id");
info!(
"spawned local shell process {id} for execution of task `{name}`",
name = self.name
);
let status = child.wait().await.with_context(|| {
format!("failed to wait for termination of task child process {id}")
})?;
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(signal) = status.signal() {
tracing::warn!("task process {id} has terminated with signal {signal}");
bail!(
"task child process {id} has terminated with signal {signal}; see stderr \
file `{path}` for more details",
path = stderr_path.display()
);
}
}
Ok(status)
};
let task_token = CancellationToken::new();
send_event!(
self.events,
Event::TaskCreated {
id,
name: self.name.clone(),
tes_id: None,
token: task_token.clone(),
}
);
let token = self.cancellation.second();
select! {
biased;
_ = task_token.cancelled() => {
send_event!(self.events, Event::TaskCanceled { id });
Ok(None)
}
_ = token.cancelled() => {
send_event!(self.events, Event::TaskCanceled { id });
Ok(None)
}
result = run => {
match result {
Ok(status) => {
send_event!(self.events, Event::TaskCompleted { id, exit_statuses: NonEmpty::new(status) });
let exit_code = status.code().expect("process should have exited");
info!("process {id} for task `{name}` has terminated with status code {exit_code}", name = self.name);
Ok(Some(TaskExecutionResult {
container: None,
exit_code,
work_dir: EvaluationPath::from_local_path(work_dir),
stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
}))
}
Err(e) => {
send_event!(self.events, Event::TaskFailed { id, message: format!("{e:#}") });
Err(e)
}
}
}
}
}
}
/// A task execution backend that runs tasks as processes on the local host.
pub struct LocalBackend {
/// The engine configuration; cloned into every task and consulted for the
/// CPU and memory limit behaviors.
config: Arc<Config>,
/// The cancellation context; given to the task manager and cloned into
/// every task.
cancellation: CancellationContext,
/// The total CPU available to the backend: the configured `cpu` value, or
/// the number of CPUs reported by `SYSTEM` when not configured.
cpu: f64,
/// The total memory available to the backend: the configured `memory` unit
/// string converted with `convert_unit_string`, or `SYSTEM.total_memory()`
/// when not configured. Presumably in bytes, as it is divided by
/// `ONE_GIBIBYTE` when reported in GiB.
memory: u64,
/// The manager through which each task is run together with its requested
/// CPU and memory.
manager: TaskManager,
/// The shared generator that supplies the generated suffix of task names.
names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
/// The events handle; cloned into the task manager and the source of the
/// crankshaft sender given to each task.
events: Events,
}
impl LocalBackend {
    /// Builds a local backend from the engine configuration.
    ///
    /// The CPU and memory totals come from the local backend configuration
    /// when present and from `SYSTEM` otherwise; the task manager is created
    /// with those totals.
    ///
    /// # Errors
    ///
    /// Returns an error if the backend configuration cannot be retrieved or if
    /// the configured backend is not the local backend.
    ///
    /// # Panics
    ///
    /// Panics if the configured memory string is rejected by
    /// `convert_unit_string`.
    pub fn new(
        config: Arc<Config>,
        events: Events,
        cancellation: CancellationContext,
    ) -> Result<Self> {
        info!("initializing local backend");

        let generator = GeneratorIterator::new(
            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
            INITIAL_EXPECTED_NAMES,
        );
        let names = Arc::new(Mutex::new(generator));

        // Keep the selected backend configuration in its own binding so the
        // local view borrowed from it stays valid below.
        let selected = config.backend()?;
        let local = selected
            .as_local()
            .context("configured backend is not local")?;

        // Fall back to what the host reports when a total is not configured.
        let cpu = match local.cpu {
            Some(count) => count as f64,
            None => SYSTEM.cpus().len() as f64,
        };
        let memory = match &local.memory {
            Some(limit) => convert_unit_string(limit).expect("value should be valid"),
            None => SYSTEM.total_memory(),
        };

        let manager = TaskManager::new(
            cpu,
            cpu,
            memory,
            memory,
            events.clone(),
            cancellation.clone(),
        );

        Ok(Self {
            config,
            cancellation,
            cpu,
            memory,
            manager,
            names,
            events,
        })
    }
}
impl TaskExecutionBackend for LocalBackend {
    /// Computes the execution constraints for a task on the local host.
    ///
    /// The requested CPU and memory are compared with the backend totals. When
    /// a request exceeds the total, the configured limit behavior decides the
    /// outcome: `TryWithMax` logs a warning and uses the total instead, while
    /// `Deny` fails with the same message.
    ///
    /// # Errors
    ///
    /// Returns an error if the memory requirement cannot be evaluated or if a
    /// requirement exceeds the total and the matching behavior is `Deny`.
    fn constraints(
        &self,
        inputs: &TaskInputs,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let mut cpu = requirements::cpu(inputs, requirements);
        if cpu > self.cpu {
            // Details about the host are left out when the configuration asks
            // for environment-specific output to be suppressed.
            let env_specific = match self.config.suppress_env_specific_output {
                true => String::new(),
                false => format!(
                    ", but the host only has {total_cpu} available",
                    total_cpu = self.cpu
                ),
            };
            let s = if cpu == 1.0 { "" } else { "s" };
            let message = format!("task requires at least {cpu} CPU{s}{env_specific}");

            match self.config.task.cpu_limit_behavior {
                TaskResourceLimitBehavior::Deny => bail!("{message}"),
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!("{message}");
                    cpu = self.cpu;
                }
            }
        }

        let mut memory = requirements::memory(inputs, requirements)? as u64;
        if memory > self.memory {
            let env_specific = match self.config.suppress_env_specific_output {
                true => String::new(),
                false => format!(
                    ", but the host only has {total_memory} GiB available",
                    total_memory = self.memory as f64 / ONE_GIBIBYTE,
                ),
            };
            let message = format!(
                "task requires at least {memory} GiB of memory{env_specific}",
                memory = memory as f64 / ONE_GIBIBYTE,
            );

            match self.config.task.memory_limit_behavior {
                TaskResourceLimitBehavior::Deny => bail!("{message}"),
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!("{message}");
                    memory = self.memory;
                }
            }
        }

        Ok(TaskExecutionConstraints {
            container: None,
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Returns the guest inputs directory; this backend always reports `None`.
    fn guest_inputs_dir(&self) -> Option<&'static str> {
        None
    }

    /// Executes a task request through the task manager.
    ///
    /// The task is named `{id}-{generated}`, where `id` comes from the request
    /// and `generated` from the shared name generator. The task future is
    /// handed to the manager along with the CPU and memory from the request's
    /// constraints.
    ///
    /// # Panics
    ///
    /// Panics if the name generator's mutex cannot be acquired or if the
    /// generator yields no further names.
    fn execute<'a>(
        &'a self,
        _: &'a Arc<dyn Transferer>,
        request: ExecuteTaskRequest<'a>,
    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
        async move {
            // Scope the generator access so that neither the lock guard nor
            // the generated value is held across the await below.
            let name = {
                let generated = self
                    .names
                    .lock()
                    .expect("generator should always acquire")
                    .next()
                    .expect("generator should never be exhausted");
                format!("{id}-{generated}", id = request.id)
            };

            // Read the reservations before the request is moved into the task.
            let (cpu, memory) = (request.constraints.cpu, request.constraints.memory);

            let task = LocalTask {
                config: Arc::clone(&self.config),
                request,
                name,
                events: self.events.crankshaft().clone(),
                cancellation: self.cancellation.clone(),
            };

            self.manager.run(cpu, memory, task.run()).await
        }
        .boxed()
    }
}