Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
6//! - If `--snapshot-after` elapses before `run` returns, a snapshot is
7//!   included in the JSON response.
8
9use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18    Snapshot,
19};
20
/// Caller-supplied settings for one invocation of the `run` sub-command.
///
/// Borrowed string fields share the lifetime `'a`; list-valued fields are owned.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Program followed by its arguments; `execute` rejects an empty list.
    pub command: Vec<String>,
    /// Jobs root directory override (`None` lets `resolve_root` pick one).
    pub root: Option<&'a str>,
    /// How long (ms) to wait before returning a snapshot; 0 = return immediately.
    pub snapshot_after: u64,
    /// How many trailing lines a snapshot may contain.
    pub tail_lines: u64,
    /// Byte cap applied to the snapshot tail.
    pub max_bytes: u64,
    /// Job timeout in milliseconds; 0 disables the timeout.
    pub timeout_ms: u64,
    /// Grace period (ms) between SIGTERM and SIGKILL; 0 = SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Directory the command should run in.
    pub cwd: Option<&'a str>,
    /// Extra environment entries, each written as `KEY=VALUE`.
    pub env_vars: Vec<String>,
    /// Env-file paths, applied in the order given.
    pub env_files: Vec<String>,
    /// Inherit the current process environment (defaults to `true`).
    pub inherit_env: bool,
    /// Keys whose values are replaced with "***" in JSON output.
    pub mask: Vec<String>,
    /// Alternative location for full.log; `None` keeps it in the job dir.
    pub log: Option<&'a str>,
    /// Period (ms) for refreshing `updated_at` in state.json; 0 = disabled.
    pub progress_every_ms: u64,
    /// When true, block until the job reaches a terminal state.
    /// The response then carries exit_code, finished_at, and final_snapshot.
    pub wait: bool,
    /// Polling period in milliseconds used while `wait` is true.
    pub wait_poll_ms: u64,
}

