use std::collections::HashMap;
use std::fmt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt as _;
#[cfg(windows)]
use std::os::windows::process::ExitStatusExt as _;
use std::path::Path;
use std::process::ExitStatus;
use std::process::Stdio;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use anyhow::Context as _;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use bytesize::ByteSize;
use cloud_copy::Alphanumeric;
use crankshaft::engine::service::name::GeneratorIterator;
use crankshaft::engine::service::name::UniqueAlphanumeric;
use crankshaft::events::Event as CrankshaftEvent;
use crankshaft::events::send_event;
use futures::FutureExt;
use futures::future::BoxFuture;
use nonempty::NonEmpty;
use serde::Deserialize;
use tokio::fs;
use tokio::fs::File;
use tokio::process::Command;
use tokio::select;
use tokio::sync::Semaphore;
use tokio::sync::oneshot;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
use tracing::trace;
use tracing::warn;
use super::TaskExecutionBackend;
use crate::CancellationContext;
use crate::EvaluationPath;
use crate::Events;
use crate::ONE_GIBIBYTE;
use crate::PrimitiveValue;
use crate::TaskInputs;
use crate::Value;
use crate::backend::ApptainerRuntime;
use crate::backend::ExecuteTaskRequest;
use crate::backend::INITIAL_EXPECTED_NAMES;
use crate::backend::TaskExecutionConstraints;
use crate::backend::TaskExecutionResult;
use crate::config::Config;
use crate::config::LsfApptainerBackendConfig;
use crate::config::TaskResourceLimitBehavior;
use crate::http::Transferer;
use crate::v1::requirements;
/// File name (within the attempt directory) of the generated Apptainer command script.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";
/// Maximum length, in bytes, allowed for an LSF job name passed to `bsub -J`.
const LSF_JOB_NAME_MAX_LENGTH: usize = 4094;
/// Default `bjobs` polling interval, in seconds, when none is configured.
const DEFAULT_MONITOR_INTERVAL: u64 = 30;
/// Default maximum number of concurrently spawned LSF commands.
const DEFAULT_MAX_CONCURRENCY: u32 = 10;
/// Length of the random alphanumeric tag embedded in job names for monitoring.
const MONITOR_TAG_LENGTH: usize = 10;

/// Truncates a job name so its byte length is strictly below
/// `LSF_JOB_NAME_MAX_LENGTH`, without splitting a multi-byte UTF-8 character.
///
/// Names already below the limit are returned unchanged.
fn truncate_job_name(name: &str) -> &str {
    if name.len() < LSF_JOB_NAME_MAX_LENGTH {
        return name;
    }

    // Walk backwards from one byte below the limit to the nearest UTF-8
    // character boundary so the slice below cannot panic or split a
    // character. The loop terminates because index 0 is always a boundary.
    let mut end = LSF_JOB_NAME_MAX_LENGTH - 1;
    while !name.is_char_boundary(end) {
        end -= 1;
    }
    &name[..end]
}
/// State of an LSF job as reported by `bjobs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum JobState {
    /// The job is pending scheduling (`PEND`).
    Pending,
    /// The job is running (`RUN`).
    Running,
    /// The job completed (`DONE`).
    Done,
    /// The job is suspended (`PSUSP`, `USUSP`, or `SSUSP`).
    Suspended,
    /// The job exited (`EXIT`).
    Exited,
}

impl JobState {
    /// Returns `true` if the job has reached a terminal state (`Done` or
    /// `Exited`).
    fn terminated(&self) -> bool {
        matches!(self, Self::Done | Self::Exited)
    }
}

impl fmt::Display for JobState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Pending => write!(f, "pending"),
            Self::Running => write!(f, "running"),
            Self::Done => write!(f, "done"),
            Self::Suspended => write!(f, "suspended"),
            Self::Exited => write!(f, "exited"),
        }
    }
}

impl FromStr for JobState {
    type Err = anyhow::Error;

