Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
6//! - If `--snapshot-after` elapses before `run` returns, a snapshot is
7//!   included in the JSON response.
8
9use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18    Snapshot,
19};
20
/// Everything the `run` sub-command needs to launch and report on one job.
///
/// Borrowed fields (`root`, `cwd`, `log`) share the lifetime `'a`; all other
/// fields are owned. [`Default`] yields the baseline configuration with an
/// empty command.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// argv of the job: program followed by its arguments. Must be non-empty
    /// for `execute` to accept it.
    pub command: Vec<String>,
    /// Jobs root directory override; `None` defers to `resolve_root`'s default.
    pub root: Option<&'a str>,
    /// How long (ms) `run` may poll before returning a snapshot; 0 = return
    /// immediately without a snapshot. `execute` clamps this to
    /// `MAX_SNAPSHOT_AFTER_MS` and skips the phase entirely when `wait` is set.
    pub snapshot_after: u64,
    /// How many trailing lines of each stream a snapshot may include.
    pub tail_lines: u64,
    /// Byte budget for each stream's tail in a snapshot.
    pub max_bytes: u64,
    /// Job timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Delay (ms) between SIGTERM and SIGKILL; 0 = SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Working directory for the command; `None` = current directory.
    pub cwd: Option<&'a str>,
    /// Extra environment entries, each a `KEY=VALUE` string.
    pub env_vars: Vec<String>,
    /// Env-file paths, applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the child inherits the current process environment
    /// (default: true).
    pub inherit_env: bool,
    /// Keys whose values are replaced with "***" in metadata and JSON output.
    pub mask: Vec<String>,
    /// Alternative location for full.log; `None` = inside the job directory.
    pub log: Option<&'a str>,
    /// Period (ms) of the state.json `updated_at` refresh; 0 = disabled.
    pub progress_every_ms: u64,
    /// When true, `run` blocks until the job leaves the running state and the
    /// response carries exit_code, finished_at, and final_snapshot.
    pub wait: bool,
    /// Poll period in milliseconds used while `wait` is true.
    pub wait_poll_ms: u64,
    /// argv of the command notification sink (no shell involved);
    /// `None` = no command sink.
    pub notify_command: Option<Vec<String>>,
    /// Path of the NDJSON notification sink file; `None` = no file sink.
    pub notify_file: Option<String>,
}

