use async_trait::async_trait;
use clap::{CommandFactory, Parser};
use crate::ast::Value;
use crate::interpreter::{ExecResult, OutputData};
use crate::scheduler::JobId;
use crate::tools::{schema_from_clap, ExecContext, ToolCtx, GlobalFlags, Tool, ToolArgs, ToolSchema};
/// The `wait` builtin: a stateless [`Tool`] that waits for background jobs to
/// complete and reports a `Done`/`Failed` line for each of them.
pub struct Wait;
// Command-line arguments of the `wait` builtin, parsed with clap.
//
// Plain `//` comments are used here on purpose: clap's derive macro turns `///`
// doc comments into help text, which would alter the schema generated from
// `WaitArgs::command()`.
#[derive(Parser, Debug)]
#[command(name = "wait", about = "Wait for background jobs to complete")]
struct WaitArgs {
// `GlobalFlags` from `crate::tools`, flattened into this command;
// `Wait::execute` calls `apply` on them with the execution context.
#[command(flatten)]
global: GlobalFlags,
// Positional job specs, kept as raw strings.
// NOTE(review): `Wait::execute` reads the job ids from `ToolArgs::positional`
// rather than from this field, so it appears to exist only for clap-side
// validation and schema generation — confirm.
job: Vec<String>,
}
#[async_trait]
impl Tool for Wait {
fn name(&self) -> &str {
"wait"
}
fn schema(&self) -> ToolSchema {
schema_from_clap(
&WaitArgs::command(),
"wait",
"Wait for background jobs to complete",
[
("Wait for all jobs", "wait"),
("Wait for specific job", "wait 1"),
],
)
}
async fn execute(&self, args: ToolArgs, ctx: &mut dyn ToolCtx) -> ExecResult {
let Some(ctx) = ctx.as_any_mut().downcast_mut::<ExecContext>() else {
return ExecResult::failure(1, "internal error: kernel builtin requires ExecContext");
};
let parsed = match WaitArgs::try_parse_from(
std::iter::once("wait".to_string()).chain(args.to_argv()),
) {
Ok(p) => p,
Err(e) => return ExecResult::failure(2, format!("wait: {e}")),
};
parsed.global.apply(ctx);
let manager = match &ctx.job_manager {
Some(m) => m.clone(),
None => return ExecResult::with_output(OutputData::text("(no job manager)\n")),
};
if !args.positional.is_empty() {
let mut output = String::new();
let mut any_failed = false;
for spec in &args.positional {
let id = match spec {
Value::Int(i) => JobId(*i as u64),
Value::String(s) => {
let digits = s.strip_prefix('%').unwrap_or(s);
match digits.parse::<u64>() {
Ok(i) => JobId(i),
Err(_) => return ExecResult::failure(1, format!("wait: invalid job id: {}", s)),
}
}
_ => return ExecResult::failure(1, "wait: job id must be a number"),
};
match manager.wait(id).await {
Some(result) => {
let status = if result.ok() {
"Done"
} else {
any_failed = true;
"Failed"
};
output.push_str(&format!("[{}] {}\n", id, status));
}
None => return ExecResult::failure(1, format!("wait: job {} not found", id)),
}
}
if any_failed {
let mut result = ExecResult::from_output(1, output.clone(), "");
result.set_output(Some(OutputData::text(output)));
result
} else {
ExecResult::with_output(OutputData::text(output))
}
} else {
let results = manager.wait_all().await;
if results.is_empty() {
return ExecResult::with_output(OutputData::text("(no jobs to wait for)\n"));
}
let mut output = String::new();
let mut any_failed = false;
for (id, result) in results {
let status = if result.ok() {
"Done"
} else {
any_failed = true;
"Failed"
};
output.push_str(&format!("[{}] {}\n", id, status));
}
if any_failed {
let mut result = ExecResult::from_output(1, output.clone(), "");
result.set_output(Some(OutputData::text(output)));
result
} else {
ExecResult::with_output(OutputData::text(output))
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::JobManager;
    use crate::vfs::{MemoryFs, VfsRouter};
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds an `ExecContext` over an in-memory filesystem mounted at `/`,
    /// without a job manager.
    fn make_ctx() -> ExecContext {
        let mut router = VfsRouter::new();
        router.mount("/", MemoryFs::new());
        ExecContext::new(Arc::new(router))
    }

    /// Builds a context that already has a fresh `JobManager` attached and
    /// hands back a second handle to that manager.
    fn ctx_with_manager() -> (ExecContext, Arc<JobManager>) {
        let mut ctx = make_ctx();
        let jobs = Arc::new(JobManager::new());
        ctx.set_job_manager(jobs.clone());
        (ctx, jobs)
    }

    /// Spawns a background job that sleeps for `delay_ms` and then succeeds.
    fn spawn_ok(jobs: &Arc<JobManager>, name: &str, delay_ms: u64) -> JobId {
        jobs.spawn(name.to_string(), async move {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            ExecResult::success("")
        })
    }

    /// Short pause between spawning jobs and invoking `wait`.
    async fn settle() {
        tokio::time::sleep(Duration::from_millis(5)).await;
    }

    /// Runs the `wait` builtin with the given positional arguments.
    async fn wait_on(ctx: &mut ExecContext, specs: Vec<Value>) -> ExecResult {
        let mut args = ToolArgs::new();
        for spec in specs {
            args.positional.push(spec);
        }
        Wait.execute(args, ctx).await
    }

    #[tokio::test]
    async fn test_wait_no_manager() {
        let mut ctx = make_ctx();

        let outcome = wait_on(&mut ctx, Vec::new()).await;

        assert!(outcome.ok());
        assert!(outcome.text_out().contains("no job manager"));
    }

    #[tokio::test]
    async fn test_wait_no_jobs() {
        let (mut ctx, _jobs) = ctx_with_manager();

        let outcome = wait_on(&mut ctx, Vec::new()).await;

        assert!(outcome.ok());
        assert!(outcome.text_out().contains("no jobs"));
    }

    #[tokio::test]
    async fn test_wait_all() {
        let (mut ctx, jobs) = ctx_with_manager();
        spawn_ok(&jobs, "job1", 10);
        spawn_ok(&jobs, "job2", 5);
        settle().await;

        let outcome = wait_on(&mut ctx, Vec::new()).await;

        assert!(outcome.ok());
        for needle in ["[1]", "[2]", "Done"] {
            assert!(outcome.text_out().contains(needle));
        }
    }

    #[tokio::test]
    async fn test_wait_specific_job() {
        let (mut ctx, jobs) = ctx_with_manager();
        let job = spawn_ok(&jobs, "test", 10);
        settle().await;

        let outcome = wait_on(&mut ctx, vec![Value::Int(job.0 as i64)]).await;

        assert!(outcome.ok());
        assert!(outcome.text_out().contains(&format!("[{}]", job)));
    }

    #[tokio::test]
    async fn test_wait_jobspec_percent_form() {
        let (mut ctx, jobs) = ctx_with_manager();
        let job = spawn_ok(&jobs, "test", 10);
        settle().await;

        let outcome = wait_on(&mut ctx, vec![Value::String(format!("%{}", job.0))]).await;

        assert!(outcome.ok());
        assert!(outcome.text_out().contains(&format!("[{}]", job)));
    }

    #[tokio::test]
    async fn test_wait_multiple_jobspecs() {
        let (mut ctx, jobs) = ctx_with_manager();
        let first = spawn_ok(&jobs, "j1", 10);
        let second = spawn_ok(&jobs, "j2", 5);
        settle().await;

        let specs = vec![
            Value::String(format!("%{}", first.0)),
            Value::String(format!("%{}", second.0)),
        ];
        let outcome = wait_on(&mut ctx, specs).await;

        assert!(outcome.ok());
        assert!(outcome.text_out().contains(&format!("[{}]", first)));
        assert!(outcome.text_out().contains(&format!("[{}]", second)));
    }

    #[tokio::test]
    async fn test_wait_nonexistent_job() {
        let (mut ctx, _jobs) = ctx_with_manager();

        let outcome = wait_on(&mut ctx, vec![Value::Int(999)]).await;

        assert!(!outcome.ok());
        assert!(outcome.err.contains("not found"));
    }

    #[tokio::test]
    async fn test_wait_failed_job() {
        let (mut ctx, jobs) = ctx_with_manager();
        jobs.spawn("fail".to_string(), async {
            ExecResult::failure(1, "intentional failure")
        });
        settle().await;

        let outcome = wait_on(&mut ctx, Vec::new()).await;

        assert!(!outcome.ok());
        assert!(outcome.text_out().contains("Failed"));
    }
}