    /// Parses a `bjobs` `STAT` column value into a [`JobState`].
    ///
    /// Returns an error for any unrecognized state string.
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "PEND" => Ok(Self::Pending),
            "RUN" => Ok(Self::Running),
            "DONE" => Ok(Self::Done),
            "PSUSP" | "USUSP" | "SSUSP" => Ok(Self::Suspended),
            "EXIT" => Ok(Self::Exited),
            // Bug fix: the error message was missing its closing backtick
            // around the state value.
            _ => bail!("unknown LSF job state `{s}`"),
        }
    }
}
/// A single job record deserialized from `bjobs -json` output.
///
/// All fields are kept as strings because `bjobs` reports them as strings;
/// parsing/interpretation happens at the use sites.
#[derive(Deserialize)]
struct JobRecord {
    /// The LSF job identifier (`JOBID` column).
    #[serde(rename = "JOBID")]
    job_id: String,
    /// The job state (`STAT` column), e.g. `PEND`, `RUN`, `DONE`, `EXIT`.
    #[serde(rename = "STAT")]
    state: String,
    /// The job's exit code (`EXIT_CODE` column); defaulted to empty when
    /// absent (use sites fall back to `0` when it does not parse).
    #[serde(default, rename = "EXIT_CODE")]
    exit_code: String,
    /// Average memory usage (`AVG_MEM` column); used for logging only.
    #[serde(default, rename = "AVG_MEM")]
    avg_memory: String,
    /// Maximum memory usage (`MAX_MEM` column); used for logging only.
    #[serde(default, rename = "MAX_MEM")]
    max_memory: String,
    /// CPU time used (`CPU_USED` column); used for logging only.
    #[serde(default, rename = "CPU_USED")]
    cpu_used: String,
    /// User-mode CPU time (`RU_UTIME` column); used for logging only.
    #[serde(default, rename = "RU_UTIME")]
    user_cpu_time: String,
    /// System-mode CPU time (`RU_STIME` column); used for logging only.
    #[serde(default, rename = "RU_STIME")]
    system_cpu_time: String,
}
/// State tracked by the monitor for a single submitted LSF job.
#[derive(Debug)]
struct Job {
    /// The monitor tick at which this job was last observed in `bjobs`
    /// output; jobs whose tick falls behind are treated as missing.
    tick: u64,
    /// The Crankshaft task identifier used when sending task events.
    crankshaft_id: u64,
    /// The last known state of the job.
    state: JobState,
    /// Sender resolved with the job's exit code (or an error) when the job
    /// terminates or can no longer be monitored.
    completed: oneshot::Sender<Result<u8>>,
}
/// Shared state between job submitters and the background monitor task.
#[derive(Debug)]
struct MonitorState {
    /// Generator of unique alphanumeric suffixes for task names.
    names: GeneratorIterator<UniqueAlphanumeric>,
    /// Poll counter, incremented (wrapping) on every `bjobs` poll; used to
    /// detect jobs missing from a poll's output.
    tick: u64,
    /// Random tag embedded in job names so `bjobs` output can be filtered to
    /// jobs submitted by this process; empty when no jobs are monitored.
    tag: String,
    /// Map of LSF job identifier to monitored job state.
    jobs: HashMap<u64, Job>,
}
impl MonitorState {
    /// Creates a new monitor state with no jobs, a zero tick, and an empty
    /// tag.
    fn new() -> Self {
        Self {
            names: GeneratorIterator::new(
                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
                INITIAL_EXPECTED_NAMES,
            ),
            tick: 0,
            tag: String::new(),
            jobs: HashMap::new(),
        }
    }

    /// Gets the current monitor tag, lazily generating a new random
    /// alphanumeric tag when none is set.
    ///
    /// The tag is embedded into submitted job names so the monitor can filter
    /// `bjobs` output down to this process's jobs; it is cleared once the set
    /// of monitored jobs becomes empty.
    fn current_tag(&mut self) -> &str {
        if self.tag.is_empty() {
            self.tag = Alphanumeric::new(MONITOR_TAG_LENGTH).to_string();
        }
        &self.tag
    }

    /// Registers a newly submitted LSF job for monitoring.
    ///
    /// The job starts in the `Pending` state at the current tick; `completed`
    /// is resolved with the job's exit code (or an error) once it terminates.
    fn add_job(&mut self, job_id: u64, crankshaft_id: u64, completed: oneshot::Sender<Result<u8>>) {
        let tick = self.tick;
        let prev = self.jobs.insert(
            job_id,
            Job {
                tick,
                crankshaft_id,
                state: JobState::Pending,
                completed,
            },
        );
        // A duplicate id replaces the previous entry, which would orphan the
        // earlier task's completion channel; warn so the condition is visible.
        if prev.is_some() {
            warn!(
                "encountered duplicate LSF job id `{job_id}`: tasks may not be monitored correctly"
            );
        }
    }

