use anyhow::{Context, Result};
use std::path::Path;
use std::process::Command;
use tracing::{debug, info, warn};
use ulid::Ulid;
use crate::jobstore::{JobDir, resolve_root};
use crate::schema::{
JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
Snapshot,
};
use crate::tag::dedup_tags;
#[derive(Debug)]
/// Options for the `run` command: the command to launch plus job-store
/// location, snapshot/wait tuning, environment handling, and notification
/// sinks. `Default` supplies the baseline values used by the CLI.
pub struct RunOpts<'a> {
/// Command and arguments to execute; `execute` rejects an empty vector.
pub command: Vec<String>,
/// Override for the jobs root directory; `None` uses `resolve_root`'s default.
pub root: Option<&'a str>,
/// How long (ms) to poll before taking the initial output snapshot; capped at
/// `MAX_SNAPSHOT_AFTER_MS` and ignored entirely when `wait` is set.
pub snapshot_after: u64,
/// Maximum lines included per stream in a snapshot tail.
pub tail_lines: u64,
/// Maximum bytes included per stream in a snapshot tail.
pub max_bytes: u64,
/// Terminate the job after this many ms (0 = no timeout).
pub timeout_ms: u64,
/// Grace period (ms) between SIGTERM and SIGKILL on timeout (0 = kill immediately).
pub kill_after_ms: u64,
/// Working-directory override for the child process.
pub cwd: Option<&'a str>,
/// Extra `KEY=VALUE` environment entries for the child.
pub env_vars: Vec<String>,
/// dotenv-style files applied before `env_vars` (explicit vars win).
pub env_files: Vec<String>,
/// Whether the child inherits this process's environment.
pub inherit_env: bool,
/// Env keys whose values are replaced with `***` in persisted metadata.
pub mask: Vec<String>,
/// User tags attached to the job (deduplicated before persisting).
pub tags: Vec<String>,
/// Optional explicit path for the combined full.log.
pub log: Option<&'a str>,
/// Heartbeat interval (ms) for touching state.json while running (0 = off).
pub progress_every_ms: u64,
/// Block until the job terminates (or the wait deadline expires).
pub wait: bool,
/// Poll interval (ms) while waiting.
pub wait_poll_ms: u64,
/// Wait deadline (ms) when `wait` is set and `wait_forever` is not.
pub wait_until_ms: u64,
/// Ignore the deadline and wait until the job terminates.
pub wait_forever: bool,
/// Shell command notified on job completion.
pub notify_command: Option<String>,
/// File that receives completion events (appended as JSON lines).
pub notify_file: Option<String>,
/// Pattern that triggers on-output-match notifications.
pub output_pattern: Option<String>,
/// Match type for `output_pattern` (contains / regex).
pub output_match_type: Option<String>,
/// Stream to match against (stdout / stderr / either).
pub output_stream: Option<String>,
/// Shell command run when output matches.
pub output_command: Option<String>,
/// File appended to when output matches.
pub output_file: Option<String>,
/// argv prefix used to run shell-mediated commands (e.g. ["/bin/sh", "-c"]).
pub shell_wrapper: Vec<String>,
}
impl<'a> Default for RunOpts<'a> {
    /// Baseline configuration: fire-and-forget run with a 10 s snapshot
    /// window, 50-line / 64 KiB tails, inherited environment, no timeout,
    /// and the configured default shell wrapper.
    fn default() -> Self {
        Self {
            // Execution target and locations.
            command: Vec::new(),
            root: None,
            cwd: None,
            log: None,
            // Snapshot / tail limits.
            snapshot_after: 10_000,
            tail_lines: 50,
            max_bytes: 65536,
            // Lifetime controls (0 = disabled).
            timeout_ms: 0,
            kill_after_ms: 0,
            progress_every_ms: 0,
            // Environment handling.
            env_vars: Vec::new(),
            env_files: Vec::new(),
            inherit_env: true,
            mask: Vec::new(),
            tags: Vec::new(),
            // Wait behavior.
            wait: false,
            wait_poll_ms: 200,
            wait_until_ms: 30_000,
            wait_forever: false,
            // Notification sinks.
            notify_command: None,
            notify_file: None,
            output_pattern: None,
            output_match_type: None,
            output_stream: None,
            output_command: None,
            output_file: None,
            shell_wrapper: crate::config::default_shell_wrapper(),
        }
    }
}
/// Upper bound (ms) on how long `run` will block before taking the initial
/// output snapshot, regardless of the user-supplied `snapshot_after`.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
/// Flat parameter bundle that [`spawn_supervisor_process`] forwards to the
/// detached `_supervise` child as CLI flags.
pub struct SpawnSupervisorParams {
/// Job identifier (ULID string).
pub job_id: String,
/// Jobs root directory, passed as `--supervise-root`.
pub root: std::path::PathBuf,
/// Destination path for the combined full.log.
pub full_log_path: String,
/// Job timeout in ms; 0 omits the `--timeout` flag.
pub timeout_ms: u64,
/// SIGTERM→SIGKILL grace period in ms; 0 omits the flag.
pub kill_after_ms: u64,
/// Optional working-directory override for the child.
pub cwd: Option<String>,
/// Explicit `KEY=VALUE` env entries, each passed as `--env`.
pub env_vars: Vec<String>,
/// dotenv-style files, each passed as `--env-file`.
pub env_files: Vec<String>,
/// When false, `--no-inherit-env` is passed.
pub inherit_env: bool,
/// Heartbeat interval in ms; 0 omits the flag.
pub progress_every_ms: u64,
/// Completion-notification shell command, if any.
pub notify_command: Option<String>,
/// Completion-notification file sink, if any.
pub notify_file: Option<String>,
/// Shell wrapper argv, serialized to JSON for `--shell-wrapper-resolved`.
pub shell_wrapper: Vec<String>,
/// The user command, appended verbatim after `--`.
pub command: Vec<String>,
}
/// Spawns the detached `_supervise` process for a freshly created job.
///
/// Re-invokes the current executable with the hidden `_supervise` subcommand,
/// forwarding every job parameter as a CLI flag, then records the
/// supervisor's pid and start time in the job's state file. Returns
/// `(supervisor_pid, started_at)`.
///
/// On Windows, additionally blocks (up to ~5 s) for a handshake via
/// `state.json`, so a failure to assign the child to a Job Object surfaces
/// here instead of being lost inside the detached process.
///
/// # Errors
/// Fails when the current executable path cannot be resolved, the shell
/// wrapper cannot be serialized, the spawn itself fails, the initial state
/// cannot be written, or (Windows) the supervisor reports a Failed handshake.
pub fn spawn_supervisor_process(
    job_dir: &JobDir,
    params: SpawnSupervisorParams,
) -> Result<(u32, String)> {
    let started_at = now_rfc3339();
    // The supervisor is this same binary, re-executed with `_supervise`.
    let exe = std::env::current_exe().context("resolve current exe")?;
    let mut supervisor_cmd = Command::new(&exe);
    supervisor_cmd
        .arg("_supervise")
        .arg("--job-id")
        .arg(&params.job_id)
        .arg("--supervise-root")
        .arg(params.root.display().to_string())
        .arg("--full-log")
        .arg(&params.full_log_path);
    // Zero-valued tunables mean "disabled" and are simply not forwarded.
    if params.timeout_ms > 0 {
        supervisor_cmd
            .arg("--timeout")
            .arg(params.timeout_ms.to_string());
    }
    if params.kill_after_ms > 0 {
        supervisor_cmd
            .arg("--kill-after")
            .arg(params.kill_after_ms.to_string());
    }
    if let Some(ref cwd) = params.cwd {
        supervisor_cmd.arg("--cwd").arg(cwd);
    }
    for env_file in &params.env_files {
        supervisor_cmd.arg("--env-file").arg(env_file);
    }
    for env_var in &params.env_vars {
        supervisor_cmd.arg("--env").arg(env_var);
    }
    if !params.inherit_env {
        supervisor_cmd.arg("--no-inherit-env");
    }
    if params.progress_every_ms > 0 {
        supervisor_cmd
            .arg("--progress-every")
            .arg(params.progress_every_ms.to_string());
    }
    if let Some(ref nc) = params.notify_command {
        supervisor_cmd.arg("--notify-command").arg(nc);
    }
    if let Some(ref nf) = params.notify_file {
        supervisor_cmd.arg("--notify-file").arg(nf);
    }
    // The wrapper argv is passed as one JSON blob to survive arbitrary
    // contents without flag-parsing ambiguity.
    let wrapper_json =
        serde_json::to_string(&params.shell_wrapper).context("serialize shell wrapper")?;
    supervisor_cmd
        .arg("--shell-wrapper-resolved")
        .arg(&wrapper_json);
    // Everything after `--` is the user command, verbatim. All stdio is
    // detached: the supervisor owns the child's output via its own pipes.
    supervisor_cmd
        .arg("--")
        .args(&params.command)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());
    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
    let supervisor_pid = supervisor.id();
    debug!(supervisor_pid, "supervisor spawned");
    // Seed state.json with the supervisor pid; on Windows the supervisor
    // overwrites it (child pid or Failed) as the handshake signal below.
    job_dir.init_state(supervisor_pid, &started_at)?;
    #[cfg(windows)]
    {
        // Poll state.json until the supervisor confirms Job Object
        // assignment (pid changed to the child's) or reports Failed.
        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            if let Ok(current_state) = job_dir.read_state() {
                let supervisor_updated = current_state
                    .pid
                    .map(|p| p != supervisor_pid)
                    .unwrap_or(false)
                    || *current_state.status() == crate::schema::JobStatus::Failed;
                if supervisor_updated {
                    if *current_state.status() == crate::schema::JobStatus::Failed {
                        anyhow::bail!(
                            "supervisor failed to assign child process to Job Object \
                             (Windows MUST requirement); see stderr for details"
                        );
                    }
                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
                    break;
                }
            }
            // Timeout is non-fatal: proceed optimistically with the state
            // written above rather than failing a possibly-healthy job.
            if std::time::Instant::now() >= handshake_deadline {
                debug!("supervisor handshake timed out; proceeding with initial state");
                break;
            }
        }
    }
    Ok((supervisor_pid, started_at))
}
/// Ensures the job's three log files (stdout.log, stderr.log, full.log)
/// exist before the supervisor starts, so readers never race file creation.
///
/// Opens each in create+append mode, which leaves already-existing files
/// untouched.
///
/// # Errors
/// Fails if any of the files cannot be created or opened.
pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
    let targets = [
        job_dir.stdout_path(),
        job_dir.stderr_path(),
        job_dir.full_log_path(),
    ];
    for path in &targets {
        std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .with_context(|| format!("pre-create log file {}", path.display()))?;
    }
    Ok(())
}
/// Snapshot/wait tuning for [`run_snapshot_wait`]; field meanings mirror the
/// identically named fields on `RunOpts`.
pub struct SnapshotWaitOpts {
/// Initial-snapshot delay in ms (capped at `MAX_SNAPSHOT_AFTER_MS`).
pub snapshot_after: u64,
/// Max lines per stream in a snapshot tail.
pub tail_lines: u64,
/// Max bytes per stream in a snapshot tail.
pub max_bytes: u64,
/// Whether to block until the job terminates.
pub wait: bool,
/// Poll interval (ms) while waiting.
pub wait_poll_ms: u64,
/// Wait deadline (ms) unless `wait_forever` is set.
pub wait_until_ms: u64,
/// Disables the wait deadline entirely.
pub wait_forever: bool,
}
/// Optionally polls a just-spawned job and captures output snapshots.
///
/// Returns `(final_state, exit_code, finished_at, initial_snapshot,
/// final_snapshot, waited_ms)`:
/// * Without `wait`: sleeps in short polls for up to `snapshot_after` ms
///   (capped at [`MAX_SNAPSHOT_AFTER_MS`]), exiting early once the job leaves
///   its non-terminal states, then captures the initial snapshot; the state
///   is reported as `running` and the terminal fields stay `None`.
/// * With `wait`: skips the initial snapshot and polls state every
///   `wait_poll_ms` until the job is terminal (capturing a final snapshot and
///   exit metadata) or the `wait_until_ms` deadline passes (`wait_forever`
///   disables the deadline), in which case the current non-terminal state is
///   returned without snapshots.
pub fn run_snapshot_wait(
    job_dir: &JobDir,
    opts: &SnapshotWaitOpts,
) -> (
    String,
    Option<i32>,
    Option<String>,
    Option<Snapshot>,
    Option<Snapshot>,
    u64,
) {
    use crate::schema::JobStatus;
    // The pre-snapshot delay only applies to fire-and-forget runs; --wait
    // takes its snapshot when the job ends instead.
    let effective_snapshot_after = if opts.wait {
        0
    } else {
        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
    };
    let wait_start = std::time::Instant::now();
    let snapshot = if effective_snapshot_after > 0 {
        debug!(ms = effective_snapshot_after, "polling for snapshot");
        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
        let poll_interval = std::time::Duration::from_millis(15);
        loop {
            std::thread::sleep(poll_interval);
            // Stop early if the job already reached a terminal state; the
            // snapshot below then captures its final output.
            if let Ok(st) = job_dir.read_state()
                && !st.status().is_non_terminal()
            {
                debug!("snapshot poll: job no longer running/created, exiting early");
                break;
            }
            if std::time::Instant::now() >= deadline {
                debug!("snapshot poll: deadline reached");
                break;
            }
        }
        Some(build_snapshot(job_dir, opts.tail_lines, opts.max_bytes))
    } else {
        None
    };
    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
        debug!(
            wait_until_ms = opts.wait_until_ms,
            wait_forever = opts.wait_forever,
            "--wait: polling for terminal or deadline"
        );
        // A zero poll interval would spin; clamp to at least 1 ms.
        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
        let wait_deadline = if opts.wait_forever {
            None
        } else {
            Some(wait_start + std::time::Duration::from_millis(opts.wait_until_ms))
        };
        loop {
            std::thread::sleep(poll);
            if let Ok(st) = job_dir.read_state() {
                if !st.status().is_non_terminal() {
                    // Terminal: capture the final snapshot and exit metadata.
                    let snap = build_snapshot(job_dir, opts.tail_lines, opts.max_bytes);
                    let ec = st.exit_code();
                    let fa = st.finished_at.clone();
                    let state_str = st.status().as_str().to_string();
                    break (state_str, ec, fa, Some(snap));
                }
                // Deadline: report the still-running state without snapshots.
                if let Some(deadline) = wait_deadline
                    && std::time::Instant::now() >= deadline
                {
                    break (st.status().as_str().to_string(), None, None, None);
                }
            }
        }
    } else {
        (JobStatus::Running.as_str().to_string(), None, None, None)
    };
    let waited_ms = wait_start.elapsed().as_millis() as u64;
    (
        final_state,
        exit_code_opt,
        finished_at_opt,
        snapshot,
        final_snapshot_opt,
        waited_ms,
    )
}
/// Entry point for the `run` command: creates the job directory, spawns the
/// detached supervisor, optionally snapshots/waits on the job, and prints a
/// JSON [`Response`] describing the result.
///
/// # Errors
/// Fails when the command is empty, the jobs root or job directory cannot be
/// created, tag validation fails, or the supervisor cannot be spawned.
pub fn execute(opts: RunOpts) -> Result<()> {
    if opts.command.is_empty() {
        anyhow::bail!("no command specified for run");
    }
    let elapsed_start = std::time::Instant::now();
    let root = resolve_root(opts.root);
    std::fs::create_dir_all(&root)
        .with_context(|| format!("create jobs root {}", root.display()))?;
    // Fresh unique job id.
    let job_id = Ulid::new().to_string();
    let created_at = now_rfc3339();
    // Persist the env key names separately, and a masked copy of the full
    // entries, so secrets listed in `mask` never land in meta.json.
    let env_keys: Vec<String> = opts
        .env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
        .collect();
    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
    let effective_cwd = resolve_effective_cwd(opts.cwd);
    let on_output_match = crate::notify::build_output_match_config(
        opts.output_pattern,
        opts.output_match_type,
        opts.output_stream,
        opts.output_command,
        opts.output_file,
        None,
    );
    // Only record a notification config when at least one sink is set.
    let notification =
        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
        {
            Some(crate::schema::NotificationConfig {
                notify_command: opts.notify_command.clone(),
                notify_file: opts.notify_file.clone(),
                on_output_match,
            })
        } else {
            None
        };
    let tags = dedup_tags(opts.tags)?;
    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
        command: opts.command.clone(),
        created_at: created_at.clone(),
        root: root.display().to_string(),
        env_keys,
        env_vars: masked_env_vars.clone(),
        env_vars_runtime: vec![],
        mask: opts.mask.clone(),
        cwd: Some(effective_cwd),
        notification,
        inherit_env: opts.inherit_env,
        env_files: opts.env_files.clone(),
        timeout_ms: opts.timeout_ms,
        kill_after_ms: opts.kill_after_ms,
        progress_every_ms: opts.progress_every_ms,
        shell_wrapper: Some(opts.shell_wrapper.clone()),
        tags: tags.clone(),
    };
    let job_dir = JobDir::create(&root, &job_id, &meta)?;
    info!(job_id = %job_id, "created job directory");
    // `--log` redirects the combined full.log; the per-stream logs always
    // live inside the job directory.
    let full_log_path = if let Some(log) = opts.log {
        log.to_string()
    } else {
        job_dir.full_log_path().display().to_string()
    };
    // Create log files up front so readers never race the supervisor.
    pre_create_log_files(&job_dir)?;
    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
        &job_dir,
        SpawnSupervisorParams {
            job_id: job_id.clone(),
            root: root.clone(),
            full_log_path: full_log_path.clone(),
            timeout_ms: opts.timeout_ms,
            kill_after_ms: opts.kill_after_ms,
            cwd: opts.cwd.map(|s| s.to_string()),
            env_vars: opts.env_vars.clone(),
            env_files: opts.env_files.clone(),
            inherit_env: opts.inherit_env,
            progress_every_ms: opts.progress_every_ms,
            notify_command: opts.notify_command.clone(),
            notify_file: opts.notify_file.clone(),
            shell_wrapper: opts.shell_wrapper.clone(),
            command: opts.command.clone(),
        },
    )?;
    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();
    // Optional snapshot and/or blocking wait; see run_snapshot_wait.
    let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
        run_snapshot_wait(
            &job_dir,
            &SnapshotWaitOpts {
                snapshot_after: opts.snapshot_after,
                tail_lines: opts.tail_lines,
                max_bytes: opts.max_bytes,
                wait: opts.wait,
                wait_poll_ms: opts.wait_poll_ms,
                wait_until_ms: opts.wait_until_ms,
                wait_forever: opts.wait_forever,
            },
        );
    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: final_state,
            tags,
            env_vars: masked_env_vars,
            snapshot,
            stdout_log_path,
            stderr_log_path,
            waited_ms,
            elapsed_ms,
            exit_code: exit_code_opt,
            finished_at: finished_at_opt,
            final_snapshot: final_snapshot_opt,
        },
    );
    response.print();
    Ok(())
}
/// Builds a [`Snapshot`] from the current tails of the job's stdout/stderr
/// logs, limited to `tail_lines` lines and `max_bytes` bytes per stream.
fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
    let out = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
    let err = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
    Snapshot {
        // The snapshot is truncated if either stream had to be cut.
        truncated: out.truncated || err.truncated,
        encoding: "utf-8-lossy".to_string(),
        stdout_observed_bytes: out.observed_bytes,
        stdout_included_bytes: out.included_bytes,
        stdout_tail: out.tail,
        stderr_observed_bytes: err.observed_bytes,
        stderr_included_bytes: err.included_bytes,
        stderr_tail: err.tail,
    }
}
#[derive(Debug)]
/// Options for the hidden `_supervise` subcommand, parsed from the CLI flags
/// that [`spawn_supervisor_process`] emits.
pub struct SuperviseOpts<'a> {
/// Job identifier to supervise.
pub job_id: &'a str,
/// Jobs root directory containing the job's directory.
pub root: &'a Path,
/// The user command to run (must be non-empty).
pub command: &'a [String],
/// Override path for the combined full.log; `None` uses the job directory.
pub full_log: Option<&'a str>,
/// Timeout in ms (0 = none).
pub timeout_ms: u64,
/// SIGTERM→SIGKILL grace period in ms (0 = immediate kill on timeout).
pub kill_after_ms: u64,
/// Working-directory override for the child.
pub cwd: Option<&'a str>,
/// Explicit `KEY=VALUE` env entries (applied after `env_files`).
pub env_vars: Vec<String>,
/// dotenv-style files applied first.
pub env_files: Vec<String>,
/// Whether the child inherits the supervisor's environment.
pub inherit_env: bool,
/// state.json heartbeat interval in ms (0 = off).
pub progress_every_ms: u64,
/// Completion-notification shell command, if any.
pub notify_command: Option<String>,
/// Completion-notification file sink, if any.
pub notify_file: Option<String>,
/// Shell wrapper argv used to launch the command and notification sinks.
pub shell_wrapper: Vec<String>,
}
/// Resolves the job's effective working directory to an absolute path string.
///
/// Prefers the canonicalized form of `cwd_override` (or the process CWD when
/// no override is given). When canonicalization fails — typically because
/// the path does not exist yet — an absolute override is kept verbatim and a
/// relative one is joined onto the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    let current_dir =
        || std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
    let candidate = cwd_override
        .map(std::path::PathBuf::from)
        .unwrap_or_else(current_dir);
    if let Ok(canonical) = candidate.canonicalize() {
        return canonical.display().to_string();
    }
    // Canonicalization failed; fall back without touching the filesystem.
    if candidate.is_absolute() {
        candidate.display().to_string()
    } else {
        current_dir().join(candidate).display().to_string()
    }
}
/// Returns a copy of `env_vars` with the value of every entry whose key is in
/// `mask_keys` replaced by `***` (e.g. `SECRET=abc` → `SECRET=***`).
///
/// With no mask keys the input is returned unchanged. The key of an entry is
/// everything before the first `=` (or the whole entry when there is none).
pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    if mask_keys.is_empty() {
        return env_vars.to_vec();
    }
    env_vars
        .iter()
        .map(|entry| {
            let key = entry.split('=').next().unwrap_or(entry.as_str());
            let is_masked = mask_keys.iter().any(|m| m.as_str() == key);
            if is_masked {
                format!("{key}=***")
            } else {
                entry.clone()
            }
        })
        .collect()
}
/// Splits a `KEY=VALUE` entry on the first `=` into owned `(key, value)`.
/// An entry without `=` yields the whole string as the key and an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_string(), value.to_string()),
        None => (s.to_string(), String::new()),
    }
}
fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
let contents =
std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
let mut vars = Vec::new();
for line in contents.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
vars.push(parse_env_var(line));
}
Ok(vars)
}
/// Watches child output lines for the job's configured on-output-match rule
/// and dispatches notification sinks when a line matches.
struct OutputMatchChecker {
/// Job directory path; used to re-read meta.json and to locate the logs
/// referenced in emitted events.
job_dir_path: std::path::PathBuf,
/// argv prefix used to run command sinks.
shell_wrapper: Vec<String>,
/// Shared mutable view of the current notification config (the checker is
/// used from both stream-reader threads).
inner: std::sync::Mutex<OutputMatchInner>,
}
/// Mutex-guarded interior state: the most recently loaded notification
/// config, refreshed from meta.json as lines arrive.
struct OutputMatchInner {
config: Option<crate::schema::NotificationConfig>,
}
impl OutputMatchChecker {
/// Creates a checker rooted at `job_dir_path`, seeded with the notification
/// config captured at spawn time; the config is refreshed from disk as
/// output lines arrive.
fn new(
job_dir_path: std::path::PathBuf,
shell_wrapper: Vec<String>,
initial_config: Option<crate::schema::NotificationConfig>,
) -> Self {
Self {
job_dir_path,
shell_wrapper,
inner: std::sync::Mutex::new(OutputMatchInner {
config: initial_config,
}),
}
}
/// Inspects one output line from `stream` ("stdout"/"stderr"); if it
/// matches the configured pattern and stream filter, dispatches the match
/// sinks.
///
/// Matching is evaluated under the lock, but sink dispatch happens after the
/// lock is released so slow sink subprocesses don't block the other stream.
fn check_line(&self, line: &str, stream: &str) {
use crate::schema::{OutputMatchStream, OutputMatchType};
let match_info: Option<crate::schema::OutputMatchConfig> = {
let mut inner = self.inner.lock().unwrap();
{
// Refresh the config from meta.json on every line so edits made
// while the job runs take effect; on read/parse failure the last
// known config is kept (best effort).
let meta_path = self.job_dir_path.join("meta.json");
if let Ok(raw) = std::fs::read(&meta_path)
&& let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
{
inner.config = meta.notification;
}
}
// These `return`s exit check_line entirely: nothing to match against.
let Some(ref notification) = inner.config else {
return;
};
let Some(ref match_cfg) = notification.on_output_match else {
return;
};
let stream_matches = match match_cfg.stream {
OutputMatchStream::Stdout => stream == "stdout",
OutputMatchStream::Stderr => stream == "stderr",
OutputMatchStream::Either => true,
};
if !stream_matches {
return;
}
// NOTE: the regex is recompiled for every line; an invalid pattern
// simply never matches.
let matched = match &match_cfg.match_type {
OutputMatchType::Contains => line.contains(&match_cfg.pattern),
OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
.map(|re| re.is_match(line))
.unwrap_or(false),
};
if matched {
Some(match_cfg.clone())
} else {
None
}
};
if let Some(match_cfg) = match_info {
self.dispatch_match(line, stream, &match_cfg);
}
}
/// Builds a `job.output.matched` event for a matched line, runs the
/// configured command/file sinks, and appends the event plus per-sink
/// delivery results to `notification_events.ndjson` (best effort — sink and
/// write failures are recorded or ignored, never propagated).
fn dispatch_match(
&self,
line: &str,
stream: &str,
match_cfg: &crate::schema::OutputMatchConfig,
) {
use std::io::Write;
// The job id is recovered from the directory name.
let job_id = self
.job_dir_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown");
let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
let events_path = self.job_dir_path.join("notification_events.ndjson");
let events_path_str = events_path.display().to_string();
let match_type_str = match &match_cfg.match_type {
crate::schema::OutputMatchType::Contains => "contains",
crate::schema::OutputMatchType::Regex => "regex",
};
let event = crate::schema::OutputMatchEvent {
schema_version: crate::schema::SCHEMA_VERSION.to_string(),
event_type: "job.output.matched".to_string(),
job_id: job_id.to_string(),
pattern: match_cfg.pattern.clone(),
match_type: match_type_str.to_string(),
stream: stream.to_string(),
line: line.to_string(),
stdout_log_path,
stderr_log_path,
};
let event_json = serde_json::to_string(&event).unwrap_or_default();
let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
if let Some(ref cmd) = match_cfg.command {
delivery_results.push(dispatch_command_sink(
cmd,
&event_json,
job_id,
&events_path_str,
&self.shell_wrapper,
"job.output.matched",
));
}
if let Some(ref file_path) = match_cfg.file {
delivery_results.push(dispatch_file_sink(file_path, &event_json));
}
// Append the event + delivery record; failures here are swallowed.
let record = crate::schema::OutputMatchEventRecord {
event,
delivery_results,
};
if let Ok(record_json) = serde_json::to_string(&record)
&& let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&events_path)
{
let _ = writeln!(f, "{record_json}");
}
}
}
fn stream_to_logs<R, F>(
stream: R,
log_path: &std::path::Path,
full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
label: &str,
on_line: Option<F>,
) where
R: std::io::Read,
F: Fn(&str),
{
use std::io::Write;
let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
let mut stream = stream;
let mut buf = [0u8; 8192];
let mut line_buf: Vec<u8> = Vec::new();
loop {
match stream.read(&mut buf) {
Ok(0) => break, Ok(n) => {
let chunk = &buf[..n];
let _ = log_file.write_all(chunk);
for &b in chunk {
if b == b'\n' {
let line = String::from_utf8_lossy(&line_buf);
if let Ok(mut fl) = full_log.lock() {
let ts = now_rfc3339();
let _ = writeln!(fl, "{ts} [{label}] {line}");
}
if let Some(ref f) = on_line {
f(&line);
}
line_buf.clear();
} else {
line_buf.push(b);
}
}
}
Err(_) => break,
}
}
if !line_buf.is_empty() {
let line = String::from_utf8_lossy(&line_buf);
if let Ok(mut fl) = full_log.lock() {
let ts = now_rfc3339();
let _ = writeln!(fl, "{ts} [{label}] {line}");
}
if let Some(ref f) = on_line {
f(&line);
}
}
}
/// Body of the hidden `_supervise` subcommand: runs the job's child process
/// to completion while maintaining the job directory.
///
/// Responsibilities, in order:
/// 1. Launch the command through the shell wrapper (own session/process
///    group on Unix via `setsid`; assigned to a Job Object on Windows).
/// 2. Record the `Running` state, then tee child stdout/stderr into
///    per-stream logs and a timestamped combined full.log, feeding each line
///    to the output-match checker.
/// 3. Optionally enforce `--timeout`/`--kill-after` and periodically touch
///    state.json as a progress heartbeat.
/// 4. On exit, persist the terminal state, drain the log threads (bounded),
///    and dispatch completion notifications (command and/or file sinks).
///
/// # Errors
/// Fails when the command or shell wrapper is empty, the job directory or
/// its metadata cannot be read, env files cannot be loaded, the child cannot
/// be spawned/waited on, state cannot be written, or (Windows) Job Object
/// assignment fails.
pub fn supervise(opts: SuperviseOpts) -> Result<()> {
    use std::sync::{Arc, Mutex};
    let job_id = opts.job_id;
    let root = opts.root;
    let command = opts.command;
    if command.is_empty() {
        anyhow::bail!("supervisor: no command");
    }
    let job_dir = JobDir::open(root, job_id)?;
    let meta = job_dir.read_meta()?;
    let started_at = now_rfc3339();
    // `--full-log` may redirect the combined log outside the job directory.
    let full_log_path = if let Some(p) = opts.full_log {
        std::path::PathBuf::from(p)
    } else {
        job_dir.full_log_path()
    };
    if let Some(parent) = full_log_path.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
    }
    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
    // Shared between the two stream-reader threads.
    let full_log = Arc::new(Mutex::new(full_log_file));
    if opts.shell_wrapper.is_empty() {
        anyhow::bail!("supervisor: shell wrapper must not be empty");
    }
    // Build the child command through the shell wrapper:
    // * one argument  -> wrapper runs it as a shell command string;
    // * multiple args -> Unix uses `sh -c 'exec "$@"' -- argv...` so the
    //   arguments are passed through without re-quoting; elsewhere the args
    //   are joined into one string with naive space-quoting.
    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
    if command.len() == 1 {
        child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
    } else {
        #[cfg(unix)]
        {
            child_cmd
                .args(&opts.shell_wrapper[1..])
                .arg("exec \"$@\"")
                .arg("--")
                .args(command);
        }
        #[cfg(not(unix))]
        {
            // NOTE(review): args containing quotes are not escaped here —
            // confirm acceptable for the Windows wrapper path.
            let joined = command
                .iter()
                .map(|a| {
                    if a.contains(' ') {
                        format!("\"{}\"", a)
                    } else {
                        a.clone()
                    }
                })
                .collect::<Vec<_>>()
                .join(" ");
            child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
        }
    }
    // Environment: optionally start from a clean slate, then apply env files
    // first and explicit --env entries last (explicit entries win).
    if opts.inherit_env {
    } else {
        child_cmd.env_clear();
    }
    for env_file in &opts.env_files {
        let vars = load_env_file(env_file)?;
        for (k, v) in vars {
            child_cmd.env(&k, &v);
        }
    }
    for env_var in &opts.env_vars {
        let (k, v) = parse_env_var(env_var);
        child_cmd.env(&k, &v);
    }
    if let Some(cwd) = opts.cwd {
        child_cmd.current_dir(cwd);
    }
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;
        // SAFETY: pre_exec runs between fork and exec; setsid is async-signal
        // safe. The child gets its own session/process group so timeouts can
        // signal the whole group via kill(-pid).
        unsafe {
            child_cmd.pre_exec(|| {
                libc::setsid();
                Ok(())
            });
        }
    }
    let mut child = child_cmd
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .context("supervisor: spawn child")?;
    let pid = child.id();
    info!(job_id, pid, "child process started");
    // Windows MUST: assign the child to a Job Object. On failure the child
    // is killed, a Failed state is written (which also completes the
    // handshake in spawn_supervisor_process), failure notifications are
    // dispatched, and the supervisor errors out.
    #[cfg(windows)]
    let windows_job_name = {
        match assign_to_job_object(job_id, pid) {
            Ok(name) => Some(name),
            Err(e) => {
                let kill_err = child.kill();
                let _ = child.wait();
                let failed_state = JobState {
                    job: JobStateJob {
                        id: job_id.to_string(),
                        status: JobStatus::Failed,
                        started_at: Some(started_at.clone()),
                    },
                    result: JobStateResult {
                        exit_code: None,
                        signal: None,
                        duration_ms: None,
                    },
                    pid: Some(pid),
                    finished_at: Some(now_rfc3339()),
                    updated_at: now_rfc3339(),
                    windows_job_name: None,
                };
                let _ = job_dir.write_state(&failed_state);
                if opts.notify_command.is_some() || opts.notify_file.is_some() {
                    let finished_at_ts =
                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
                    let stdout_log = job_dir.stdout_path().display().to_string();
                    let stderr_log = job_dir.stderr_path().display().to_string();
                    let fail_event = crate::schema::CompletionEvent {
                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
                        event_type: "job.finished".to_string(),
                        job_id: job_id.to_string(),
                        state: JobStatus::Failed.as_str().to_string(),
                        command: meta.command.clone(),
                        cwd: meta.cwd.clone(),
                        started_at: started_at.clone(),
                        finished_at: finished_at_ts,
                        duration_ms: None,
                        exit_code: None,
                        signal: None,
                        stdout_log_path: stdout_log,
                        stderr_log_path: stderr_log,
                    };
                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
                    let fail_event_path = job_dir.completion_event_path().display().to_string();
                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
                        Vec::new();
                    // Write the event first without results so it exists even
                    // if a sink hangs; rewritten with results below.
                    if let Err(we) = job_dir.write_completion_event_atomic(
                        &crate::schema::CompletionEventRecord {
                            event: fail_event.clone(),
                            delivery_results: vec![],
                        },
                    ) {
                        warn!(
                            job_id,
                            error = %we,
                            "failed to write initial completion_event.json for failed job"
                        );
                    }
                    if let Some(ref shell_cmd) = opts.notify_command {
                        fail_delivery_results.push(dispatch_command_sink(
                            shell_cmd,
                            &fail_event_json,
                            job_id,
                            &fail_event_path,
                            &opts.shell_wrapper,
                            "job.finished",
                        ));
                    }
                    if let Some(ref file_path) = opts.notify_file {
                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
                    }
                    if let Err(we) = job_dir.write_completion_event_atomic(
                        &crate::schema::CompletionEventRecord {
                            event: fail_event,
                            delivery_results: fail_delivery_results,
                        },
                    ) {
                        warn!(
                            job_id,
                            error = %we,
                            "failed to update completion_event.json with delivery results for failed job"
                        );
                    }
                }
                if let Err(ke) = kill_err {
                    return Err(anyhow::anyhow!(
                        "supervisor: failed to assign pid {pid} to Job Object \
                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
                    ));
                }
                return Err(anyhow::anyhow!(
                    "supervisor: failed to assign pid {pid} to Job Object \
                     (Windows MUST requirement); child process was killed; \
                     consider running outside a nested Job Object environment: {e}"
                ));
            }
        }
    };
    #[cfg(not(windows))]
    let windows_job_name: Option<String> = None;
    // Publish the Running state (this also completes the Windows handshake).
    let state = JobState {
        job: JobStateJob {
            id: job_id.to_string(),
            status: JobStatus::Running,
            started_at: Some(started_at.clone()),
        },
        result: JobStateResult {
            exit_code: None,
            signal: None,
            duration_ms: None,
        },
        pid: Some(pid),
        finished_at: None,
        updated_at: now_rfc3339(),
        windows_job_name,
    };
    job_dir.write_state(&state)?;
    let child_start_time = std::time::Instant::now();
    let child_stdout = child.stdout.take().expect("child stdout piped");
    let child_stderr = child.stderr.take().expect("child stderr piped");
    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
        job_dir.path.clone(),
        opts.shell_wrapper.clone(),
        meta.notification.clone(),
    ));
    // Done-channels let the drain phase below wait with a timeout, which a
    // bare join() cannot do.
    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
    let stdout_log_path = job_dir.stdout_path();
    let full_log_stdout = Arc::clone(&full_log);
    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
    let t_stdout = std::thread::spawn(move || {
        stream_to_logs(
            child_stdout,
            &stdout_log_path,
            full_log_stdout,
            "STDOUT",
            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
        );
        let _ = tx_stdout_done.send(());
    });
    let stderr_log_path = job_dir.stderr_path();
    let full_log_stderr = Arc::clone(&full_log);
    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
    let t_stderr = std::thread::spawn(move || {
        stream_to_logs(
            child_stderr,
            &stderr_log_path,
            full_log_stderr,
            "STDERR",
            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
        );
        let _ = tx_stderr_done.send(());
    });
    let timeout_ms = opts.timeout_ms;
    let kill_after_ms = opts.kill_after_ms;
    let progress_every_ms = opts.progress_every_ms;
    let state_path = job_dir.state_path();
    let job_id_str = job_id.to_string();
    use std::sync::atomic::{AtomicBool, Ordering};
    let child_done = Arc::new(AtomicBool::new(false));
    // Watcher thread: enforces the timeout and/or refreshes updated_at in
    // state.json as a liveness heartbeat. Only spawned when needed.
    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
        let state_path_clone = state_path.clone();
        let child_done_clone = Arc::clone(&child_done);
        Some(std::thread::spawn(move || {
            let start = std::time::Instant::now();
            let timeout_dur = if timeout_ms > 0 {
                Some(std::time::Duration::from_millis(timeout_ms))
            } else {
                None
            };
            let progress_dur = if progress_every_ms > 0 {
                Some(std::time::Duration::from_millis(progress_every_ms))
            } else {
                None
            };
            let poll_interval = std::time::Duration::from_millis(100);
            loop {
                std::thread::sleep(poll_interval);
                if child_done_clone.load(Ordering::Relaxed) {
                    break;
                }
                let elapsed = start.elapsed();
                if let Some(td) = timeout_dur
                    && elapsed >= td
                {
                    // Timeout: SIGTERM the whole process group, then SIGKILL
                    // either after the grace period or immediately.
                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
                    #[cfg(unix)]
                    {
                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
                    }
                    if kill_after_ms > 0 {
                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
                        #[cfg(unix)]
                        {
                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
                        }
                    } else {
                        #[cfg(unix)]
                        {
                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
                        }
                    }
                    break;
                }
                if let Some(pd) = progress_dur {
                    // Approximate "every pd" trigger: fires on the poll tick
                    // that lands just past each multiple of pd.
                    let elapsed_ms = elapsed.as_millis() as u64;
                    let pd_ms = pd.as_millis() as u64;
                    let poll_ms = poll_interval.as_millis() as u64;
                    if elapsed_ms % pd_ms < poll_ms {
                        if let Ok(raw) = std::fs::read(&state_path_clone)
                            && let Ok(mut st) =
                                serde_json::from_slice::<crate::schema::JobState>(&raw)
                        {
                            st.updated_at = now_rfc3339();
                            if let Ok(s) = serde_json::to_string_pretty(&st) {
                                let _ = std::fs::write(&state_path_clone, s);
                            }
                        }
                    }
                }
            }
        }))
    } else {
        None
    };
    let exit_status = child.wait().context("wait for child")?;
    child_done.store(true, Ordering::Relaxed);
    let duration_ms = child_start_time.elapsed().as_millis() as u64;
    let exit_code = exit_status.code();
    let finished_at = now_rfc3339();
    // Unix: a signal death maps to Killed (+ signal number); otherwise Exited.
    #[cfg(unix)]
    let (terminal_status, signal_name) = {
        use std::os::unix::process::ExitStatusExt;
        if let Some(sig) = exit_status.signal() {
            (JobStatus::Killed, Some(sig.to_string()))
        } else {
            (JobStatus::Exited, None)
        }
    };
    #[cfg(not(unix))]
    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
    // NOTE(review): the terminal state resets windows_job_name to None,
    // dropping the name recorded in the Running state — confirm intentional.
    let state = JobState {
        job: JobStateJob {
            id: job_id.to_string(),
            status: terminal_status.clone(),
            started_at: Some(started_at.clone()),
        },
        result: JobStateResult {
            exit_code,
            signal: signal_name.clone(),
            duration_ms: Some(duration_ms),
        },
        pid: Some(pid),
        finished_at: Some(finished_at.clone()),
        updated_at: now_rfc3339(),
        windows_job_name: None, };
    job_dir.write_state(&state)?;
    info!(job_id, ?exit_code, "child process finished");
    // Drain the log threads, but never block forever: a grandchild holding
    // the pipes open could otherwise stall the supervisor. Threads that miss
    // the deadline are detached (dropped), not joined.
    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
    let remaining = drain_deadline
        .checked_duration_since(std::time::Instant::now())
        .unwrap_or(std::time::Duration::ZERO);
    if rx_stdout_done.recv_timeout(remaining).is_ok() {
        let _ = t_stdout.join();
    } else {
        drop(t_stdout); }
    let remaining = drain_deadline
        .checked_duration_since(std::time::Instant::now())
        .unwrap_or(std::time::Duration::ZERO);
    if rx_stderr_done.recv_timeout(remaining).is_ok() {
        let _ = t_stderr.join();
    } else {
        drop(t_stderr); }
    if let Some(w) = watcher {
        let _ = w.join();
    }
    // Notifications use the freshest config from meta.json so changes made
    // while the job ran are honored; fall back to none if unreadable.
    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
    let (current_notify_command, current_notify_file) = match &latest_notification {
        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
        None => (None, None),
    };
    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
    if has_notification {
        let stdout_log = job_dir.stdout_path().display().to_string();
        let stderr_log = job_dir.stderr_path().display().to_string();
        let event = crate::schema::CompletionEvent {
            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
            event_type: "job.finished".to_string(),
            job_id: job_id.to_string(),
            state: terminal_status.as_str().to_string(),
            command: meta.command.clone(),
            cwd: meta.cwd.clone(),
            started_at,
            finished_at,
            duration_ms: Some(duration_ms),
            exit_code,
            signal: signal_name,
            stdout_log_path: stdout_log,
            stderr_log_path: stderr_log,
        };
        let event_json = serde_json::to_string(&event).unwrap_or_default();
        let event_path = job_dir.completion_event_path().display().to_string();
        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
        // Two-phase write: persist the event before running sinks so it
        // exists even if a sink hangs, then rewrite with delivery results.
        if let Err(e) =
            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
                event: event.clone(),
                delivery_results: vec![],
            })
        {
            warn!(job_id, error = %e, "failed to write initial completion_event.json");
        }
        if let Some(ref shell_cmd) = current_notify_command {
            delivery_results.push(dispatch_command_sink(
                shell_cmd,
                &event_json,
                job_id,
                &event_path,
                &opts.shell_wrapper,
                "job.finished",
            ));
        }
        if let Some(ref file_path) = current_notify_file {
            delivery_results.push(dispatch_file_sink(file_path, &event_json));
        }
        if let Err(e) =
            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
                event,
                delivery_results,
            })
        {
            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
        }
    }
    Ok(())
}
fn dispatch_command_sink(
shell_cmd: &str,
event_json: &str,
job_id: &str,
event_path: &str,
shell_wrapper: &[String],
event_type: &str,
) -> crate::schema::SinkDeliveryResult {
use std::io::Write;
let attempted_at = now_rfc3339();
let target = shell_cmd.to_string();
if shell_cmd.trim().is_empty() {
return crate::schema::SinkDeliveryResult {
sink_type: "command".to_string(),
target,
success: false,
error: Some("empty shell command".to_string()),
attempted_at,
};
}
if shell_wrapper.is_empty() {
return crate::schema::SinkDeliveryResult {
sink_type: "command".to_string(),
target,
success: false,
error: Some("shell wrapper must not be empty".to_string()),
attempted_at,
};
}
let mut cmd = Command::new(&shell_wrapper[0]);
cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
cmd.env("AGENT_EXEC_JOB_ID", job_id);
cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
cmd.stdin(std::process::Stdio::piped());
cmd.stdout(std::process::Stdio::null());
cmd.stderr(std::process::Stdio::null());
match cmd.spawn() {
Ok(mut child) => {
if let Some(mut stdin) = child.stdin.take() {
let _ = stdin.write_all(event_json.as_bytes());
}
match child.wait() {
Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
sink_type: "command".to_string(),
target,
success: true,
error: None,
attempted_at,
},
Ok(status) => crate::schema::SinkDeliveryResult {
sink_type: "command".to_string(),
target,
success: false,
error: Some(format!("exited with status {status}")),
attempted_at,
},
Err(e) => crate::schema::SinkDeliveryResult {
sink_type: "command".to_string(),
target,
success: false,
error: Some(format!("wait error: {e}")),
attempted_at,
},
}
}
Err(e) => crate::schema::SinkDeliveryResult {
sink_type: "command".to_string(),
target,
success: false,
error: Some(format!("spawn error: {e}")),
attempted_at,
},
}
}
fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
use std::io::Write;
let attempted_at = now_rfc3339();
let path = std::path::Path::new(file_path);
if let Some(parent) = path.parent()
&& let Err(e) = std::fs::create_dir_all(parent)
{
return crate::schema::SinkDeliveryResult {
sink_type: "file".to_string(),
target: file_path.to_string(),
success: false,
error: Some(format!("create parent dir: {e}")),
attempted_at,
};
}
match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
{
Ok(mut f) => match writeln!(f, "{event_json}") {
Ok(_) => crate::schema::SinkDeliveryResult {
sink_type: "file".to_string(),
target: file_path.to_string(),
success: true,
error: None,
attempted_at,
},
Err(e) => crate::schema::SinkDeliveryResult {
sink_type: "file".to_string(),
target: file_path.to_string(),
success: false,
error: Some(format!("write error: {e}")),
attempted_at,
},
},
Err(e) => crate::schema::SinkDeliveryResult {
sink_type: "file".to_string(),
target: file_path.to_string(),
success: false,
error: Some(format!("open error: {e}")),
attempted_at,
},
}
}
/// Public wrapper around [`now_rfc3339`] for use by other modules.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
/// Current UTC time as an RFC 3339 string with second precision.
/// A system clock before the Unix epoch degrades to the epoch itself.
fn now_rfc3339() -> String {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format_rfc3339(secs)
}
/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC),
/// using a proleptic Gregorian day walk from 1970 (leap seconds ignored).
fn format_rfc3339(secs: u64) -> String {
    // Gregorian leap-year rule, inlined so the function is self-contained.
    let leap = |y: u64| (y % 4 == 0 && y % 100 != 0) || y % 400 == 0;
    let seconds = secs % 60;
    let minutes = (secs / 60) % 60;
    let hours = (secs / 3600) % 24;
    // Walk whole years forward from 1970, consuming days as we go.
    let mut days = secs / 86_400;
    let mut year = 1970u64;
    loop {
        let year_len = if leap(year) { 366 } else { 365 };
        if days < year_len {
            break;
        }
        days -= year_len;
        year += 1;
    }
    // Then walk months within the final year.
    let feb = if leap(year) { 29 } else { 28 };
    let month_lengths = [31u64, feb, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let mut month = 1u64;
    for len in month_lengths {
        if days < len {
            break;
        }
        days -= len;
        month += 1;
    }
    let day = days + 1;
    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}
/// Gregorian leap-year test: divisible by 4, except century years, which
/// must be divisible by 400.
fn is_leap(year: u64) -> bool {
    year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)
}
#[cfg(windows)]
/// Creates a named Windows Job Object (`AgentExec-<job_id>`) and assigns the
/// child process `pid` to it, returning the Job Object name.
///
/// # Errors
/// Fails if the process handle cannot be opened, the Job Object cannot be
/// created, or the assignment fails (e.g. the process already belongs to
/// another Job Object, as in some CI environments). All intermediate handles
/// are closed on the failure paths.
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;
    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());
    // SAFETY: raw Win32 handle management; every handle opened here is
    // either closed on each failure path or deliberately leaked below.
    unsafe {
        // PROCESS_SET_QUOTA | PROCESS_TERMINATE are the rights
        // AssignProcessToJobObject requires on the target process.
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }
        let _ = CloseHandle(proc_handle);
        // The job handle is intentionally leaked so the Job Object stays
        // open for the supervisor's lifetime — presumably so other code can
        // reopen it by name to manage the job; NOTE(review): confirm intent.
        std::mem::forget(job);
    }
    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
#[cfg(test)]
mod tests {
    use super::*;
    // Second zero of the Unix epoch formats as the epoch instant.
    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }
    // 1704067200 is 2024-01-01T00:00:00Z; exercises the year/month walk
    // across multiple leap years.
    #[test]
    fn rfc3339_known_date() {
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }
}