impl Default for RunOpts<'_> {
    /// Baseline options: no command, no overrides, a 10 s snapshot window,
    /// a 50-line / 64 KiB tail, environment inherited, and no waiting.
    fn default() -> Self {
        Self {
            command: Vec::new(),
            root: None,
            snapshot_after: 10_000,
            tail_lines: 50,
            max_bytes: 65_536,
            timeout_ms: 0,
            kill_after_ms: 0,
            cwd: None,
            env_vars: Vec::new(),
            env_files: Vec::new(),
            inherit_env: true,
            mask: Vec::new(),
            log: None,
            progress_every_ms: 0,
            wait: false,
            wait_poll_ms: 200,
        }
    }
}
81
/// Maximum allowed value for `snapshot_after` in milliseconds (10 seconds).
///
/// `execute` clamps the requested `snapshot_after` to this bound; the clamp is
/// not applied when `--wait` is set, because the snapshot phase is skipped then.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
84
/// Execute `run`: spawn job, possibly wait for snapshot, return JSON.
///
/// Steps, in order:
/// 1. Create the jobs root and a new job directory (ULID job id) with its metadata.
/// 2. Pre-create the three log files inside the job directory.
/// 3. Spawn this same executable with the internal `_supervise` sub-command,
///    with stdin/stdout/stderr all set to null.
/// 4. Write the initial state (supervisor PID) and, on Windows only, poll
///    `state.json` for up to 5 seconds for the Job Object handshake.
/// 5. Optionally poll for a snapshot (`snapshot_after`) and/or for a terminal
///    state (`wait`), then print the `Response`.
///
/// # Errors
///
/// Returns an error when `opts.command` is empty, when the jobs root, job
/// directory, or a log file cannot be created, when the current executable
/// cannot be resolved, when the supervisor cannot be spawned, when the initial
/// state cannot be written, or (Windows) when the supervisor reports a failed
/// Job Object assignment.
pub fn execute(opts: RunOpts) -> Result<()> {
    if opts.command.is_empty() {
        anyhow::bail!("no command specified for run");
    }

    // Started before any setup work so `elapsed_ms` covers the whole call.
    let elapsed_start = std::time::Instant::now();

    let root = resolve_root(opts.root);
    std::fs::create_dir_all(&root)
        .with_context(|| format!("create jobs root {}", root.display()))?;

    let job_id = Ulid::new().to_string();
    let created_at = now_rfc3339();

    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
    let env_keys: Vec<String> = opts
        .env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
        .collect();

    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);

    // Resolve the effective working directory for this job.
    // If --cwd was specified, use that path; otherwise use the current process's working directory.
    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
    let effective_cwd = resolve_effective_cwd(opts.cwd);

    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
        command: opts.command.clone(),
        created_at: created_at.clone(),
        root: root.display().to_string(),
        env_keys,
        env_vars: masked_env_vars.clone(),
        mask: opts.mask.clone(),
        cwd: Some(effective_cwd),
    };

    let job_dir = JobDir::create(&root, &job_id, &meta)?;
    info!(job_id = %job_id, "created job directory");

    // Determine the full.log path (may be overridden by --log).
    let full_log_path = if let Some(log) = opts.log {
        log.to_string()
    } else {
        job_dir.full_log_path().display().to_string()
    };

    // Pre-create empty log files so they exist before the supervisor starts.
    // This guarantees that `stdout.log`, `stderr.log`, and `full.log` are
    // present immediately after `run` returns, even if the supervisor has
    // not yet begun writing.
    // Note: the full.log pre-created here is always the job-dir one, even when
    // --log points elsewhere; the override path is only passed to the supervisor.
    for log_path in [
        job_dir.stdout_path(),
        job_dir.stderr_path(),
        job_dir.full_log_path(),
    ] {
        // create + append: creates the file if missing without truncating an existing one.
        std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)
            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
    }

    // Spawn the supervisor (same binary, internal `_supervise` sub-command).
    let exe = std::env::current_exe().context("resolve current exe")?;
    let mut supervisor_cmd = Command::new(&exe);
    supervisor_cmd
        .arg("_supervise")
        .arg("--job-id")
        .arg(&job_id)
        .arg("--root")
        .arg(root.display().to_string())
        .arg("--full-log")
        .arg(&full_log_path);

    // Optional flags are forwarded only when they differ from the "disabled" value.
    if opts.timeout_ms > 0 {
        supervisor_cmd
            .arg("--timeout")
            .arg(opts.timeout_ms.to_string());
    }
    if opts.kill_after_ms > 0 {
        supervisor_cmd
            .arg("--kill-after")
            .arg(opts.kill_after_ms.to_string());
    }
    if let Some(cwd) = opts.cwd {
        supervisor_cmd.arg("--cwd").arg(cwd);
    }
    for env_file in &opts.env_files {
        supervisor_cmd.arg("--env-file").arg(env_file);
    }
    for env_var in &opts.env_vars {
        supervisor_cmd.arg("--env").arg(env_var);
    }
    if !opts.inherit_env {
        supervisor_cmd.arg("--no-inherit-env");
    }
    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
    // receives the real env var values so the child process can use them as intended.
    if opts.progress_every_ms > 0 {
        supervisor_cmd
            .arg("--progress-every")
            .arg(opts.progress_every_ms.to_string());
    }

    // Everything after `--` is the user's command; all three stdio streams are null.
    supervisor_cmd
        .arg("--")
        .args(&opts.command)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    // The returned handle is only used to read the PID; it is never waited on here.
    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;

    let supervisor_pid = supervisor.id();
    debug!(supervisor_pid, "supervisor spawned");

    // Write initial state with supervisor PID so `status` can track it.
    // On Windows, this also pre-records the deterministic Job Object name
    // (AgentExec-{job_id}) so that callers can find it immediately after run returns.
    job_dir.init_state(supervisor_pid, &created_at)?;

    // On Windows, poll state.json until the supervisor confirms Job Object
    // assignment (state pid changes to child pid or state changes to "failed").
    // This handshake ensures `run` can detect Job Object assignment failures
    // before returning.  We wait up to 5 seconds (500 × 10ms intervals).
    #[cfg(windows)]
    {
        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            if let Ok(current_state) = job_dir.read_state() {
                // Supervisor has updated state once the pid changes from
                // supervisor_pid to the child pid (success), or state becomes
                // "failed" (Job Object assignment error).
                let supervisor_updated = current_state
                    .pid
                    .map(|p| p != supervisor_pid)
                    .unwrap_or(false)
                    || *current_state.status() == JobStatus::Failed;
                if supervisor_updated {
                    if *current_state.status() == JobStatus::Failed {
                        // Supervisor failed to assign the child to a Job Object
                        // and has already killed the child and updated state.json.
                        // Report failure to the caller.
                        anyhow::bail!(
                            "supervisor failed to assign child process to Job Object \
                             (Windows MUST requirement); see stderr for details"
                        );
                    }
                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
                    break;
                }
            }
            if std::time::Instant::now() >= handshake_deadline {
                // Supervisor did not confirm within 5 seconds. Proceed with
                // the initial state (deterministic job name already written).
                debug!("supervisor handshake timed out; proceeding with initial state");
                break;
            }
        }
    }

    // Compute absolute paths for stdout.log and stderr.log.
    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    // Clamp snapshot_after to MAX_SNAPSHOT_AFTER_MS, but only when --wait is NOT set.
    // When --wait is set, we skip the snapshot_after phase entirely (the final_snapshot
    // from the --wait phase replaces it), so the clamp is irrelevant.
    let effective_snapshot_after = if opts.wait {
        // Skip the snapshot_after wait when --wait is active; the terminal-state
        // poll below will produce the definitive final_snapshot.
        0
    } else {
        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
    };

    // Start a single wait_start timer that spans both the snapshot_after phase
    // and the optional --wait phase so waited_ms reflects total wait time.
    let wait_start = std::time::Instant::now();

    // Optionally wait for snapshot and measure waited_ms.
    // Uses a polling loop so we can exit early when the job has finished,
    // rather than always sleeping the full duration.
    let snapshot = if effective_snapshot_after > 0 {
        debug!(ms = effective_snapshot_after, "polling for snapshot");
        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
        // Poll interval: 15ms gives good responsiveness without excessive CPU.
        let poll_interval = std::time::Duration::from_millis(15);
        loop {
            std::thread::sleep(poll_interval);
            // Early exit: job state changed from running (finished or failed).
            // Output availability alone does NOT cause early exit; we always
            // wait until the deadline when the job is still running.
            // A failed state read is ignored and simply retried on the next tick.
            if let Ok(st) = job_dir.read_state()
                && *st.status() != JobStatus::Running
            {
                debug!("snapshot poll: job no longer running, exiting early");
                break;
            }
            // Exit when deadline is reached.
            if std::time::Instant::now() >= deadline {
                debug!("snapshot poll: deadline reached");
                break;
            }
        }
        let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
        Some(snap)
    } else {
        None
    };

    // If --wait is set, wait for the job to reach a terminal state.
    // Unlike snapshot_after, there is no upper bound on wait time.
    // waited_ms will accumulate the full time spent (snapshot_after + wait phases).
    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
        debug!("--wait: polling for terminal state");
        // `.max(1)` keeps a zero poll interval from turning this into a busy loop.
        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
        loop {
            std::thread::sleep(poll);
            if let Ok(st) = job_dir.read_state()
                && *st.status() != JobStatus::Running
            {
                let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
                let ec = st.exit_code();
                let fa = st.finished_at.clone();
                let state_str = st.status().as_str().to_string();
                break (state_str, ec, fa, Some(snap));
            }
        }
    } else {
        // Without --wait the response always reports "running", regardless of
        // what the snapshot poll above observed.
        (JobStatus::Running.as_str().to_string(), None, None, None)
    };

    // waited_ms reflects the total time spent waiting (snapshot_after + --wait phases).
    let waited_ms = wait_start.elapsed().as_millis() as u64;

    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: final_state,
            // Include masked env_vars in the JSON response so callers can inspect
            // which variables were set (with secret values replaced by "***").
            env_vars: masked_env_vars,
            snapshot,
            stdout_log_path,
            stderr_log_path,
            waited_ms,
            elapsed_ms,
            exit_code: exit_code_opt,
            finished_at: finished_at_opt,
            final_snapshot: final_snapshot_opt,
        },
    );
    response.print();
    Ok(())
}
351
352fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
353    let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
354    let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
355    Snapshot {
356        truncated: stdout.truncated || stderr.truncated,
357        encoding: "utf-8-lossy".to_string(),
358        stdout_observed_bytes: stdout.observed_bytes,
359        stderr_observed_bytes: stderr.observed_bytes,
360        stdout_included_bytes: stdout.included_bytes,
361        stderr_included_bytes: stderr.included_bytes,
362        stdout_tail: stdout.tail,
363        stderr_tail: stderr.tail,
364    }
365}
366
/// Options for the `_supervise` internal sub-command.
///
/// Masking is the responsibility of `run` (which writes masked values to meta.json
/// and includes them in the JSON response). The supervisor only needs the real
/// environment variable values to launch the child process correctly.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job; used together with `root` to open the job directory.
    pub job_id: &'a str,
    /// Jobs root directory that contains the job directory.
    pub root: &'a Path,
    /// Program (first element) followed by its arguments; must not be empty.
    pub command: &'a [String],
    /// Override full.log path; None = use job dir default.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
    pub kill_after_ms: u64,
    /// Working directory for the child process.
    pub cwd: Option<&'a str>,
    /// Environment variables as KEY=VALUE strings (real values, not masked).
    /// Applied after `env_files`, so they override values loaded from files.
    pub env_vars: Vec<String>,
    /// Paths to env files, applied in order.
    pub env_files: Vec<String>,
    /// Whether to inherit the current process environment.
    pub inherit_env: bool,
    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
    pub progress_every_ms: u64,
}
394
/// Work out the directory a job will run in, as a display string.
///
/// The starting point is `cwd_override` when given, otherwise the working
/// directory of the current process (or `"."` if that cannot be read).
/// The result is the canonicalized form of that path when canonicalization
/// succeeds. If it fails, an absolute path is returned unchanged and a
/// relative path is joined onto the process working directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    // Shared fallback: the process cwd, or "." when even that is unavailable.
    let process_cwd = || std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));

    let requested = cwd_override
        .map(PathBuf::from)
        .unwrap_or_else(process_cwd);

    // Happy path: symlinks resolved, path absolute.
    if let Ok(resolved) = requested.canonicalize() {
        return resolved.display().to_string();
    }

    // Canonicalization failed (e.g. the path does not exist): best effort.
    if requested.is_absolute() {
        return requested.display().to_string();
    }
    process_cwd().join(requested).display().to_string()
}
422
/// Produce a copy of `env_vars` in which secret values are hidden.
///
/// Each entry is a `KEY=VALUE` string; the key is everything before the first
/// `=` (or the whole entry when there is no `=`). Entries whose key appears in
/// `mask_keys` are rewritten to `KEY=***`; all others are copied unchanged.
fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    let mut masked = Vec::with_capacity(env_vars.len());
    for entry in env_vars {
        let key = entry
            .split_once('=')
            .map_or(entry.as_str(), |(name, _value)| name);
        let is_secret = mask_keys.iter().any(|candidate| candidate.as_str() == key);
        if is_secret {
            masked.push(format!("{key}=***"));
        } else {
            masked.push(entry.clone());
        }
    }
    masked
}
441
/// Split a `KEY=VALUE` (or `KEY=`) string into an owned `(key, value)` pair.
///
/// The split happens at the first `=`, so the value may itself contain `=`.
/// Input without any `=` yields the whole string as the key and an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
450
451/// Load environment variables from a .env-style file.
452/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
453fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
454    let contents =
455        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
456    let mut vars = Vec::new();
457    for line in contents.lines() {
458        let line = line.trim();
459        if line.is_empty() || line.starts_with('#') {
460            continue;
461        }
462        vars.push(parse_env_var(line));
463    }
464    Ok(vars)
465}
466
467/// Stream bytes from a child process output pipe to an individual log file and
468/// to the shared `full.log`.
469///
470/// Reads byte chunks (not lines) so that output without a trailing newline is
471/// still captured in the individual log immediately.  The `full.log` format
472/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
473/// bytes are appended to the buffer until a newline is found, at which point a
474/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
475/// flushed as a final line.
476///
477/// This helper is used by both the stdout and stderr monitoring threads inside
478/// [`supervise`], replacing the previously duplicated per-stream implementations.
479/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
480fn stream_to_logs<R>(
481    stream: R,
482    log_path: &std::path::Path,
483    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
484    label: &str,
485) where
486    R: std::io::Read,
487{
488    use std::io::Write;
489    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
490    let mut stream = stream;
491    let mut buf = [0u8; 8192];
492    // Incomplete-line buffer for full.log formatting.
493    let mut line_buf: Vec<u8> = Vec::new();
494    loop {
495        match stream.read(&mut buf) {
496            Ok(0) => break, // EOF
497            Ok(n) => {
498                let chunk = &buf[..n];
499                // Write raw bytes to the individual log (captures partial lines too).
500                let _ = log_file.write_all(chunk);
501                // Accumulate bytes for full.log line formatting.
502                for &b in chunk {
503                    if b == b'\n' {
504                        let line = String::from_utf8_lossy(&line_buf);
505                        if let Ok(mut fl) = full_log.lock() {
506                            let ts = now_rfc3339();
507                            let _ = writeln!(fl, "{ts} [{label}] {line}");
508                        }
509                        line_buf.clear();
510                    } else {
511                        line_buf.push(b);
512                    }
513                }
514            }
515            Err(_) => break,
516        }
517    }
518    // Flush any remaining incomplete line to full.log.
519    if !line_buf.is_empty() {
520        let line = String::from_utf8_lossy(&line_buf);
521        if let Ok(mut fl) = full_log.lock() {
522            let ts = now_rfc3339();
523            let _ = writeln!(fl, "{ts} [{label}] {line}");
524        }
525    }
526}
527
528/// Internal supervisor sub-command.
529///
530/// Runs the target command, streams stdout/stderr to individual log files
531/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
532/// updates `state.json` when the process finishes.
533///
534/// On Windows, the child process is assigned to a named Job Object so that
535/// the entire process tree can be terminated with a single `kill` call.
536/// The Job Object name is recorded in `state.json` as `windows_job_name`.
537pub fn supervise(opts: SuperviseOpts) -> Result<()> {
538    use std::sync::{Arc, Mutex};
539
540    let job_id = opts.job_id;
541    let root = opts.root;
542    let command = opts.command;
543
544    if command.is_empty() {
545        anyhow::bail!("supervisor: no command");
546    }
547
548    let job_dir = JobDir::open(root, job_id)?;
549
550    // Read meta.json to get the started_at timestamp.
551    let meta = job_dir.read_meta()?;
552    let started_at = meta.created_at.clone();
553
554    // Determine full.log path.
555    let full_log_path = if let Some(p) = opts.full_log {
556        std::path::PathBuf::from(p)
557    } else {
558        job_dir.full_log_path()
559    };
560
561    // Create the full.log file (shared between stdout/stderr threads).
562    // Ensure parent directories exist for custom paths.
563    if let Some(parent) = full_log_path.parent() {
564        std::fs::create_dir_all(parent)
565            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
566    }
567    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
568    let full_log = Arc::new(Mutex::new(full_log_file));
569
570    // Build the child environment.
571    let mut child_cmd = Command::new(&command[0]);
572    child_cmd.args(&command[1..]);
573
574    if opts.inherit_env {
575        // Start with the current environment (default).
576    } else {
577        child_cmd.env_clear();
578    }
579
580    // Apply env files in order.
581    for env_file in &opts.env_files {
582        let vars = load_env_file(env_file)?;
583        for (k, v) in vars {
584            child_cmd.env(&k, &v);
585        }
586    }
587
588    // Apply --env KEY=VALUE overrides (applied after env-files).
589    for env_var in &opts.env_vars {
590        let (k, v) = parse_env_var(env_var);
591        child_cmd.env(&k, &v);
592    }
593
594    // Set working directory if specified.
595    if let Some(cwd) = opts.cwd {
596        child_cmd.current_dir(cwd);
597    }
598
599    // Spawn the child with piped stdout/stderr so we can tee to logs.
600    let mut child = child_cmd
601        .stdin(std::process::Stdio::null())
602        .stdout(std::process::Stdio::piped())
603        .stderr(std::process::Stdio::piped())
604        .spawn()
605        .context("supervisor: spawn child")?;
606
607    let pid = child.id();
608    info!(job_id, pid, "child process started");
609
610    // On Windows, assign child to a named Job Object for process-tree management.
611    // The job name is derived from the job_id so that `kill` can look it up.
612    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
613    // kills the child process and updates state.json to "failed" before returning
614    // an error, so that the run front-end (which may have already returned) can
615    // detect the failure via state.json on next poll.
616    #[cfg(windows)]
617    let windows_job_name = {
618        match assign_to_job_object(job_id, pid) {
619            Ok(name) => Some(name),
620            Err(e) => {
621                // Job Object assignment failed. Per design.md this is a MUST
622                // requirement on Windows. Kill the child process and update
623                // state.json to "failed" so the run front-end can detect it.
624                let kill_err = child.kill();
625                let _ = child.wait(); // reap to avoid zombies
626
627                let failed_state = JobState {
628                    job: JobStateJob {
629                        id: job_id.to_string(),
630                        status: JobStatus::Failed,
631                        started_at: started_at.clone(),
632                    },
633                    result: JobStateResult {
634                        exit_code: None,
635                        signal: None,
636                        duration_ms: None,
637                    },
638                    pid: Some(pid),
639                    finished_at: Some(now_rfc3339()),
640                    updated_at: now_rfc3339(),
641                    windows_job_name: None,
642                };
643                // Best-effort: if writing state fails, we still propagate the
644                // original assignment error.
645                let _ = job_dir.write_state(&failed_state);
646
647                if let Err(ke) = kill_err {
648                    return Err(anyhow::anyhow!(
649                        "supervisor: failed to assign pid {pid} to Job Object \
650                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
651                    ));
652                }
653                return Err(anyhow::anyhow!(
654                    "supervisor: failed to assign pid {pid} to Job Object \
655                     (Windows MUST requirement); child process was killed; \
656                     consider running outside a nested Job Object environment: {e}"
657                ));
658            }
659        }
660    };
661    #[cfg(not(windows))]
662    let windows_job_name: Option<String> = None;
663
664    // Update state.json with real child PID and Windows Job Object name.
665    // On Windows, windows_job_name is always Some at this point (guaranteed
666    // by the MUST requirement above), so state.json will always contain the
667    // Job Object identifier while the job is running.
668    let state = JobState {
669        job: JobStateJob {
670            id: job_id.to_string(),
671            status: JobStatus::Running,
672            started_at: started_at.clone(),
673        },
674        result: JobStateResult {
675            exit_code: None,
676            signal: None,
677            duration_ms: None,
678        },
679        pid: Some(pid),
680        finished_at: None,
681        updated_at: now_rfc3339(),
682        windows_job_name,
683    };
684    job_dir.write_state(&state)?;
685
686    let child_start_time = std::time::Instant::now();
687
688    // Take stdout/stderr handles before moving child.
689    let child_stdout = child.stdout.take().expect("child stdout piped");
690    let child_stderr = child.stderr.take().expect("child stderr piped");
691
692    // Thread: read stdout, write to stdout.log and full.log.
693    let stdout_log_path = job_dir.stdout_path();
694    let full_log_stdout = Arc::clone(&full_log);
695    let t_stdout = std::thread::spawn(move || {
696        stream_to_logs(child_stdout, &stdout_log_path, full_log_stdout, "STDOUT");
697    });
698
699    // Thread: read stderr, write to stderr.log and full.log.
700    let stderr_log_path = job_dir.stderr_path();
701    let full_log_stderr = Arc::clone(&full_log);
702    let t_stderr = std::thread::spawn(move || {
703        stream_to_logs(child_stderr, &stderr_log_path, full_log_stderr, "STDERR");
704    });
705
706    // Timeout / kill-after / progress-every handling.
707    // We spawn a watcher thread to handle timeout and periodic state.json updates.
708    let timeout_ms = opts.timeout_ms;
709    let kill_after_ms = opts.kill_after_ms;
710    let progress_every_ms = opts.progress_every_ms;
711    let state_path = job_dir.state_path();
712    let job_id_str = job_id.to_string();
713
714    // Use an atomic flag to signal the watcher thread when the child has exited.
715    use std::sync::atomic::{AtomicBool, Ordering};
716    let child_done = Arc::new(AtomicBool::new(false));
717
718    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
719        let state_path_clone = state_path.clone();
720        let child_done_clone = Arc::clone(&child_done);
721        Some(std::thread::spawn(move || {
722            let start = std::time::Instant::now();
723            let timeout_dur = if timeout_ms > 0 {
724                Some(std::time::Duration::from_millis(timeout_ms))
725            } else {
726                None
727            };
728            let progress_dur = if progress_every_ms > 0 {
729                Some(std::time::Duration::from_millis(progress_every_ms))
730            } else {
731                None
732            };
733
734            let poll_interval = std::time::Duration::from_millis(100);
735
736            loop {
737                std::thread::sleep(poll_interval);
738
739                // Exit the watcher loop if the child process has finished.
740                if child_done_clone.load(Ordering::Relaxed) {
741                    break;
742                }
743
744                let elapsed = start.elapsed();
745
746                // Check for timeout.
747                if let Some(td) = timeout_dur
748                    && elapsed >= td
749                {
750                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM");
751                    // Send SIGTERM.
752                    #[cfg(unix)]
753                    {
754                        unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
755                    }
756                    // If kill_after > 0, wait kill_after ms then SIGKILL.
757                    if kill_after_ms > 0 {
758                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
759                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL");
760                        #[cfg(unix)]
761                        {
762                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
763                        }
764                    } else {
765                        // Immediate SIGKILL.
766                        #[cfg(unix)]
767                        {
768                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
769                        }
770                    }
771                    break;
772                }
773
774                // Progress-every: update updated_at periodically.
775                if let Some(pd) = progress_dur {
776                    let elapsed_ms = elapsed.as_millis() as u64;
777                    let pd_ms = pd.as_millis() as u64;
778                    let poll_ms = poll_interval.as_millis() as u64;
779                    if elapsed_ms % pd_ms < poll_ms {
780                        // Read, update updated_at, write back.
781                        if let Ok(raw) = std::fs::read(&state_path_clone)
782                            && let Ok(mut st) =
783                                serde_json::from_slice::<crate::schema::JobState>(&raw)
784                        {
785                            st.updated_at = now_rfc3339();
786                            if let Ok(s) = serde_json::to_string_pretty(&st) {
787                                let _ = std::fs::write(&state_path_clone, s);
788                            }
789                        }
790                    }
791                }
792            }
793        }))
794    } else {
795        None
796    };
797
798    // Wait for child to finish.
799    let exit_status = child.wait().context("wait for child")?;
800
801    // Signal the watcher that the child has finished so it can exit its loop.
802    child_done.store(true, Ordering::Relaxed);
803
804    // Join logging threads.
805    let _ = t_stdout.join();
806    let _ = t_stderr.join();
807
808    // Join watcher if present.
809    if let Some(w) = watcher {
810        let _ = w.join();
811    }
812
813    let duration_ms = child_start_time.elapsed().as_millis() as u64;
814    let exit_code = exit_status.code();
815    let finished_at = now_rfc3339();
816
817    let state = JobState {
818        job: JobStateJob {
819            id: job_id.to_string(),
820            status: JobStatus::Exited, // non-zero exit still "exited"
821            started_at,
822        },
823        result: JobStateResult {
824            exit_code,
825            signal: None,
826            duration_ms: Some(duration_ms),
827        },
828        pid: Some(pid),
829        finished_at: Some(finished_at),
830        updated_at: now_rfc3339(),
831        windows_job_name: None, // not needed after process exits
832    };
833    job_dir.write_state(&state)?;
834    info!(job_id, ?exit_code, "child process finished");
835    Ok(())
836}
837
838/// Public alias so other modules can call the timestamp helper.
839pub fn now_rfc3339_pub() -> String {
840    now_rfc3339()
841}
842
843fn now_rfc3339() -> String {
844    // Use a simple approach that works without chrono.
845    let d = std::time::SystemTime::now()
846        .duration_since(std::time::UNIX_EPOCH)
847        .unwrap_or_default();
848    format_rfc3339(d.as_secs())
849}
850
851fn format_rfc3339(secs: u64) -> String {
852    // Manual conversion of Unix timestamp to UTC date-time string.
853    let mut s = secs;
854    let seconds = s % 60;
855    s /= 60;
856    let minutes = s % 60;
857    s /= 60;
858    let hours = s % 24;
859    s /= 24;
860
861    // Days since 1970-01-01
862    let mut days = s;
863    let mut year = 1970u64;
864    loop {
865        let days_in_year = if is_leap(year) { 366 } else { 365 };
866        if days < days_in_year {
867            break;
868        }
869        days -= days_in_year;
870        year += 1;
871    }
872
873    let leap = is_leap(year);
874    let month_days: [u64; 12] = [
875        31,
876        if leap { 29 } else { 28 },
877        31,
878        30,
879        31,
880        30,
881        31,
882        31,
883        30,
884        31,
885        30,
886        31,
887    ];
888    let mut month = 0usize;
889    for (i, &d) in month_days.iter().enumerate() {
890        if days < d {
891            month = i;
892            break;
893        }
894        days -= d;
895    }
896    let day = days + 1;
897
898    format!(
899        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
900        year,
901        month + 1,
902        day,
903        hours,
904        minutes,
905        seconds
906    )
907}
908
/// Reports whether `year` is a leap year under the Gregorian rule: every
/// fourth year, except century years, which must be divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(100) {
        // Century years: only every fourth century qualifies.
        year.is_multiple_of(400)
    } else {
        year.is_multiple_of(4)
    }
}
912
/// Windows-only: create a named Job Object and assign the given child process
/// to it so that the entire process tree can be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. This name is stored in
/// `state.json` so that future `kill` invocations can open the same Job Object
/// by name and call `TerminateJobObject` to stop the whole tree.
///
/// On success the job name is returned and the Job Object handle is left open
/// for the remaining lifetime of the calling process.
///
/// # Errors
///
/// Returns `Err` if the child process cannot be opened, the Job Object cannot
/// be created, or the process cannot be assigned to it. Every handle opened
/// up to the point of failure is closed before returning. The caller
/// (`supervise`) treats failure as a fatal error because reliable process-tree
/// management is a Windows MUST requirement (design.md).
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: these are plain Win32 FFI calls. `hname` outlives the
    // `CreateJobObjectW` call that borrows it, and every handle passed to
    // `AssignProcessToJobObject` / `CloseHandle` was returned by a successful
    // `OpenProcess` / `CreateJobObjectW` call in this block and is closed at
    // most once on each control-flow path.
    unsafe {
        // Open the child process handle (needed for AssignProcessToJobObject).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create a named Job Object.
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        // Assign the child process to the Job Object.
        // This can fail if the process is already in another job (e.g. CI/nested).
        // Per design.md, assignment is a MUST on Windows — failure is a fatal error.
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle was only needed for the assignment.
        let _ = CloseHandle(proc_handle);

        // The job handle is intentionally NOT closed: it must stay open for
        // the lifetime of the supervisor so the Job Object remains valid, and
        // the OS closes it when the supervisor process exits. The handle is a
        // `Copy` value (it is passed by value more than once above), so it has
        // no destructor; simply not calling `CloseHandle` is what keeps it
        // open, and no `mem::forget` is needed.
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
978
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp zero must render as the Unix epoch itself.
    #[test]
    fn rfc3339_epoch() {
        let rendered = format_rfc3339(0);
        assert_eq!(rendered, "1970-01-01T00:00:00Z");
    }

    /// A well-known reference point: midnight UTC on 1 January 2024.
    #[test]
    fn rfc3339_known_date() {
        const NEW_YEAR_2024: u64 = 1_704_067_200;
        let rendered = format_rfc3339(NEW_YEAR_2024);
        assert_eq!(rendered, "2024-01-01T00:00:00Z");
    }
}