    /// Updates monitored jobs from the records of one `bjobs` poll.
    ///
    /// Sends Crankshaft task events on state transitions, resolves the
    /// completion channels of terminated jobs, and fails any monitored job
    /// that was absent from the poll's output.
    fn update_jobs(&mut self, records: Vec<JobRecord>, events: &Events) {
        let tick = self.tick;
        for record in records {
            let Ok(job_id) = record.job_id.parse() else {
                warn!(
                    "LSF task monitor encountered invalid job identifier `{id}`",
                    id = record.job_id
                );
                continue;
            };
            // Ignore records for jobs this monitor doesn't know about (e.g.
            // other jobs matching the name filter).
            let Some(job) = self.jobs.get_mut(&job_id) else {
                continue;
            };
            match record.state.parse::<JobState>() {
                Ok(job_state) => {
                    if job.state != job_state {
                        if job_state == JobState::Running {
                            send_event!(
                                events.crankshaft(),
                                CrankshaftEvent::TaskStarted {
                                    id: job.crankshaft_id
                                },
                            );
                        }
                        if job_state.terminated() {
                            // The job terminated without ever being observed
                            // running: synthesize a started event so
                            // consumers always see start before completion.
                            if job.state != JobState::Running {
                                send_event!(
                                    events.crankshaft(),
                                    CrankshaftEvent::TaskStarted {
                                        id: job.crankshaft_id
                                    },
                                );
                            }
                            // `EXIT_CODE` may be empty or unparsable; treat
                            // that as exit code 0.
                            let exit_code: u8 = record.exit_code.parse().unwrap_or_default();
                            debug!(
                                "LSF job `{job_id}` has exited with code `{exit_code}`: average \
                                 memory `{avg_mem}`, maximum memory `{max_mem}`, CPU used \
                                 `{cpu_used}`, user CPU time `{user_cpu_time}`, system CPU time \
                                 `{system_cpu_time}`",
                                job_id = record.job_id,
                                avg_mem = record.avg_memory,
                                max_mem = record.max_memory,
                                cpu_used = record.cpu_used,
                                user_cpu_time = record.user_cpu_time,
                                system_cpu_time = record.system_cpu_time
                            );
                            let job = self.jobs.remove(&job_id).unwrap();
                            // The receiver may already be dropped (e.g. the
                            // task was cancelled); ignore send failures.
                            let _ = job.completed.send(Ok(exit_code));
                            continue;
                        } else {
                            debug!("LSF job `{job_id}` is now in the `{job_state}` state");
                        }
                        job.state = job_state;
                    }
                    // Mark the job as seen in this poll.
                    job.tick = tick;
                }
                Err(e) => {
                    // Unrecognized state: stop monitoring the job and report
                    // the parse error to the waiting task.
                    let job = self.jobs.remove(&job_id).unwrap();
                    let _ = job.completed.send(Err(e));
                }
            }
        }
        // Any job not updated during this poll was missing from the `bjobs`
        // output and can no longer be monitored; fail its task.
        for (id, job) in self.jobs.extract_if(|_, j| j.tick != tick) {
            let _ = job.completed.send(Err(anyhow!(
                "LSF job `{id}` was missing from `bjobs` output: cannot monitor associated task"
            )));
        }
        // With no jobs left, clear the tag so the next submission generates a
        // fresh one.
        if self.jobs.is_empty() {
            self.tag.clear();
        }
    }

    /// Fails every monitored job with the given error and clears the tag.
    ///
    /// Used when a `bjobs` poll itself fails, since the jobs' states can no
    /// longer be determined.
    fn fail_all_jobs(&mut self, error: &anyhow::Error) {
        for (_, job) in self.jobs.drain() {
            let _ = job.completed.send(Err(anyhow!("{error:#}")));
        }
        self.tag.clear();
    }
}
/// Handle to a job that was successfully submitted with `bsub`.
#[derive(Debug)]
struct SubmittedJob {
    /// The LSF job identifier parsed from `bsub` output.
    id: u64,
    /// The generated task name (request id plus a unique suffix).
    task_name: String,
    /// Receiver resolved with the job's exit code (or an error) when the
    /// monitor observes the job terminate.
    completed: oneshot::Receiver<Result<u8>>,
}
/// Monitors submitted LSF jobs by periodically polling `bjobs` from a
/// background task.
#[derive(Debug, Clone)]
struct Monitor {
    /// State shared with the background monitor task.
    state: Arc<Mutex<MonitorState>>,
    /// Keeps the background task alive: when the last clone is dropped, the
    /// oneshot channel closes and the monitor loop exits.
    _drop: Arc<oneshot::Sender<()>>,
}
impl Monitor {
    /// Creates a new monitor that polls `bjobs` at the given interval.
    ///
    /// Spawns the background monitoring task, which runs until this monitor
    /// (and all of its clones) are dropped.
    fn new(interval: Duration, job_name_prefix: Option<String>, events: Events) -> Self {
        let (tx, rx) = oneshot::channel();
        let state = Arc::new(Mutex::new(MonitorState::new()));
        tokio::spawn(Self::monitor(
            state.clone(),
            interval,
            job_name_prefix,
            events,
            rx,
        ));
        Self {
            state,
            _drop: Arc::new(tx),
        }
    }