impl Default for RunOpts<'_> {
    /// Baseline options: empty command, 10 s snapshot window, 50-line /
    /// 64 KiB tails, no timeout, inherited environment, no waiting (200 ms
    /// poll period if enabled later), and no notification sinks.
    fn default() -> Self {
        Self {
            command: Vec::new(),
            root: None,
            snapshot_after: 10_000,
            tail_lines: 50,
            max_bytes: 65536,
            timeout_ms: 0,
            kill_after_ms: 0,
            cwd: None,
            env_vars: Vec::new(),
            env_files: Vec::new(),
            inherit_env: true,
            mask: Vec::new(),
            log: None,
            progress_every_ms: 0,
            wait: false,
            wait_poll_ms: 200,
            notify_command: None,
            notify_file: None,
        }
    }
}
87
/// Upper bound, in milliseconds, on the `snapshot_after` polling window
/// (10 seconds). `execute` applies it as a clamp when `--wait` is not set.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
90
91/// Execute `run`: spawn job, possibly wait for snapshot, return JSON.
92pub fn execute(opts: RunOpts) -> Result<()> {
93    if opts.command.is_empty() {
94        anyhow::bail!("no command specified for run");
95    }
96
97    let elapsed_start = std::time::Instant::now();
98
99    let root = resolve_root(opts.root);
100    std::fs::create_dir_all(&root)
101        .with_context(|| format!("create jobs root {}", root.display()))?;
102
103    let job_id = Ulid::new().to_string();
104    let created_at = now_rfc3339();
105
106    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
107    let env_keys: Vec<String> = opts
108        .env_vars
109        .iter()
110        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
111        .collect();
112
113    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
114    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
115
116    // Resolve the effective working directory for this job.
117    // If --cwd was specified, use that path; otherwise use the current process's working directory.
118    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
119    let effective_cwd = resolve_effective_cwd(opts.cwd);
120
121    let notification = if opts.notify_command.is_some() || opts.notify_file.is_some() {
122        Some(crate::schema::NotificationConfig {
123            notify_command: opts.notify_command.clone(),
124            notify_file: opts.notify_file.clone(),
125        })
126    } else {
127        None
128    };
129
130    let meta = JobMeta {
131        job: JobMetaJob { id: job_id.clone() },
132        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
133        command: opts.command.clone(),
134        created_at: created_at.clone(),
135        root: root.display().to_string(),
136        env_keys,
137        env_vars: masked_env_vars.clone(),
138        mask: opts.mask.clone(),
139        cwd: Some(effective_cwd),
140        notification,
141    };
142
143    let job_dir = JobDir::create(&root, &job_id, &meta)?;
144    info!(job_id = %job_id, "created job directory");
145
146    // Determine the full.log path (may be overridden by --log).
147    let full_log_path = if let Some(log) = opts.log {
148        log.to_string()
149    } else {
150        job_dir.full_log_path().display().to_string()
151    };
152
153    // Pre-create empty log files so they exist before the supervisor starts.
154    // This guarantees that `stdout.log`, `stderr.log`, and `full.log` are
155    // present immediately after `run` returns, even if the supervisor has
156    // not yet begun writing.
157    for log_path in [
158        job_dir.stdout_path(),
159        job_dir.stderr_path(),
160        job_dir.full_log_path(),
161    ] {
162        std::fs::OpenOptions::new()
163            .create(true)
164            .append(true)
165            .open(&log_path)
166            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
167    }
168
169    // Spawn the supervisor (same binary, internal `_supervise` sub-command).
170    let exe = std::env::current_exe().context("resolve current exe")?;
171    let mut supervisor_cmd = Command::new(&exe);
172    supervisor_cmd
173        .arg("_supervise")
174        .arg("--job-id")
175        .arg(&job_id)
176        .arg("--root")
177        .arg(root.display().to_string())
178        .arg("--full-log")
179        .arg(&full_log_path);
180
181    if opts.timeout_ms > 0 {
182        supervisor_cmd
183            .arg("--timeout")
184            .arg(opts.timeout_ms.to_string());
185    }
186    if opts.kill_after_ms > 0 {
187        supervisor_cmd
188            .arg("--kill-after")
189            .arg(opts.kill_after_ms.to_string());
190    }
191    if let Some(cwd) = opts.cwd {
192        supervisor_cmd.arg("--cwd").arg(cwd);
193    }
194    for env_file in &opts.env_files {
195        supervisor_cmd.arg("--env-file").arg(env_file);
196    }
197    for env_var in &opts.env_vars {
198        supervisor_cmd.arg("--env").arg(env_var);
199    }
200    if !opts.inherit_env {
201        supervisor_cmd.arg("--no-inherit-env");
202    }
203    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
204    // receives the real env var values so the child process can use them as intended.
205    if opts.progress_every_ms > 0 {
206        supervisor_cmd
207            .arg("--progress-every")
208            .arg(opts.progress_every_ms.to_string());
209    }
210    if let Some(ref nc) = opts.notify_command {
211        let json = serde_json::to_string(nc).unwrap_or_default();
212        supervisor_cmd.arg("--notify-command").arg(&json);
213    }
214    if let Some(ref nf) = opts.notify_file {
215        supervisor_cmd.arg("--notify-file").arg(nf);
216    }
217
218    supervisor_cmd
219        .arg("--")
220        .args(&opts.command)
221        .stdin(std::process::Stdio::null())
222        .stdout(std::process::Stdio::null())
223        .stderr(std::process::Stdio::null());
224
225    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
226
227    let supervisor_pid = supervisor.id();
228    debug!(supervisor_pid, "supervisor spawned");
229
230    // Write initial state with supervisor PID so `status` can track it.
231    // On Windows, this also pre-records the deterministic Job Object name
232    // (AgentExec-{job_id}) so that callers can find it immediately after run returns.
233    job_dir.init_state(supervisor_pid, &created_at)?;
234
235    // On Windows, poll state.json until the supervisor confirms Job Object
236    // assignment (state pid changes to child pid or state changes to "failed").
237    // This handshake ensures `run` can detect Job Object assignment failures
238    // before returning.  We wait up to 5 seconds (500 × 10ms intervals).
239    #[cfg(windows)]
240    {
241        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
242        loop {
243            std::thread::sleep(std::time::Duration::from_millis(10));
244            if let Ok(current_state) = job_dir.read_state() {
245                // Supervisor has updated state once the pid changes from
246                // supervisor_pid to the child pid (success), or state becomes
247                // "failed" (Job Object assignment error).
248                let supervisor_updated = current_state
249                    .pid
250                    .map(|p| p != supervisor_pid)
251                    .unwrap_or(false)
252                    || *current_state.status() == JobStatus::Failed;
253                if supervisor_updated {
254                    if *current_state.status() == JobStatus::Failed {
255                        // Supervisor failed to assign the child to a Job Object
256                        // and has already killed the child and updated state.json.
257                        // Report failure to the caller.
258                        anyhow::bail!(
259                            "supervisor failed to assign child process to Job Object \
260                             (Windows MUST requirement); see stderr for details"
261                        );
262                    }
263                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
264                    break;
265                }
266            }
267            if std::time::Instant::now() >= handshake_deadline {
268                // Supervisor did not confirm within 5 seconds. Proceed with
269                // the initial state (deterministic job name already written).
270                debug!("supervisor handshake timed out; proceeding with initial state");
271                break;
272            }
273        }
274    }
275
276    // Compute absolute paths for stdout.log and stderr.log.
277    let stdout_log_path = job_dir.stdout_path().display().to_string();
278    let stderr_log_path = job_dir.stderr_path().display().to_string();
279
280    // Clamp snapshot_after to MAX_SNAPSHOT_AFTER_MS, but only when --wait is NOT set.
281    // When --wait is set, we skip the snapshot_after phase entirely (the final_snapshot
282    // from the --wait phase replaces it), so the clamp is irrelevant.
283    let effective_snapshot_after = if opts.wait {
284        // Skip the snapshot_after wait when --wait is active; the terminal-state
285        // poll below will produce the definitive final_snapshot.
286        0
287    } else {
288        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
289    };
290
291    // Start a single wait_start timer that spans both the snapshot_after phase
292    // and the optional --wait phase so waited_ms reflects total wait time.
293    let wait_start = std::time::Instant::now();
294
295    // Optionally wait for snapshot and measure waited_ms.
296    // Uses a polling loop so we can exit early when output is available
297    // or the job has finished, rather than always sleeping the full duration.
298    let snapshot = if effective_snapshot_after > 0 {
299        debug!(ms = effective_snapshot_after, "polling for snapshot");
300        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
301        // Poll interval: 15ms gives good responsiveness without excessive CPU.
302        let poll_interval = std::time::Duration::from_millis(15);
303        loop {
304            std::thread::sleep(poll_interval);
305            // Early exit: job state changed from running (finished or failed).
306            // Output availability alone does NOT cause early exit; we always
307            // wait until the deadline when the job is still running.
308            if let Ok(st) = job_dir.read_state()
309                && *st.status() != JobStatus::Running
310            {
311                debug!("snapshot poll: job no longer running, exiting early");
312                break;
313            }
314            // Exit when deadline is reached.
315            if std::time::Instant::now() >= deadline {
316                debug!("snapshot poll: deadline reached");
317                break;
318            }
319        }
320        let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
321        Some(snap)
322    } else {
323        None
324    };
325
326    // If --wait is set, wait for the job to reach a terminal state.
327    // Unlike snapshot_after, there is no upper bound on wait time.
328    // waited_ms will accumulate the full time spent (snapshot_after + wait phases).
329    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
330        debug!("--wait: polling for terminal state");
331        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
332        loop {
333            std::thread::sleep(poll);
334            if let Ok(st) = job_dir.read_state()
335                && *st.status() != JobStatus::Running
336            {
337                let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
338                let ec = st.exit_code();
339                let fa = st.finished_at.clone();
340                let state_str = st.status().as_str().to_string();
341                break (state_str, ec, fa, Some(snap));
342            }
343        }
344    } else {
345        (JobStatus::Running.as_str().to_string(), None, None, None)
346    };
347
348    // waited_ms reflects the total time spent waiting (snapshot_after + --wait phases).
349    let waited_ms = wait_start.elapsed().as_millis() as u64;
350
351    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
352
353    let response = Response::new(
354        "run",
355        RunData {
356            job_id,
357            state: final_state,
358            // Include masked env_vars in the JSON response so callers can inspect
359            // which variables were set (with secret values replaced by "***").
360            env_vars: masked_env_vars,
361            snapshot,
362            stdout_log_path,
363            stderr_log_path,
364            waited_ms,
365            elapsed_ms,
366            exit_code: exit_code_opt,
367            finished_at: finished_at_opt,
368            final_snapshot: final_snapshot_opt,
369        },
370    );
371    response.print();
372    Ok(())
373}
374
375fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
376    let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
377    let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
378    Snapshot {
379        truncated: stdout.truncated || stderr.truncated,
380        encoding: "utf-8-lossy".to_string(),
381        stdout_observed_bytes: stdout.observed_bytes,
382        stderr_observed_bytes: stderr.observed_bytes,
383        stdout_included_bytes: stdout.included_bytes,
384        stderr_included_bytes: stderr.included_bytes,
385        stdout_tail: stdout.tail,
386        stderr_tail: stderr.tail,
387    }
388}
389
/// Inputs of the internal `_supervise` sub-command.
///
/// The supervisor receives environment values exactly as the user supplied
/// them so the child process starts with the real values. Masking is applied
/// by `run`, which stores masked values in meta.json and echoes them in its
/// JSON response.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job to supervise.
    pub job_id: &'a str,
    /// Jobs root directory under which the job directory is opened.
    pub root: &'a Path,
    /// argv of the child: program followed by its arguments.
    pub command: &'a [String],
    /// Alternative location for full.log; `None` = the job directory default.
    pub full_log: Option<&'a str>,
    /// Job timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Delay (ms) between SIGTERM and SIGKILL; 0 = SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Working directory of the child process; `None` leaves it unset.
    pub cwd: Option<&'a str>,
    /// Environment entries as `KEY=VALUE` strings (real values, not masked).
    pub env_vars: Vec<String>,
    /// Env-file paths, applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the child inherits the current process environment.
    pub inherit_env: bool,
    /// Period (ms) of the state.json `updated_at` refresh; 0 = disabled.
    pub progress_every_ms: u64,
    /// argv of the command notification sink; `None` = no command sink.
    pub notify_command: Option<Vec<String>>,
    /// Path of the NDJSON notification sink file; `None` = no file sink.
    pub notify_file: Option<String>,
}
421
/// Compute the working directory a job will effectively run in, as a string.
///
/// The starting point is `cwd_override` when given, otherwise the current
/// process working directory (or `"."` if that cannot be determined). The
/// canonical form of that path is returned when canonicalization succeeds.
/// When it fails (for example because the path does not exist), an absolute
/// path is returned unchanged and a relative path is joined onto the current
/// process working directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    /// Current process working directory, or "." when it is unavailable.
    fn process_cwd() -> PathBuf {
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
    }

    let requested = match cwd_override {
        Some(path) => PathBuf::from(path),
        None => process_cwd(),
    };

    // Preferred result: the canonical path (symlinks resolved).
    if let Ok(resolved) = requested.canonicalize() {
        return resolved.display().to_string();
    }

    // Fallback: best-effort absolute path without touching the filesystem.
    let absolute = if requested.is_absolute() {
        requested
    } else {
        process_cwd().join(requested)
    };
    absolute.display().to_string()
}
449
450/// Mask the values of specified keys in a list of KEY=VALUE strings.
451/// Keys listed in `mask_keys` will have their value replaced with "***".
452fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
453    if mask_keys.is_empty() {
454        return env_vars.to_vec();
455    }
456    env_vars
457        .iter()
458        .map(|s| {
459            let (key, _val) = parse_env_var(s);
460            if mask_keys.iter().any(|k| k == &key) {
461                format!("{key}=***")
462            } else {
463                s.clone()
464            }
465        })
466        .collect()
467}
468
/// Split a `KEY=VALUE` string at its first `=` into an owned `(key, value)`
/// pair.
///
/// Everything after the first `=` belongs to the value, so values may contain
/// `=` themselves. A string without `=` is treated as a key with an empty
/// value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
477
478/// Load environment variables from a .env-style file.
479/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
480fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
481    let contents =
482        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
483    let mut vars = Vec::new();
484    for line in contents.lines() {
485        let line = line.trim();
486        if line.is_empty() || line.starts_with('#') {
487            continue;
488        }
489        vars.push(parse_env_var(line));
490    }
491    Ok(vars)
492}
493
494/// Stream bytes from a child process output pipe to an individual log file and
495/// to the shared `full.log`.
496///
497/// Reads byte chunks (not lines) so that output without a trailing newline is
498/// still captured in the individual log immediately.  The `full.log` format
499/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
500/// bytes are appended to the buffer until a newline is found, at which point a
501/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
502/// flushed as a final line.
503///
504/// This helper is used by both the stdout and stderr monitoring threads inside
505/// [`supervise`], replacing the previously duplicated per-stream implementations.
506/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
507fn stream_to_logs<R>(
508    stream: R,
509    log_path: &std::path::Path,
510    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
511    label: &str,
512) where
513    R: std::io::Read,
514{
515    use std::io::Write;
516    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
517    let mut stream = stream;
518    let mut buf = [0u8; 8192];
519    // Incomplete-line buffer for full.log formatting.
520    let mut line_buf: Vec<u8> = Vec::new();
521    loop {
522        match stream.read(&mut buf) {
523            Ok(0) => break, // EOF
524            Ok(n) => {
525                let chunk = &buf[..n];
526                // Write raw bytes to the individual log (captures partial lines too).
527                let _ = log_file.write_all(chunk);
528                // Accumulate bytes for full.log line formatting.
529                for &b in chunk {
530                    if b == b'\n' {
531                        let line = String::from_utf8_lossy(&line_buf);
532                        if let Ok(mut fl) = full_log.lock() {
533                            let ts = now_rfc3339();
534                            let _ = writeln!(fl, "{ts} [{label}] {line}");
535                        }
536                        line_buf.clear();
537                    } else {
538                        line_buf.push(b);
539                    }
540                }
541            }
542            Err(_) => break,
543        }
544    }
545    // Flush any remaining incomplete line to full.log.
546    if !line_buf.is_empty() {
547        let line = String::from_utf8_lossy(&line_buf);
548        if let Ok(mut fl) = full_log.lock() {
549            let ts = now_rfc3339();
550            let _ = writeln!(fl, "{ts} [{label}] {line}");
551        }
552    }
553}
554
555/// Internal supervisor sub-command.
556///
557/// Runs the target command, streams stdout/stderr to individual log files
558/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
559/// updates `state.json` when the process finishes.
560///
561/// On Windows, the child process is assigned to a named Job Object so that
562/// the entire process tree can be terminated with a single `kill` call.
563/// The Job Object name is recorded in `state.json` as `windows_job_name`.
564pub fn supervise(opts: SuperviseOpts) -> Result<()> {
565    use std::sync::{Arc, Mutex};
566
567    let job_id = opts.job_id;
568    let root = opts.root;
569    let command = opts.command;
570
571    if command.is_empty() {
572        anyhow::bail!("supervisor: no command");
573    }
574
575    let job_dir = JobDir::open(root, job_id)?;
576
577    // Read meta.json to get the started_at timestamp.
578    let meta = job_dir.read_meta()?;
579    let started_at = meta.created_at.clone();
580
581    // Determine full.log path.
582    let full_log_path = if let Some(p) = opts.full_log {
583        std::path::PathBuf::from(p)
584    } else {
585        job_dir.full_log_path()
586    };
587
588    // Create the full.log file (shared between stdout/stderr threads).
589    // Ensure parent directories exist for custom paths.
590    if let Some(parent) = full_log_path.parent() {
591        std::fs::create_dir_all(parent)
592            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
593    }
594    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
595    let full_log = Arc::new(Mutex::new(full_log_file));
596
597    // Build the child environment.
598    let mut child_cmd = Command::new(&command[0]);
599    child_cmd.args(&command[1..]);
600
601    if opts.inherit_env {
602        // Start with the current environment (default).
603    } else {
604        child_cmd.env_clear();
605    }
606
607    // Apply env files in order.
608    for env_file in &opts.env_files {
609        let vars = load_env_file(env_file)?;
610        for (k, v) in vars {
611            child_cmd.env(&k, &v);
612        }
613    }
614
615    // Apply --env KEY=VALUE overrides (applied after env-files).
616    for env_var in &opts.env_vars {
617        let (k, v) = parse_env_var(env_var);
618        child_cmd.env(&k, &v);
619    }
620
621    // Set working directory if specified.
622    if let Some(cwd) = opts.cwd {
623        child_cmd.current_dir(cwd);
624    }
625
626    // Spawn the child with piped stdout/stderr so we can tee to logs.
627    let mut child = child_cmd
628        .stdin(std::process::Stdio::null())
629        .stdout(std::process::Stdio::piped())
630        .stderr(std::process::Stdio::piped())
631        .spawn()
632        .context("supervisor: spawn child")?;
633
634    let pid = child.id();
635    info!(job_id, pid, "child process started");
636
637    // On Windows, assign child to a named Job Object for process-tree management.
638    // The job name is derived from the job_id so that `kill` can look it up.
639    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
640    // kills the child process and updates state.json to "failed" before returning
641    // an error, so that the run front-end (which may have already returned) can
642    // detect the failure via state.json on next poll.
643    #[cfg(windows)]
644    let windows_job_name = {
645        match assign_to_job_object(job_id, pid) {
646            Ok(name) => Some(name),
647            Err(e) => {
648                // Job Object assignment failed. Per design.md this is a MUST
649                // requirement on Windows. Kill the child process and update
650                // state.json to "failed" so the run front-end can detect it.
651                let kill_err = child.kill();
652                let _ = child.wait(); // reap to avoid zombies
653
654                let failed_state = JobState {
655                    job: JobStateJob {
656                        id: job_id.to_string(),
657                        status: JobStatus::Failed,
658                        started_at: started_at.clone(),
659                    },
660                    result: JobStateResult {
661                        exit_code: None,
662                        signal: None,
663                        duration_ms: None,
664                    },
665                    pid: Some(pid),
666                    finished_at: Some(now_rfc3339()),
667                    updated_at: now_rfc3339(),
668                    windows_job_name: None,
669                };
670                // Best-effort: if writing state fails, we still propagate the
671                // original assignment error.
672                let _ = job_dir.write_state(&failed_state);
673
674                // Dispatch completion event for the failed state if notifications are configured.
675                // This mirrors the dispatch logic in the normal exit path so that callers
676                // receive a job.finished event even when the supervisor fails early (Windows only).
677                if opts.notify_command.is_some() || opts.notify_file.is_some() {
678                    let finished_at_ts =
679                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
680                    let stdout_log = job_dir.stdout_path().display().to_string();
681                    let stderr_log = job_dir.stderr_path().display().to_string();
682                    let fail_event = crate::schema::CompletionEvent {
683                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
684                        event_type: "job.finished".to_string(),
685                        job_id: job_id.to_string(),
686                        state: JobStatus::Failed.as_str().to_string(),
687                        command: meta.command.clone(),
688                        cwd: meta.cwd.clone(),
689                        started_at: started_at.clone(),
690                        finished_at: finished_at_ts,
691                        duration_ms: None,
692                        exit_code: None,
693                        signal: None,
694                        stdout_log_path: stdout_log,
695                        stderr_log_path: stderr_log,
696                    };
697                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
698                    let fail_event_path = job_dir.completion_event_path().display().to_string();
699                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
700                        Vec::new();
701                    if let Err(we) = job_dir.write_completion_event_atomic(
702                        &crate::schema::CompletionEventRecord {
703                            event: fail_event.clone(),
704                            delivery_results: vec![],
705                        },
706                    ) {
707                        warn!(
708                            job_id,
709                            error = %we,
710                            "failed to write initial completion_event.json for failed job"
711                        );
712                    }
713                    if let Some(ref argv) = opts.notify_command {
714                        fail_delivery_results.push(dispatch_command_sink(
715                            argv,
716                            &fail_event_json,
717                            job_id,
718                            &fail_event_path,
719                        ));
720                    }
721                    if let Some(ref file_path) = opts.notify_file {
722                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
723                    }
724                    if let Err(we) = job_dir.write_completion_event_atomic(
725                        &crate::schema::CompletionEventRecord {
726                            event: fail_event,
727                            delivery_results: fail_delivery_results,
728                        },
729                    ) {
730                        warn!(
731                            job_id,
732                            error = %we,
733                            "failed to update completion_event.json with delivery results for failed job"
734                        );
735                    }
736                }
737
738                if let Err(ke) = kill_err {
739                    return Err(anyhow::anyhow!(
740                        "supervisor: failed to assign pid {pid} to Job Object \
741                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
742                    ));
743                }
744                return Err(anyhow::anyhow!(
745                    "supervisor: failed to assign pid {pid} to Job Object \
746                     (Windows MUST requirement); child process was killed; \
747                     consider running outside a nested Job Object environment: {e}"
748                ));
749            }
750        }
751    };
752    #[cfg(not(windows))]
753    let windows_job_name: Option<String> = None;
754
755    // Update state.json with real child PID and Windows Job Object name.
756    // On Windows, windows_job_name is always Some at this point (guaranteed
757    // by the MUST requirement above), so state.json will always contain the
758    // Job Object identifier while the job is running.
759    let state = JobState {
760        job: JobStateJob {
761            id: job_id.to_string(),
762            status: JobStatus::Running,
763            started_at: started_at.clone(),
764        },
765        result: JobStateResult {
766            exit_code: None,
767            signal: None,
768            duration_ms: None,
769        },
770        pid: Some(pid),
771        finished_at: None,
772        updated_at: now_rfc3339(),
773        windows_job_name,
774    };
775    job_dir.write_state(&state)?;
776
777    let child_start_time = std::time::Instant::now();
778
779    // Take stdout/stderr handles before moving child.
780    let child_stdout = child.stdout.take().expect("child stdout piped");
781    let child_stderr = child.stderr.take().expect("child stderr piped");
782
783    // Thread: read stdout, write to stdout.log and full.log.
784    let stdout_log_path = job_dir.stdout_path();
785    let full_log_stdout = Arc::clone(&full_log);
786    let t_stdout = std::thread::spawn(move || {
787        stream_to_logs(child_stdout, &stdout_log_path, full_log_stdout, "STDOUT");
788    });
789
790    // Thread: read stderr, write to stderr.log and full.log.
791    let stderr_log_path = job_dir.stderr_path();
792    let full_log_stderr = Arc::clone(&full_log);
793    let t_stderr = std::thread::spawn(move || {
794        stream_to_logs(child_stderr, &stderr_log_path, full_log_stderr, "STDERR");
795    });
796
797    // Timeout / kill-after / progress-every handling.
798    // We spawn a watcher thread to handle timeout and periodic state.json updates.
799    let timeout_ms = opts.timeout_ms;
800    let kill_after_ms = opts.kill_after_ms;
801    let progress_every_ms = opts.progress_every_ms;
802    let state_path = job_dir.state_path();
803    let job_id_str = job_id.to_string();
804
805    // Use an atomic flag to signal the watcher thread when the child has exited.
806    use std::sync::atomic::{AtomicBool, Ordering};
807    let child_done = Arc::new(AtomicBool::new(false));
808
809    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
810        let state_path_clone = state_path.clone();
811        let child_done_clone = Arc::clone(&child_done);
812        Some(std::thread::spawn(move || {
813            let start = std::time::Instant::now();
814            let timeout_dur = if timeout_ms > 0 {
815                Some(std::time::Duration::from_millis(timeout_ms))
816            } else {
817                None
818            };
819            let progress_dur = if progress_every_ms > 0 {
820                Some(std::time::Duration::from_millis(progress_every_ms))
821            } else {
822                None
823            };
824
825            let poll_interval = std::time::Duration::from_millis(100);
826
827            loop {
828                std::thread::sleep(poll_interval);
829
830                // Exit the watcher loop if the child process has finished.
831                if child_done_clone.load(Ordering::Relaxed) {
832                    break;
833                }
834
835                let elapsed = start.elapsed();
836
837                // Check for timeout.
838                if let Some(td) = timeout_dur
839                    && elapsed >= td
840                {
841                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM");
842                    // Send SIGTERM.
843                    #[cfg(unix)]
844                    {
845                        unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
846                    }
847                    // If kill_after > 0, wait kill_after ms then SIGKILL.
848                    if kill_after_ms > 0 {
849                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
850                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL");
851                        #[cfg(unix)]
852                        {
853                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
854                        }
855                    } else {
856                        // Immediate SIGKILL.
857                        #[cfg(unix)]
858                        {
859                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
860                        }
861                    }
862                    break;
863                }
864
865                // Progress-every: update updated_at periodically.
866                if let Some(pd) = progress_dur {
867                    let elapsed_ms = elapsed.as_millis() as u64;
868                    let pd_ms = pd.as_millis() as u64;
869                    let poll_ms = poll_interval.as_millis() as u64;
870                    if elapsed_ms % pd_ms < poll_ms {
871                        // Read, update updated_at, write back.
872                        if let Ok(raw) = std::fs::read(&state_path_clone)
873                            && let Ok(mut st) =
874                                serde_json::from_slice::<crate::schema::JobState>(&raw)
875                        {
876                            st.updated_at = now_rfc3339();
877                            if let Ok(s) = serde_json::to_string_pretty(&st) {
878                                let _ = std::fs::write(&state_path_clone, s);
879                            }
880                        }
881                    }
882                }
883            }
884        }))
885    } else {
886        None
887    };
888
889    // Wait for child to finish.
890    let exit_status = child.wait().context("wait for child")?;
891
892    // Signal the watcher that the child has finished so it can exit its loop.
893    child_done.store(true, Ordering::Relaxed);
894
895    // Join logging threads.
896    let _ = t_stdout.join();
897    let _ = t_stderr.join();
898
899    // Join watcher if present.
900    if let Some(w) = watcher {
901        let _ = w.join();
902    }
903
904    let duration_ms = child_start_time.elapsed().as_millis() as u64;
905    let exit_code = exit_status.code();
906    let finished_at = now_rfc3339();
907
908    // Detect signal-killed processes on Unix for accurate state and completion event.
909    #[cfg(unix)]
910    let (terminal_status, signal_name) = {
911        use std::os::unix::process::ExitStatusExt;
912        if let Some(sig) = exit_status.signal() {
913            (JobStatus::Killed, Some(sig.to_string()))
914        } else {
915            (JobStatus::Exited, None)
916        }
917    };
918    #[cfg(not(unix))]
919    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
920
921    let state = JobState {
922        job: JobStateJob {
923            id: job_id.to_string(),
924            status: terminal_status.clone(),
925            started_at: started_at.clone(),
926        },
927        result: JobStateResult {
928            exit_code,
929            signal: signal_name.clone(),
930            duration_ms: Some(duration_ms),
931        },
932        pid: Some(pid),
933        finished_at: Some(finished_at.clone()),
934        updated_at: now_rfc3339(),
935        windows_job_name: None, // not needed after process exits
936    };
937    job_dir.write_state(&state)?;
938    info!(job_id, ?exit_code, "child process finished");
939
940    // Dispatch completion event to configured notification sinks.
941    // Failure here must not alter job state (delivery result is recorded separately).
942    let has_notification = opts.notify_command.is_some() || opts.notify_file.is_some();
943    if has_notification {
944        let stdout_log = job_dir.stdout_path().display().to_string();
945        let stderr_log = job_dir.stderr_path().display().to_string();
946        let event = crate::schema::CompletionEvent {
947            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
948            event_type: "job.finished".to_string(),
949            job_id: job_id.to_string(),
950            state: terminal_status.as_str().to_string(),
951            command: meta.command.clone(),
952            cwd: meta.cwd.clone(),
953            started_at,
954            finished_at,
955            duration_ms: Some(duration_ms),
956            exit_code,
957            signal: signal_name,
958            stdout_log_path: stdout_log,
959            stderr_log_path: stderr_log,
960        };
961
962        let event_json = serde_json::to_string(&event).unwrap_or_default();
963        let event_path = job_dir.completion_event_path().display().to_string();
964        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
965
966        // Write initial completion_event.json before dispatching sinks.
967        if let Err(e) =
968            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
969                event: event.clone(),
970                delivery_results: vec![],
971            })
972        {
973            warn!(job_id, error = %e, "failed to write initial completion_event.json");
974        }
975
976        if let Some(ref argv) = opts.notify_command {
977            delivery_results.push(dispatch_command_sink(
978                argv,
979                &event_json,
980                job_id,
981                &event_path,
982            ));
983        }
984        if let Some(ref file_path) = opts.notify_file {
985            delivery_results.push(dispatch_file_sink(file_path, &event_json));
986        }
987
988        // Update completion_event.json with delivery results.
989        if let Err(e) =
990            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
991                event,
992                delivery_results,
993            })
994        {
995            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
996        }
997    }
998
999    Ok(())
1000}
1001
1002/// Dispatch the command sink: spawn argv[0] with argv[1..], pass event JSON via stdin,
1003/// and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID / AGENT_EXEC_EVENT_TYPE env vars.
1004/// Shell expansion is never used; argv must be a pre-split array.
1005fn dispatch_command_sink(
1006    argv: &[String],
1007    event_json: &str,
1008    job_id: &str,
1009    event_path: &str,
1010) -> crate::schema::SinkDeliveryResult {
1011    use std::io::Write;
1012    let attempted_at = now_rfc3339();
1013    let target = serde_json::to_string(argv).unwrap_or_default();
1014
1015    if argv.is_empty() {
1016        return crate::schema::SinkDeliveryResult {
1017            sink_type: "command".to_string(),
1018            target,
1019            success: false,
1020            error: Some("empty argv".to_string()),
1021            attempted_at,
1022        };
1023    }
1024
1025    let mut cmd = Command::new(&argv[0]);
1026    cmd.args(&argv[1..]);
1027    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1028    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1029    cmd.env("AGENT_EXEC_EVENT_TYPE", "job.finished");
1030    cmd.stdin(std::process::Stdio::piped());
1031    cmd.stdout(std::process::Stdio::null());
1032    cmd.stderr(std::process::Stdio::null());
1033
1034    match cmd.spawn() {
1035        Ok(mut child) => {
1036            if let Some(mut stdin) = child.stdin.take() {
1037                let _ = stdin.write_all(event_json.as_bytes());
1038            }
1039            match child.wait() {
1040                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1041                    sink_type: "command".to_string(),
1042                    target,
1043                    success: true,
1044                    error: None,
1045                    attempted_at,
1046                },
1047                Ok(status) => crate::schema::SinkDeliveryResult {
1048                    sink_type: "command".to_string(),
1049                    target,
1050                    success: false,
1051                    error: Some(format!("exited with status {status}")),
1052                    attempted_at,
1053                },
1054                Err(e) => crate::schema::SinkDeliveryResult {
1055                    sink_type: "command".to_string(),
1056                    target,
1057                    success: false,
1058                    error: Some(format!("wait error: {e}")),
1059                    attempted_at,
1060                },
1061            }
1062        }
1063        Err(e) => crate::schema::SinkDeliveryResult {
1064            sink_type: "command".to_string(),
1065            target,
1066            success: false,
1067            error: Some(format!("spawn error: {e}")),
1068            attempted_at,
1069        },
1070    }
1071}
1072
1073/// Dispatch the file sink: append event JSON as a single NDJSON line.
1074/// Creates parent directories automatically.
1075fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1076    use std::io::Write;
1077    let attempted_at = now_rfc3339();
1078    let path = std::path::Path::new(file_path);
1079
1080    if let Some(parent) = path.parent()
1081        && let Err(e) = std::fs::create_dir_all(parent)
1082    {
1083        return crate::schema::SinkDeliveryResult {
1084            sink_type: "file".to_string(),
1085            target: file_path.to_string(),
1086            success: false,
1087            error: Some(format!("create parent dir: {e}")),
1088            attempted_at,
1089        };
1090    }
1091
1092    match std::fs::OpenOptions::new()
1093        .create(true)
1094        .append(true)
1095        .open(path)
1096    {
1097        Ok(mut f) => match writeln!(f, "{event_json}") {
1098            Ok(_) => crate::schema::SinkDeliveryResult {
1099                sink_type: "file".to_string(),
1100                target: file_path.to_string(),
1101                success: true,
1102                error: None,
1103                attempted_at,
1104            },
1105            Err(e) => crate::schema::SinkDeliveryResult {
1106                sink_type: "file".to_string(),
1107                target: file_path.to_string(),
1108                success: false,
1109                error: Some(format!("write error: {e}")),
1110                attempted_at,
1111            },
1112        },
1113        Err(e) => crate::schema::SinkDeliveryResult {
1114            sink_type: "file".to_string(),
1115            target: file_path.to_string(),
1116            success: false,
1117            error: Some(format!("open error: {e}")),
1118            attempted_at,
1119        },
1120    }
1121}
1122
1123/// Public alias so other modules can call the timestamp helper.
1124pub fn now_rfc3339_pub() -> String {
1125    now_rfc3339()
1126}
1127
1128fn now_rfc3339() -> String {
1129    // Use a simple approach that works without chrono.
1130    let d = std::time::SystemTime::now()
1131        .duration_since(std::time::UNIX_EPOCH)
1132        .unwrap_or_default();
1133    format_rfc3339(d.as_secs())
1134}
1135
1136fn format_rfc3339(secs: u64) -> String {
1137    // Manual conversion of Unix timestamp to UTC date-time string.
1138    let mut s = secs;
1139    let seconds = s % 60;
1140    s /= 60;
1141    let minutes = s % 60;
1142    s /= 60;
1143    let hours = s % 24;
1144    s /= 24;
1145
1146    // Days since 1970-01-01
1147    let mut days = s;
1148    let mut year = 1970u64;
1149    loop {
1150        let days_in_year = if is_leap(year) { 366 } else { 365 };
1151        if days < days_in_year {
1152            break;
1153        }
1154        days -= days_in_year;
1155        year += 1;
1156    }
1157
1158    let leap = is_leap(year);
1159    let month_days: [u64; 12] = [
1160        31,
1161        if leap { 29 } else { 28 },
1162        31,
1163        30,
1164        31,
1165        30,
1166        31,
1167        31,
1168        30,
1169        31,
1170        30,
1171        31,
1172    ];
1173    let mut month = 0usize;
1174    for (i, &d) in month_days.iter().enumerate() {
1175        if days < d {
1176            month = i;
1177            break;
1178        }
1179        days -= d;
1180    }
1181    let day = days + 1;
1182
1183    format!(
1184        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1185        year,
1186        month + 1,
1187        day,
1188        hours,
1189        minutes,
1190        seconds
1191    )
1192}
1193
/// Gregorian leap-year test: every fourth year is a leap year, except century
/// years, which are leap years only when divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        true
    } else if year.is_multiple_of(100) {
        false
    } else {
        year.is_multiple_of(4)
    }
}
1197
/// Windows-only: put the child process identified by `pid` into a freshly
/// created, named Job Object.
///
/// The Job Object is named `"AgentExec-{job_id}"`. The caller stores that name
/// in `state.json` so a later `kill` invocation can open the Job Object by
/// name and terminate the whole process tree with `TerminateJobObject`.
///
/// # Errors
///
/// Returns `Err` when the process cannot be opened, the Job Object cannot be
/// created, or the assignment is rejected (for instance when the process
/// already belongs to another Job Object, as can happen in CI). The caller
/// (`supervise`) treats any of these as fatal, because process-tree management
/// is a MUST requirement on Windows per design.md.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let wide_name = HSTRING::from(job_name.as_str());

    // SAFETY: these are raw Win32 calls. `wide_name` outlives the
    // `CreateJobObjectW` call that borrows it, every handle is used only after
    // the call that produced it succeeded, and each handle is closed at most
    // once on every path through this block.
    unsafe {
        // A process handle is required by AssignProcessToJobObject.
        let process = match OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid) {
            Ok(handle) => handle,
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                ));
            }
        };

        let job = match CreateJobObjectW(None, &wide_name) {
            Ok(handle) => handle,
            Err(e) => {
                let _ = CloseHandle(process);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        match AssignProcessToJobObject(job, process) {
            Ok(_) => {
                // The process handle was only needed for the assignment.
                let _ = CloseHandle(process);
                // The job handle is deliberately left open so the Job Object
                // stays valid while the supervisor runs; the OS reclaims it
                // when the supervisor process exits.
                std::mem::forget(job);
            }
            Err(e) => {
                // Assignment can fail when the process is already in another
                // job (e.g. CI / nested jobs); this is fatal per design.md.
                let _ = CloseHandle(job);
                let _ = CloseHandle(process);
                return Err(anyhow::anyhow!(
                    "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                     (process may already belong to another Job Object, e.g. in a CI environment): {e}"
                ));
            }
        }
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1263
/// Unit tests for the hand-rolled timestamp formatting helpers.
///
/// Besides the two original midnight-on-1-January cases, these cover the
/// branches those cases never reach: non-zero time-of-day fields, walking past
/// several months, the last second of a year, and the century leap-year rules.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        // 2024-01-01T00:00:00Z = 1704067200
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_time_of_day_and_leap_day() {
        // 2024-02-29T00:00:00Z = 1709164800, plus 12h 34m 56s = 45296 s.
        assert_eq!(format_rfc3339(1709210096), "2024-02-29T12:34:56Z");
    }

    #[test]
    fn rfc3339_last_second_of_year() {
        // One second before 2024-01-01T00:00:00Z.
        assert_eq!(format_rfc3339(1704067199), "2023-12-31T23:59:59Z");
    }

    #[test]
    fn rfc3339_century_years() {
        // 2000 is divisible by 400, so it has a 29 February.
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
        assert_eq!(format_rfc3339(951868800), "2000-03-01T00:00:00Z");
        // 2100 is divisible by 100 but not 400, so 28 February is followed by 1 March.
        assert_eq!(format_rfc3339(4107456000), "2100-02-28T00:00:00Z");
        assert_eq!(format_rfc3339(4107542400), "2100-03-01T00:00:00Z");
    }

    #[test]
    fn is_leap_rules() {
        assert!(is_leap(1972));
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1970));
        assert!(!is_leap(1900));
        assert!(!is_leap(2100));
    }
}