    /// Submits a task to LSF with `bsub` and registers it for monitoring.
    ///
    /// The submitted job runs the script at `command_path`; queue, GPU, CPU,
    /// and memory requirements are derived from the request and the backend
    /// configuration.
    ///
    /// Returns the job id, the generated task name, and a channel resolved
    /// with the job's exit code when it terminates.
    async fn submit_job(
        &self,
        config: &LsfApptainerBackendConfig,
        request: &ExecuteTaskRequest<'_>,
        crankshaft_id: u64,
        command_path: &Path,
    ) -> Result<SubmittedJob> {
        // Generate a unique task name and capture the current monitor tag
        // while holding the lock; the tag ties this job's name to the
        // monitor's `bjobs` name filter.
        let (task_name, tag) = {
            let mut state = self.state.lock().expect("failed to lock state");
            let task_name = format!(
                "{id}-{generated}",
                id = request.id,
                generated = state
                    .names
                    .next()
                    .expect("generator should never be exhausted")
            );
            (task_name, state.current_tag().to_string())
        };
        let mut command = Command::new("bsub");
        // Target a specific queue when the configuration selects one for this
        // task's requirements and hints.
        if let Some(queue) = config.lsf_queue_for_task(request.requirements, request.hints) {
            command.arg("-q").arg(&queue.name);
        }
        if let Some(gpu) = requirements::gpu(request.inputs, request.requirements, request.hints) {
            command.arg("-gpu").arg(format!("num={gpu}/host"));
        }
        // User-provided extra arguments are appended verbatim.
        if let Some(args) = &config.extra_bsub_args {
            command.args(args);
        }
        let job_name = format!(
            "{prefix}{sep}{tag}-{task_name}",
            prefix = config.job_name_prefix.as_deref().unwrap_or(""),
            sep = if config.job_name_prefix.is_some() {
                "-"
            } else {
                ""
            }
        );
        command
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            // Suppress LSF's job report e-mail for this job.
            .env("LSB_JOB_REPORT_MAIL", "N")
            .arg("-J")
            .arg(truncate_job_name(&job_name))
            // Overwrite per-job stdout/stderr files in the attempt directory
            // (`%J` is expanded by LSF to the job id).
            .arg("-oo")
            .arg(request.attempt_dir.join("job.%J.stdout"))
            .arg("-eo")
            .arg(request.attempt_dir.join("job.%J.stderr"))
            .arg("-R")
            .arg(format!(
                "affinity[cpu({cpu})]",
                cpu = request.constraints.cpu.ceil() as u64
            ))
            .arg("-R")
            .arg(format!(
                "rusage[mem={memory_kb}KB/job]",
                memory_kb = request.constraints.memory / bytesize::KIB,
            ))
            .arg(command_path);
        trace!(?command, "spawning `bsub` to queue task");
        let child = command.spawn().context("failed to spawn `bsub`")?;
        let output = child
            .wait_with_output()
            .await
            .context("failed to wait for `bsub` to exit")?;
        if !output.status.success() {
            bail!(
                "failed to submit LSF job with `bsub` ({status})\n{stderr}",
                status = output.status,
                stderr = str::from_utf8(&output.stderr)
                    .unwrap_or("<output not UTF-8>")
                    .trim()
            );
        }
        // `bsub` output is expected to look like `Job <12345> is submitted
        // ...`: the second whitespace-delimited token carries the job id in
        // angle brackets.
        let stdout =
            str::from_utf8(&output.stdout).map_err(|_| anyhow!("`bsub` output was not UTF-8"))?;
        let job_id: u64 = stdout
            .split(' ')
            .nth(1)
            .and_then(|id| {
                id.trim_start_matches('<')
                    .trim_end_matches('>')
                    .parse()
                    .ok()
            })
            .context("`bsub` output did not contain the job identifier")?;
        debug!("task `{task_name}` was queued as LSF job `{job_id}`");
        let (tx, rx) = oneshot::channel();
        let mut state = self.state.lock().expect("failed to lock state");
        state.add_job(job_id, crankshaft_id, tx);
        drop(state);
        Ok(SubmittedJob {
            id: job_id,
            task_name,
            completed: rx,
        })
    }

    /// Background task that polls `bjobs` on an interval and updates the
    /// shared monitor state.
    ///
    /// Exits when `drop` resolves, i.e. when every `Monitor` handle has been
    /// dropped.
    async fn monitor(
        state: Arc<Mutex<MonitorState>>,
        interval: Duration,
        job_name_prefix: Option<String>,
        events: Events,
        mut drop: oneshot::Receiver<()>,
    ) {
        debug!(
            "LSF task monitor is starting with polling interval of {interval} seconds",
            interval = interval.as_secs()
        );
        let mut timer = tokio::time::interval(interval);
        // If polling falls behind, delay the next tick rather than bursting
        // to catch up.
        timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            select! {
                _ = &mut drop => break,
                _ = timer.tick() => {
                    let search_prefix = {
                        let mut state = state.lock().expect("failed to lock state");
                        // Nothing to poll for.
                        if state.jobs.is_empty() {
                            continue;
                        }
                        // Advance the tick; jobs not observed at this tick
                        // are treated as missing by `update_jobs`.
                        state.tick = state.tick.wrapping_add(1);
                        // A non-empty job set implies a tag was generated at
                        // submission time.
                        assert!(!state.tag.is_empty(), "tag should not be empty");
                        // Build the `bjobs -J` name pattern: `<prefix>-<tag>*`.
                        format!(
                            "{prefix}{sep}{tag}*",
                            prefix = job_name_prefix.as_deref().unwrap_or(""),
                            sep = if job_name_prefix.is_some() {
                                "-"
                            } else {
                                ""
                            },
                            tag = state.tag
                        )
                    };
                    // Run `bjobs` without holding the state lock.
                    let result = Self::read_job_records(&search_prefix).await.context("failed to query job status using `bjobs`");
                    let mut state = state.lock().expect("failed to lock state");
                    match result {
                        Ok(records) => state.update_jobs(records, &events),
                        // The poll failed entirely: every job's status is
                        // unknown, so fail them all.
                        Err(e) => state.fail_all_jobs(&e),
                    }
                }
            }
        }
        debug!("LSF task monitor has shut down");
    }

    /// Runs `bjobs` and deserializes its JSON output into job records.
    ///
    /// `search_prefix` is a job name pattern passed via `-J` to restrict the
    /// output to jobs submitted by this process; `-a` is passed so terminated
    /// jobs still appear in the output (required to observe terminal states).
    async fn read_job_records(search_prefix: &str) -> Result<Vec<JobRecord>> {
        /// Top-level shape of `bjobs -json` output.
        #[derive(Deserialize)]
        struct Output {
            #[serde(rename = "RECORDS")]
            records: Vec<JobRecord>,
        }
        let mut command = Command::new("bjobs");
        let command = command
            .arg("-a")
            .arg("-J")
            .arg(search_prefix)
            .arg("-json")
            .arg("-o")
            .arg("jobid stat exit_code max_mem avg_mem cpu_used ru_utime ru_stime")
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        trace!(?command, "spawning `bjobs` to monitor tasks");
        let child = command.spawn().context("failed to spawn `bjobs` command")?;
        let output = child
            .wait_with_output()
            .await
            .context("failed to wait for `bjobs` to exit")?;
        if !output.status.success() {
            bail!(
                "`bjobs` failed: {status}: {stderr}",
                status = output.status,
                stderr = str::from_utf8(&output.stderr)
                    .unwrap_or("<output not UTF-8>")
                    .trim()
            );
        }
        Ok(serde_json::from_str::<Output>(
            str::from_utf8(&output.stdout).map_err(|_| anyhow!("`bjobs` output was not UTF-8"))?,
        )
        .context("failed to deserialize `bjobs` output")?
        .records)
    }
}
/// A task execution backend that submits tasks to an LSF cluster via `bsub`,
/// running each task's command through an Apptainer-generated script.
pub struct LsfApptainerBackend {
    /// The evaluation configuration.
    config: Arc<Config>,
    /// The events channels to send task lifecycle events to.
    events: Events,
    /// Cancellation context used to abort in-flight tasks.
    cancellation: CancellationContext,
    /// The Apptainer runtime used to generate container run scripts.
    apptainer: ApptainerRuntime,
    /// The background monitor that tracks submitted LSF jobs.
    monitor: Monitor,
    /// Semaphore limiting the number of concurrently spawned LSF commands.
    permits: Semaphore,
}
impl LsfApptainerBackend {
    /// Creates a new LSF + Apptainer backend from the given configuration.
    ///
    /// Spawns the background job monitor and constructs the Apptainer
    /// runtime rooted at `run_root_dir`.
    ///
    /// Returns an error if the configured backend is not the LSF Apptainer
    /// backend or the Apptainer runtime cannot be created.
    pub fn new(
        config: Arc<Config>,
        run_root_dir: &Path,
        events: Events,
        cancellation: CancellationContext,
    ) -> Result<Self> {
        let backend_config = config.backend()?;
        let backend_config = backend_config
            .as_lsf_apptainer()
            .context("configured backend is not LSF Apptainer")?;
        let monitor = Monitor::new(
            Duration::from_secs(backend_config.interval.unwrap_or(DEFAULT_MONITOR_INTERVAL)),
            backend_config.job_name_prefix.clone(),
            events.clone(),
        );
        // Bound the number of concurrently spawned LSF commands
        // (submission and cancellation both acquire a permit).
        let permits = Semaphore::new(
            backend_config
                .max_concurrency
                .unwrap_or(DEFAULT_MAX_CONCURRENCY) as usize,
        );
        let apptainer = ApptainerRuntime::new(
            run_root_dir,
            backend_config.apptainer_config.image_cache_dir.as_deref(),
        )?;
        Ok(Self {
            config,
            events,
            cancellation,
            apptainer,
            monitor,
            permits,
        })
    }

    /// Cancels an LSF job by spawning `bkill`.
    ///
    /// A concurrency permit is acquired first, so cancellation is subject to
    /// the same spawn limit as submission.
    async fn kill_job(&self, job_id: u64) -> Result<()> {
        let mut command = Command::new("bkill");
        let command = command
            // NOTE(review): `-C` appears to attach a cancellation comment to
            // the kill request — confirm against the LSF `bkill` reference.
            .arg("-C")
            .arg("task was cancelled")
            .arg(job_id.to_string())
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        let _permit = self
            .permits
            .acquire()
            .await
            .context("failed to acquire permit for canceling job")?;
        trace!(?command, "spawning `bkill` to cancel task");
        let mut child = command.spawn().context("failed to spawn `bkill` command")?;
        let status = child.wait().await.context("failed to wait for `bkill`")?;
        if !status.success() {
            bail!("`bkill` failed: {status}");
        }
        Ok(())
    }
}
impl TaskExecutionBackend for LsfApptainerBackend {
    /// Computes the execution constraints for a task.
    ///
    /// When the selected LSF queue declares per-task CPU or memory maximums
    /// that the task's requirements exceed, the request is either clamped to
    /// the maximum or rejected, depending on the configured limit behavior.
    fn constraints(
        &self,
        inputs: &TaskInputs,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let mut required_cpu = requirements::cpu(inputs, requirements);
        let mut required_memory = ByteSize::b(requirements::memory(inputs, requirements)? as u64);
        let backend_config = self.config.backend()?;
        let backend_config = backend_config
            .as_lsf_apptainer()
            .expect("configured backend is not LSF Apptainer");
        if let Some(queue) = backend_config.lsf_queue_for_task(requirements, hints) {
            // Enforce the queue's per-task CPU maximum, if any.
            if let Some(max_cpu) = queue.max_cpu_per_task
                && required_cpu > max_cpu as f64
            {
                // Omit environment-specific details when configured to do so.
                let env_specific = if self.config.suppress_env_specific_output {
                    String::new()
                } else {
                    format!(", but the execution backend has a maximum of {max_cpu}",)
                };
                match self.config.task.cpu_limit_behavior {
                    // Clamp to the maximum and continue with a warning.
                    TaskResourceLimitBehavior::TryWithMax => {
                        warn!(
                            "task requires at least {required_cpu} CPU{s}{env_specific}",
                            s = if required_cpu == 1.0 { "" } else { "s" },
                        );
                        required_cpu = max_cpu as f64;
                    }
                    // Refuse to run the task.
                    TaskResourceLimitBehavior::Deny => {
                        bail!(
                            "task requires at least {required_cpu} CPU{s}{env_specific}",
                            s = if required_cpu == 1.0 { "" } else { "s" },
                        );
                    }
                }
            }
            // Enforce the queue's per-task memory maximum, if any.
            if let Some(max_memory) = queue.max_memory_per_task
                && required_memory > max_memory
            {
                let env_specific = if self.config.suppress_env_specific_output {
                    String::new()
                } else {
                    format!(
                        ", but the execution backend has a maximum of {max_memory} GiB",
                        max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
                    )
                };
                match self.config.task.memory_limit_behavior {
                    TaskResourceLimitBehavior::TryWithMax => {
                        warn!(
                            "task requires at least {required_memory} GiB of memory{env_specific}",
                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
                        );
                        required_memory = max_memory;
                    }
                    TaskResourceLimitBehavior::Deny => {
                        bail!(
                            "task requires at least {required_memory} GiB of memory{env_specific}",
                            required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
                        );
                    }
                }
            }
        }
        let containers = requirements::container(inputs, requirements, &self.config.task.container);
        Ok(TaskExecutionConstraints {
            container: Some(containers),
            cpu: required_cpu,
            memory: required_memory.as_u64(),
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Executes a task by writing its command and Apptainer script to the
    /// attempt directory, submitting it to LSF, and waiting for the monitor
    /// to report completion.
    ///
    /// Returns `Ok(None)` if the task was cancelled or script generation
    /// declined to run the task; otherwise returns the execution result.
    fn execute<'a>(
        &'a self,
        _: &'a Arc<dyn Transferer>,
        request: ExecuteTaskRequest<'a>,
    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
        async move {
            let backend_config = self.config.backend()?;
            let backend_config = backend_config
                .as_lsf_apptainer()
                .expect("configured backend is not LSF Apptainer");
            let work_dir = request.work_dir();
            fs::create_dir_all(&work_dir).await.with_context(|| {
                format!(
                    "failed to create working directory `{path}`",
                    path = work_dir.display()
                )
            })?;
            // Pre-create empty stdout/stderr files so they exist regardless
            // of what the job writes.
            let stdout_path = request.stdout_path();
            let _ = File::create(&stdout_path).await.with_context(|| {
                format!(
                    "failed to create stdout file `{path}`",
                    path = stdout_path.display()
                )
            })?;
            let stderr_path = request.stderr_path();
            let _ = File::create(&stderr_path).await.with_context(|| {
                format!(
                    "failed to create stderr file `{path}`",
                    path = stderr_path.display()
                )
            })?;
            // Write the task's command script.
            let command_path = request.command_path();
            fs::write(&command_path, request.command)
                .await
                .with_context(|| {
                    format!(
                        "failed to write command contents to `{path}`",
                        path = command_path.display()
                    )
                })?;
            // Generate the Apptainer wrapper script; `None` means the task
            // should not run (e.g. cancellation during generation —
            // NOTE(review): confirm the exact semantics with
            // `ApptainerRuntime::generate_script`).
            let Some((apptainer_script, container)) = self
                .apptainer
                .generate_script(
                    &backend_config.apptainer_config,
                    &self.config.task.shell,
                    &request,
                    self.cancellation.first(),
                )
                .await?
            else {
                return Ok(None);
            };
            let apptainer_command_path = request.attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
            fs::write(&apptainer_command_path, apptainer_script)
                .await
                .with_context(|| {
                    format!(
                        "failed to write Apptainer command file `{}`",
                        apptainer_command_path.display()
                    )
                })?;
            // Both scripts must be executable for LSF to run them.
            #[cfg(unix)]
            {
                use std::fs::Permissions;
                use std::os::unix::fs::PermissionsExt;
                fs::set_permissions(&command_path, Permissions::from_mode(0o770)).await?;
                fs::set_permissions(&apptainer_command_path, Permissions::from_mode(0o770)).await?;
            }
            let crankshaft_id = crankshaft::events::next_task_id();
            // Limit concurrent `bsub` invocations; the permit is released as
            // soon as submission completes.
            let permit = self
                .permits
                .acquire()
                .await
                .context("failed to acquire permit for submitting job")?;
            let job = self.monitor.submit_job(backend_config, &request, crankshaft_id, &apptainer_command_path).await?;
            drop(permit);
            let name = job.task_name;
            let job_id = job.id;
            // Per-task cancellation token handed to event consumers.
            let task_token = CancellationToken::new();
            send_event!(
                self.events.crankshaft(),
                CrankshaftEvent::TaskCreated {
                    id: crankshaft_id,
                    name: name.clone(),
                    tes_id: None,
                    token: task_token.clone(),
                },
            );
            // Shared cancellation path: emit the canceled event, then kill
            // the LSF job.
            let cancelled = async {
                send_event!(
                    self.events.crankshaft(),
                    CrankshaftEvent::TaskCanceled { id: crankshaft_id },
                );
                self.kill_job(job_id).await
            };
            let token = self.cancellation.second();
            // Race task-level cancellation, run-level cancellation, and job
            // completion.
            let exit_code = tokio::select! {
                _ = task_token.cancelled() => {
                    if let Err(e) = cancelled.await {
                        error!("failed to cancel task `{name}` (LSF job `{job_id}`): {e:#}");
                    }
                    return Ok(None);
                }
                _ = token.cancelled() => {
                    if let Err(e) = cancelled.await {
                        error!("failed to cancel task `{name}` (LSF job `{job_id}`): {e:#}");
                    }
                    return Ok(None);
                }
                result = job.completed => match result.context("failed to wait for task to complete")? {
                    Ok(exit_code) => {
                        // Reconstruct an `ExitStatus` from the LSF exit code:
                        // codes >= 128 presumably follow the `128 + signal`
                        // convention and are encoded as a signal termination
                        // in the raw wait status; other codes are shifted
                        // into the status byte.
                        #[cfg(unix)]
                        let status = if exit_code >= 128 {
                            ExitStatus::from_raw((exit_code as i32 - 128) & 0x7f)
                        } else {
                            ExitStatus::from_raw((exit_code as i32) << 8)
                        };
                        #[cfg(windows)]
                        let status = ExitStatus::from_raw(exit_code as u32);
                        send_event!(
                            self.events.crankshaft(),
                            CrankshaftEvent::TaskCompleted {
                                id: crankshaft_id,
                                exit_statuses: NonEmpty::new(status),
                            }
                        );
                        exit_code
                    },
                    Err(e) => {
                        send_event!(
                            self.events.crankshaft(),
                            CrankshaftEvent::TaskFailed {
                                id: crankshaft_id,
                                message: format!("{e:#}"),
                            },
                        );
                        return Err(e);
                    }
                }
            };
            Ok(Some(TaskExecutionResult {
                container: Some(container),
                exit_code: exit_code as i32,
                work_dir: EvaluationPath::from_local_path(work_dir),
                stdout: PrimitiveValue::new_file(
                    stdout_path
                        .into_os_string()
                        .into_string()
                        .expect("path should be UTF-8"),
                )
                .into(),
                stderr: PrimitiveValue::new_file(
                    stderr_path
                        .into_os_string()
                        .into_string()
                        .expect("path should be UTF-8"),
                )
                .into(),
            }))
        }
        .boxed()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Ensures job name truncation keeps names below the LSF limit, lands on
    /// a UTF-8 character boundary, and leaves short names unchanged.
    #[test]
    fn job_name_truncates() {
        // Names below the limit are returned unchanged.
        assert_eq!(truncate_job_name("short"), "short");

        // A multi-byte name above the limit is truncated below the limit
        // without splitting a character.
        let name = "é".repeat(LSF_JOB_NAME_MAX_LENGTH);
        assert_eq!(name.len(), 8188);
        let truncated = truncate_job_name(&name);
        assert!(truncated.len() < LSF_JOB_NAME_MAX_LENGTH);
        assert!(name.is_char_boundary(truncated.len()));

        // An ASCII name of exactly the maximum length is truncated to one
        // byte below the limit.
        let name = "x".repeat(LSF_JOB_NAME_MAX_LENGTH);
        assert_eq!(truncate_job_name(&name).len(), LSF_JOB_NAME_MAX_LENGTH - 1);
    }
}