Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
6//! - If `--snapshot-after` elapses before `run` returns, a snapshot is
7//!   included in the JSON response.
8
9use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18    Snapshot,
19};
20use crate::tag::dedup_tags;
21
/// Launch-time configuration for the `run` sub-command.
///
/// # Keeping `run` and `create` aligned
///
/// `run` and `create` persist the same job definition to `meta.json`, so every
/// definition-time option added here MUST also be accepted by `create` (and the
/// other way round). The only exceptions are options the spec explicitly
/// documents as launch-only (e.g. snapshot timing, tail sizing, --wait).
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Program to execute followed by its arguments.
    pub command: Vec<String>,
    /// Jobs root directory override.
    pub root: Option<&'a str>,
    /// How long (ms) to wait before returning; 0 means return immediately.
    pub snapshot_after: u64,
    /// How many trailing lines a snapshot includes.
    pub tail_lines: u64,
    /// Byte cap for the tail.
    pub max_bytes: u64,
    /// Job timeout in milliseconds; 0 disables the timeout.
    pub timeout_ms: u64,
    /// Grace period (ms) between SIGTERM and SIGKILL; 0 means SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Directory the command runs in.
    pub cwd: Option<&'a str>,
    /// Environment variables, each formatted as KEY=VALUE.
    pub env_vars: Vec<String>,
    /// Env-file paths; they are applied in the order given.
    pub env_files: Vec<String>,
    /// Inherit the current process environment (default: true).
    pub inherit_env: bool,
    /// Keys whose values are replaced with "***" in JSON output.
    pub mask: Vec<String>,
    /// User-supplied job tags (deduplicated, keeping first-seen order).
    pub tags: Vec<String>,
    /// Alternative full.log location; None keeps it inside the job dir.
    pub log: Option<&'a str>,
    /// Refresh interval (ms) for state.json `updated_at`; 0 disables it.
    pub progress_every_ms: u64,
    /// When true, do not return until the job reaches a terminal state.
    /// The response then carries exit_code, finished_at, and final_snapshot.
    pub wait: bool,
    /// Polling interval in milliseconds used while `wait` is true.
    pub wait_poll_ms: u64,
    /// Upper bound (ms) on the wait when `wait` is true.
    /// Has no effect when `wait_forever` is true.
    pub wait_until_ms: u64,
    /// When true (and `wait` is true), wait without any deadline.
    pub wait_forever: bool,
    /// Shell command string for the command notification sink; run via the
    /// platform shell. None = no command sink.
    pub notify_command: Option<String>,
    /// NDJSON notification sink file path; None = no file sink.
    pub notify_file: Option<String>,
    /// Pattern tested against output lines (output-match notification).
    pub output_pattern: Option<String>,
    /// How the pattern is matched: "contains" or "regex".
    pub output_match_type: Option<String>,
    /// Which stream is inspected: "stdout", "stderr", or "either".
    pub output_stream: Option<String>,
    /// Shell command string for the output-match command sink.
    pub output_command: Option<String>,
    /// NDJSON file path for the output-match file sink.
    pub output_file: Option<String>,
    /// Resolved shell wrapper argv used to run command strings,
    /// e.g. `["sh", "-lc"]` or `["bash", "-lc"]`.
    pub shell_wrapper: Vec<String>,
}
91
92impl<'a> Default for RunOpts<'a> {
93    fn default() -> Self {
94        RunOpts {
95            command: vec![],
96            root: None,
97            snapshot_after: 10_000,
98            tail_lines: 50,
99            max_bytes: 65536,
100            timeout_ms: 0,
101            kill_after_ms: 0,
102            cwd: None,
103            env_vars: vec![],
104            env_files: vec![],
105            inherit_env: true,
106            mask: vec![],
107            tags: vec![],
108            log: None,
109            progress_every_ms: 0,
110            wait: false,
111            wait_poll_ms: 200,
112            wait_until_ms: 30_000,
113            wait_forever: false,
114            notify_command: None,
115            notify_file: None,
116            output_pattern: None,
117            output_match_type: None,
118            output_stream: None,
119            output_command: None,
120            output_file: None,
121            shell_wrapper: crate::config::default_shell_wrapper(),
122        }
123    }
124}
125
/// Upper bound, in milliseconds, for `snapshot_after` (10 seconds).
const MAX_SNAPSHOT_AFTER_MS: u64 = 10 * 1_000;
128
/// Everything needed to launch a supervisor process for one job.
///
/// Used by both `run::execute` and `start::execute`.
pub struct SpawnSupervisorParams {
    /// Identifier of the job being supervised.
    pub job_id: String,
    /// Jobs root directory handed to the supervisor.
    pub root: std::path::PathBuf,
    /// Path of the combined log handed to the supervisor.
    pub full_log_path: String,
    /// Timeout in milliseconds; forwarded only when greater than 0.
    pub timeout_ms: u64,
    /// Kill-after delay in milliseconds; forwarded only when greater than 0.
    pub kill_after_ms: u64,
    /// Optional working directory for the command.
    pub cwd: Option<String>,
    /// KEY=VALUE env var pairs carrying the real (unmasked) values.
    pub env_vars: Vec<String>,
    /// Env-file paths forwarded to the supervisor.
    pub env_files: Vec<String>,
    /// When false, the supervisor is told not to inherit the environment.
    pub inherit_env: bool,
    /// Progress interval in milliseconds; forwarded only when greater than 0.
    pub progress_every_ms: u64,
    /// Optional command notification sink.
    pub notify_command: Option<String>,
    /// Optional file notification sink.
    pub notify_file: Option<String>,
    /// Shell wrapper argv, passed to the supervisor as JSON.
    pub shell_wrapper: Vec<String>,
    /// Command and arguments to run under supervision.
    pub command: Vec<String>,
}
149
150/// Spawn the supervisor process and write the initial running state to `state.json`.
151///
152/// Returns the supervisor PID and the actual `started_at` timestamp.
153/// Also handles the Windows Job Object handshake before returning.
154pub fn spawn_supervisor_process(
155    job_dir: &JobDir,
156    params: SpawnSupervisorParams,
157) -> Result<(u32, String)> {
158    let started_at = now_rfc3339();
159
160    let exe = std::env::current_exe().context("resolve current exe")?;
161    let mut supervisor_cmd = Command::new(&exe);
162    supervisor_cmd
163        .arg("_supervise")
164        .arg("--job-id")
165        .arg(&params.job_id)
166        .arg("--supervise-root")
167        .arg(params.root.display().to_string())
168        .arg("--full-log")
169        .arg(&params.full_log_path);
170
171    if params.timeout_ms > 0 {
172        supervisor_cmd
173            .arg("--timeout")
174            .arg(params.timeout_ms.to_string());
175    }
176    if params.kill_after_ms > 0 {
177        supervisor_cmd
178            .arg("--kill-after")
179            .arg(params.kill_after_ms.to_string());
180    }
181    if let Some(ref cwd) = params.cwd {
182        supervisor_cmd.arg("--cwd").arg(cwd);
183    }
184    for env_file in &params.env_files {
185        supervisor_cmd.arg("--env-file").arg(env_file);
186    }
187    for env_var in &params.env_vars {
188        supervisor_cmd.arg("--env").arg(env_var);
189    }
190    if !params.inherit_env {
191        supervisor_cmd.arg("--no-inherit-env");
192    }
193    if params.progress_every_ms > 0 {
194        supervisor_cmd
195            .arg("--progress-every")
196            .arg(params.progress_every_ms.to_string());
197    }
198    if let Some(ref nc) = params.notify_command {
199        supervisor_cmd.arg("--notify-command").arg(nc);
200    }
201    if let Some(ref nf) = params.notify_file {
202        supervisor_cmd.arg("--notify-file").arg(nf);
203    }
204    let wrapper_json =
205        serde_json::to_string(&params.shell_wrapper).context("serialize shell wrapper")?;
206    supervisor_cmd
207        .arg("--shell-wrapper-resolved")
208        .arg(&wrapper_json);
209
210    supervisor_cmd
211        .arg("--")
212        .args(&params.command)
213        .stdin(std::process::Stdio::null())
214        .stdout(std::process::Stdio::null())
215        .stderr(std::process::Stdio::null());
216
217    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
218    let supervisor_pid = supervisor.id();
219    debug!(supervisor_pid, "supervisor spawned");
220
221    // Write initial running state.
222    job_dir.init_state(supervisor_pid, &started_at)?;
223
224    // Windows Job Object handshake.
225    #[cfg(windows)]
226    {
227        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
228        loop {
229            std::thread::sleep(std::time::Duration::from_millis(10));
230            if let Ok(current_state) = job_dir.read_state() {
231                let supervisor_updated = current_state
232                    .pid
233                    .map(|p| p != supervisor_pid)
234                    .unwrap_or(false)
235                    || *current_state.status() == crate::schema::JobStatus::Failed;
236                if supervisor_updated {
237                    if *current_state.status() == crate::schema::JobStatus::Failed {
238                        anyhow::bail!(
239                            "supervisor failed to assign child process to Job Object \
240                             (Windows MUST requirement); see stderr for details"
241                        );
242                    }
243                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
244                    break;
245                }
246            }
247            if std::time::Instant::now() >= handshake_deadline {
248                debug!("supervisor handshake timed out; proceeding with initial state");
249                break;
250            }
251        }
252    }
253
254    Ok((supervisor_pid, started_at))
255}
256
257/// Pre-create empty log files (stdout.log, stderr.log, full.log) so they exist
258/// immediately after job creation, before the supervisor starts writing.
259pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
260    for log_path in [
261        job_dir.stdout_path(),
262        job_dir.stderr_path(),
263        job_dir.full_log_path(),
264    ] {
265        std::fs::OpenOptions::new()
266            .create(true)
267            .append(true)
268            .open(&log_path)
269            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
270    }
271    Ok(())
272}
273
/// Settings for the snapshot/wait phase that follows a job launch.
pub struct SnapshotWaitOpts {
    /// Snapshot delay in milliseconds; capped internally and ignored when `wait` is set.
    pub snapshot_after: u64,
    /// Number of tail lines read for a snapshot.
    pub tail_lines: u64,
    /// Byte cap for a snapshot tail.
    pub max_bytes: u64,
    /// Whether to poll until the job reaches a terminal state.
    pub wait: bool,
    /// Poll interval in milliseconds while waiting (a minimum of 1 ms is applied).
    pub wait_poll_ms: u64,
    /// Wait deadline in milliseconds; unused when `wait_forever` is set.
    pub wait_until_ms: u64,
    /// Whether to wait without a deadline.
    pub wait_forever: bool,
}
284
285/// Run the snapshot/wait phase and return (final_state_str, exit_code, finished_at,
286/// snapshot, final_snapshot, waited_ms).
287pub fn run_snapshot_wait(
288    job_dir: &JobDir,
289    opts: &SnapshotWaitOpts,
290) -> (
291    String,
292    Option<i32>,
293    Option<String>,
294    Option<Snapshot>,
295    Option<Snapshot>,
296    u64,
297) {
298    use crate::schema::JobStatus;
299
300    let effective_snapshot_after = if opts.wait {
301        0
302    } else {
303        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
304    };
305
306    let wait_start = std::time::Instant::now();
307
308    let snapshot = if effective_snapshot_after > 0 {
309        debug!(ms = effective_snapshot_after, "polling for snapshot");
310        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
311        let poll_interval = std::time::Duration::from_millis(15);
312        loop {
313            std::thread::sleep(poll_interval);
314            if let Ok(st) = job_dir.read_state()
315                && !st.status().is_non_terminal()
316            {
317                debug!("snapshot poll: job no longer running/created, exiting early");
318                break;
319            }
320            if std::time::Instant::now() >= deadline {
321                debug!("snapshot poll: deadline reached");
322                break;
323            }
324        }
325        Some(build_snapshot(job_dir, opts.tail_lines, opts.max_bytes))
326    } else {
327        None
328    };
329
330    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
331        debug!(
332            wait_until_ms = opts.wait_until_ms,
333            wait_forever = opts.wait_forever,
334            "--wait: polling for terminal or deadline"
335        );
336        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
337        let wait_deadline = if opts.wait_forever {
338            None
339        } else {
340            Some(wait_start + std::time::Duration::from_millis(opts.wait_until_ms))
341        };
342
343        loop {
344            std::thread::sleep(poll);
345            if let Ok(st) = job_dir.read_state() {
346                if !st.status().is_non_terminal() {
347                    let snap = build_snapshot(job_dir, opts.tail_lines, opts.max_bytes);
348                    let ec = st.exit_code();
349                    let fa = st.finished_at.clone();
350                    let state_str = st.status().as_str().to_string();
351                    break (state_str, ec, fa, Some(snap));
352                }
353
354                if let Some(deadline) = wait_deadline
355                    && std::time::Instant::now() >= deadline
356                {
357                    break (st.status().as_str().to_string(), None, None, None);
358                }
359            }
360        }
361    } else {
362        (JobStatus::Running.as_str().to_string(), None, None, None)
363    };
364
365    let waited_ms = wait_start.elapsed().as_millis() as u64;
366    (
367        final_state,
368        exit_code_opt,
369        finished_at_opt,
370        snapshot,
371        final_snapshot_opt,
372        waited_ms,
373    )
374}
375
376/// Execute `run`: spawn job, possibly wait for snapshot, return JSON.
377pub fn execute(opts: RunOpts) -> Result<()> {
378    if opts.command.is_empty() {
379        anyhow::bail!("no command specified for run");
380    }
381
382    let elapsed_start = std::time::Instant::now();
383
384    let root = resolve_root(opts.root);
385    std::fs::create_dir_all(&root)
386        .with_context(|| format!("create jobs root {}", root.display()))?;
387
388    let job_id = Ulid::new().to_string();
389    let created_at = now_rfc3339();
390
391    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
392    let env_keys: Vec<String> = opts
393        .env_vars
394        .iter()
395        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
396        .collect();
397
398    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
399    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
400
401    // Resolve the effective working directory for this job.
402    // If --cwd was specified, use that path; otherwise use the current process's working directory.
403    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
404    let effective_cwd = resolve_effective_cwd(opts.cwd);
405
406    // Build output-match config from definition-time options (same logic as `create` and `notify set`).
407    let on_output_match = crate::notify::build_output_match_config(
408        opts.output_pattern,
409        opts.output_match_type,
410        opts.output_stream,
411        opts.output_command,
412        opts.output_file,
413        None,
414    );
415
416    let notification =
417        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
418        {
419            Some(crate::schema::NotificationConfig {
420                notify_command: opts.notify_command.clone(),
421                notify_file: opts.notify_file.clone(),
422                on_output_match,
423            })
424        } else {
425            None
426        };
427
428    // Validate and deduplicate tags (preserving first-seen order).
429    let tags = dedup_tags(opts.tags)?;
430
431    let meta = JobMeta {
432        job: JobMetaJob { id: job_id.clone() },
433        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
434        command: opts.command.clone(),
435        created_at: created_at.clone(),
436        root: root.display().to_string(),
437        env_keys,
438        env_vars: masked_env_vars.clone(),
439        // For `run`, env_vars_runtime is not populated because the supervisor
440        // is spawned immediately with the real values; no deferred start needed.
441        env_vars_runtime: vec![],
442        mask: opts.mask.clone(),
443        cwd: Some(effective_cwd),
444        notification,
445        // Execution-definition fields (used by start if ever applicable).
446        inherit_env: opts.inherit_env,
447        env_files: opts.env_files.clone(),
448        timeout_ms: opts.timeout_ms,
449        kill_after_ms: opts.kill_after_ms,
450        progress_every_ms: opts.progress_every_ms,
451        shell_wrapper: Some(opts.shell_wrapper.clone()),
452        tags: tags.clone(),
453    };
454
455    let job_dir = JobDir::create(&root, &job_id, &meta)?;
456    info!(job_id = %job_id, "created job directory");
457
458    // Determine the full.log path (may be overridden by --log).
459    let full_log_path = if let Some(log) = opts.log {
460        log.to_string()
461    } else {
462        job_dir.full_log_path().display().to_string()
463    };
464
465    // Pre-create empty log files so they exist before the supervisor starts.
466    pre_create_log_files(&job_dir)?;
467
468    // Spawn the supervisor using the shared helper.
469    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
470    // receives the real env var values so the child process can use them as intended.
471    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
472        &job_dir,
473        SpawnSupervisorParams {
474            job_id: job_id.clone(),
475            root: root.clone(),
476            full_log_path: full_log_path.clone(),
477            timeout_ms: opts.timeout_ms,
478            kill_after_ms: opts.kill_after_ms,
479            cwd: opts.cwd.map(|s| s.to_string()),
480            env_vars: opts.env_vars.clone(),
481            env_files: opts.env_files.clone(),
482            inherit_env: opts.inherit_env,
483            progress_every_ms: opts.progress_every_ms,
484            notify_command: opts.notify_command.clone(),
485            notify_file: opts.notify_file.clone(),
486            shell_wrapper: opts.shell_wrapper.clone(),
487            command: opts.command.clone(),
488        },
489    )?;
490
491    // Compute absolute paths for stdout.log and stderr.log.
492    let stdout_log_path = job_dir.stdout_path().display().to_string();
493    let stderr_log_path = job_dir.stderr_path().display().to_string();
494
495    let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
496        run_snapshot_wait(
497            &job_dir,
498            &SnapshotWaitOpts {
499                snapshot_after: opts.snapshot_after,
500                tail_lines: opts.tail_lines,
501                max_bytes: opts.max_bytes,
502                wait: opts.wait,
503                wait_poll_ms: opts.wait_poll_ms,
504                wait_until_ms: opts.wait_until_ms,
505                wait_forever: opts.wait_forever,
506            },
507        );
508
509    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
510
511    let response = Response::new(
512        "run",
513        RunData {
514            job_id,
515            state: final_state,
516            tags,
517            // Include masked env_vars in the JSON response so callers can inspect
518            // which variables were set (with secret values replaced by "***").
519            env_vars: masked_env_vars,
520            snapshot,
521            stdout_log_path,
522            stderr_log_path,
523            waited_ms,
524            elapsed_ms,
525            exit_code: exit_code_opt,
526            finished_at: finished_at_opt,
527            final_snapshot: final_snapshot_opt,
528        },
529    );
530    response.print();
531    Ok(())
532}
533
534fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
535    let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
536    let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
537    Snapshot {
538        truncated: stdout.truncated || stderr.truncated,
539        encoding: "utf-8-lossy".to_string(),
540        stdout_observed_bytes: stdout.observed_bytes,
541        stderr_observed_bytes: stderr.observed_bytes,
542        stdout_included_bytes: stdout.included_bytes,
543        stderr_included_bytes: stderr.included_bytes,
544        stdout_tail: stdout.tail,
545        stderr_tail: stderr.tail,
546    }
547}
548
/// Configuration for the internal `_supervise` sub-command.
///
/// The supervisor works with real environment variable values so that it can
/// launch the child process correctly. Masking is done by `run`, which writes
/// masked values to meta.json and returns them in the JSON response.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the supervised job.
    pub job_id: &'a str,
    /// Jobs root directory.
    pub root: &'a Path,
    /// Command and arguments to execute.
    pub command: &'a [String],
    /// Alternative full.log location; None selects the job dir default.
    pub full_log: Option<&'a str>,
    /// Job timeout in milliseconds; 0 disables the timeout.
    pub timeout_ms: u64,
    /// Grace period (ms) between SIGTERM and SIGKILL; 0 means SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Directory the child process runs in.
    pub cwd: Option<&'a str>,
    /// KEY=VALUE environment variables holding real (unmasked) values.
    pub env_vars: Vec<String>,
    /// Env-file paths; they are applied in the order given.
    pub env_files: Vec<String>,
    /// Inherit the current process environment.
    pub inherit_env: bool,
    /// Refresh interval (ms) for state.json `updated_at`; 0 disables it.
    pub progress_every_ms: u64,
    /// Shell command string for the command notification sink; run via the
    /// platform shell. None = no command sink.
    pub notify_command: Option<String>,
    /// NDJSON notification sink file path; None = no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to run command strings.
    pub shell_wrapper: Vec<String>,
}
583
/// Compute the working directory string recorded for a job.
///
/// The starting point is `cwd_override` when provided, otherwise the current
/// process working directory (or `"."` if that cannot be determined).
///
/// The result is the canonicalized form of that path whenever
/// canonicalization succeeds. When it fails, an absolute path is returned
/// unchanged and a relative path is joined onto the process working
/// directory on a best-effort basis.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    /// Process working directory, degrading to "." when it is unavailable.
    fn process_cwd() -> PathBuf {
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
    }

    let requested = cwd_override.map(PathBuf::from).unwrap_or_else(process_cwd);

    // Happy path: canonicalization resolves symlinks and yields an absolute path.
    if let Ok(resolved) = requested.canonicalize() {
        return resolved.display().to_string();
    }

    // Fallback: keep absolute paths as-is, anchor relative ones at the cwd.
    let absolute = if requested.is_absolute() {
        requested
    } else {
        process_cwd().join(requested)
    };
    absolute.display().to_string()
}
611
/// Mask the values of selected keys in a list of KEY=VALUE strings.
///
/// Every entry whose key (the text before the first `=`, or the whole entry
/// when there is no `=`) appears in `mask_keys` is replaced by `KEY=***`.
/// All other entries are copied unchanged, and the input order is preserved.
/// With an empty `mask_keys` the input is returned as-is.
///
/// The key is borrowed from the entry rather than parsed into owned strings,
/// so no allocation happens for entries that are merely inspected.
pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    if mask_keys.is_empty() {
        return env_vars.to_vec();
    }
    env_vars
        .iter()
        .map(|entry| {
            let key = entry
                .split_once('=')
                .map_or(entry.as_str(), |(key, _value)| key);
            if mask_keys.iter().any(|masked| masked.as_str() == key) {
                format!("{key}=***")
            } else {
                entry.clone()
            }
        })
        .collect()
}
630
/// Split one `KEY=VALUE` (or `KEY=`) string into an owned `(key, value)` pair.
///
/// The split happens at the first `=`, so the value may itself contain `=`.
/// A string without any `=` is treated as a key with an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
639
640/// Load environment variables from a .env-style file.
641/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
642fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
643    let contents =
644        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
645    let mut vars = Vec::new();
646    for line in contents.lines() {
647        let line = line.trim();
648        if line.is_empty() || line.starts_with('#') {
649            continue;
650        }
651        vars.push(parse_env_var(line));
652    }
653    Ok(vars)
654}
655
656/// Shared state for output-match checking, used by streaming threads in `supervise`.
657///
658/// Reloads `meta.json` on every observed line so that a `notify set` update is
659/// visible for the very next line, regardless of how recently the last reload
660/// occurred.  Multiple streaming threads share the same checker via `Arc`; the
661/// internal `Mutex` serialises access.
662struct OutputMatchChecker {
663    job_dir_path: std::path::PathBuf,
664    shell_wrapper: Vec<String>,
665    inner: std::sync::Mutex<OutputMatchInner>,
666}
667
668struct OutputMatchInner {
669    config: Option<crate::schema::NotificationConfig>,
670}
671
672impl OutputMatchChecker {
673    fn new(
674        job_dir_path: std::path::PathBuf,
675        shell_wrapper: Vec<String>,
676        initial_config: Option<crate::schema::NotificationConfig>,
677    ) -> Self {
678        Self {
679            job_dir_path,
680            shell_wrapper,
681            inner: std::sync::Mutex::new(OutputMatchInner {
682                config: initial_config,
683            }),
684        }
685    }
686
687    /// Check a newly observed output line for a configured match.
688    ///
689    /// Reloads `meta.json` on every call so that `notify set` updates are
690    /// visible for the next line without any delay.
691    /// Dispatches `job.output.matched` events outside the lock to avoid blocking
692    /// other streaming threads.
693    fn check_line(&self, line: &str, stream: &str) {
694        use crate::schema::{OutputMatchStream, OutputMatchType};
695
696        // Lock, reload, evaluate match, then release before dispatching.
697        let match_info: Option<crate::schema::OutputMatchConfig> = {
698            let mut inner = self.inner.lock().unwrap();
699
700            // Reload config on every line to pick up `notify set` updates immediately.
701            {
702                let meta_path = self.job_dir_path.join("meta.json");
703                if let Ok(raw) = std::fs::read(&meta_path)
704                    && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
705                {
706                    inner.config = meta.notification;
707                }
708            }
709
710            let Some(ref notification) = inner.config else {
711                return;
712            };
713            let Some(ref match_cfg) = notification.on_output_match else {
714                return;
715            };
716
717            // Check stream filter.
718            let stream_matches = match match_cfg.stream {
719                OutputMatchStream::Stdout => stream == "stdout",
720                OutputMatchStream::Stderr => stream == "stderr",
721                OutputMatchStream::Either => true,
722            };
723            if !stream_matches {
724                return;
725            }
726
727            // Check pattern.
728            let matched = match &match_cfg.match_type {
729                OutputMatchType::Contains => line.contains(&match_cfg.pattern),
730                OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
731                    .map(|re| re.is_match(line))
732                    .unwrap_or(false),
733            };
734
735            if matched {
736                Some(match_cfg.clone())
737            } else {
738                None
739            }
740        }; // Lock released.
741
742        if let Some(match_cfg) = match_info {
743            self.dispatch_match(line, stream, &match_cfg);
744        }
745    }
746
747    /// Dispatch a `job.output.matched` event and append a delivery record to
748    /// `notification_events.ndjson`.  Failures are non-fatal.
749    fn dispatch_match(
750        &self,
751        line: &str,
752        stream: &str,
753        match_cfg: &crate::schema::OutputMatchConfig,
754    ) {
755        use std::io::Write;
756
757        let job_id = self
758            .job_dir_path
759            .file_name()
760            .and_then(|n| n.to_str())
761            .unwrap_or("unknown");
762
763        let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
764        let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
765        let events_path = self.job_dir_path.join("notification_events.ndjson");
766        let events_path_str = events_path.display().to_string();
767
768        let match_type_str = match &match_cfg.match_type {
769            crate::schema::OutputMatchType::Contains => "contains",
770            crate::schema::OutputMatchType::Regex => "regex",
771        };
772
773        let event = crate::schema::OutputMatchEvent {
774            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
775            event_type: "job.output.matched".to_string(),
776            job_id: job_id.to_string(),
777            pattern: match_cfg.pattern.clone(),
778            match_type: match_type_str.to_string(),
779            stream: stream.to_string(),
780            line: line.to_string(),
781            stdout_log_path,
782            stderr_log_path,
783        };
784
785        let event_json = serde_json::to_string(&event).unwrap_or_default();
786        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
787
788        if let Some(ref cmd) = match_cfg.command {
789            delivery_results.push(dispatch_command_sink(
790                cmd,
791                &event_json,
792                job_id,
793                &events_path_str,
794                &self.shell_wrapper,
795                "job.output.matched",
796            ));
797        }
798        if let Some(ref file_path) = match_cfg.file {
799            delivery_results.push(dispatch_file_sink(file_path, &event_json));
800        }
801
802        // Append delivery record to notification_events.ndjson.
803        let record = crate::schema::OutputMatchEventRecord {
804            event,
805            delivery_results,
806        };
807        if let Ok(record_json) = serde_json::to_string(&record)
808            && let Ok(mut f) = std::fs::OpenOptions::new()
809                .create(true)
810                .append(true)
811                .open(&events_path)
812        {
813            let _ = writeln!(f, "{record_json}");
814        }
815    }
816}
817
818/// Stream bytes from a child process output pipe to an individual log file and
819/// to the shared `full.log`.
820///
821/// Reads byte chunks (not lines) so that output without a trailing newline is
822/// still captured in the individual log immediately.  The `full.log` format
823/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
824/// bytes are appended to the buffer until a newline is found, at which point a
825/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
826/// flushed as a final line.
827///
828/// The optional `on_line` callback is invoked for each complete line (without
829/// the trailing newline) and is used to drive output-match checking.
830///
831/// This helper is used by both the stdout and stderr monitoring threads inside
832/// [`supervise`], replacing the previously duplicated per-stream implementations.
833/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
834fn stream_to_logs<R, F>(
835    stream: R,
836    log_path: &std::path::Path,
837    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
838    label: &str,
839    on_line: Option<F>,
840) where
841    R: std::io::Read,
842    F: Fn(&str),
843{
844    use std::io::Write;
845    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
846    let mut stream = stream;
847    let mut buf = [0u8; 8192];
848    // Incomplete-line buffer for full.log formatting.
849    let mut line_buf: Vec<u8> = Vec::new();
850    loop {
851        match stream.read(&mut buf) {
852            Ok(0) => break, // EOF
853            Ok(n) => {
854                let chunk = &buf[..n];
855                // Write raw bytes to the individual log (captures partial lines too).
856                let _ = log_file.write_all(chunk);
857                // Accumulate bytes for full.log line formatting.
858                for &b in chunk {
859                    if b == b'\n' {
860                        let line = String::from_utf8_lossy(&line_buf);
861                        if let Ok(mut fl) = full_log.lock() {
862                            let ts = now_rfc3339();
863                            let _ = writeln!(fl, "{ts} [{label}] {line}");
864                        }
865                        if let Some(ref f) = on_line {
866                            f(&line);
867                        }
868                        line_buf.clear();
869                    } else {
870                        line_buf.push(b);
871                    }
872                }
873            }
874            Err(_) => break,
875        }
876    }
877    // Flush any remaining incomplete line to full.log and trigger callback.
878    if !line_buf.is_empty() {
879        let line = String::from_utf8_lossy(&line_buf);
880        if let Ok(mut fl) = full_log.lock() {
881            let ts = now_rfc3339();
882            let _ = writeln!(fl, "{ts} [{label}] {line}");
883        }
884        if let Some(ref f) = on_line {
885            f(&line);
886        }
887    }
888}
889
890/// Internal supervisor sub-command.
891///
892/// Runs the target command, streams stdout/stderr to individual log files
893/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
894/// updates `state.json` when the process finishes.
895///
896/// On Windows, the child process is assigned to a named Job Object so that
897/// the entire process tree can be terminated with a single `kill` call.
898/// The Job Object name is recorded in `state.json` as `windows_job_name`.
899pub fn supervise(opts: SuperviseOpts) -> Result<()> {
900    use std::sync::{Arc, Mutex};
901
902    let job_id = opts.job_id;
903    let root = opts.root;
904    let command = opts.command;
905
906    if command.is_empty() {
907        anyhow::bail!("supervisor: no command");
908    }
909
910    let job_dir = JobDir::open(root, job_id)?;
911
912    // Read meta.json for notification config and cwd (used in completion event).
913    let meta = job_dir.read_meta()?;
914    // Use the actual execution start time, not meta.created_at.
915    // For `run`, these are nearly identical. For `start`, the job may have
916    // been created long before it was started.
917    let started_at = now_rfc3339();
918
919    // Determine full.log path.
920    let full_log_path = if let Some(p) = opts.full_log {
921        std::path::PathBuf::from(p)
922    } else {
923        job_dir.full_log_path()
924    };
925
926    // Create the full.log file (shared between stdout/stderr threads).
927    // Ensure parent directories exist for custom paths.
928    if let Some(parent) = full_log_path.parent() {
929        std::fs::create_dir_all(parent)
930            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
931    }
932    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
933    let full_log = Arc::new(Mutex::new(full_log_file));
934
935    // Execute command through the shell wrapper.
936    //
937    // Two launch modes:
938    //
939    //   String mode (command.len() == 1):  The single element is a shell command
940    //   string passed as-is to the wrapper (e.g. `"echo hello && ls"` preserves
941    //   shell operators).  The wrapper process is the workload boundary.
942    //
943    //   Argv mode (command.len() > 1):  The wrapper is used for login-shell
944    //   environment initialisation but immediately hands off to the target via
945    //   `exec "$@"`.  The shell replaces itself so the observed child PID and
946    //   lifecycle align with the intended workload, not the wrapper.
947    //
948    // --notify-command delivery always uses the wrapper in string mode
949    // (see dispatch_command_sink); this change only affects job argv launches.
950    if opts.shell_wrapper.is_empty() {
951        anyhow::bail!("supervisor: shell wrapper must not be empty");
952    }
953    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
954    if command.len() == 1 {
955        // Shell-string mode: pass the command string to the wrapper as-is.
956        child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
957    } else {
958        // Argv mode: launch the workload via the shell wrapper.
959        //
960        // On Unix the wrapper hands off to the workload via `exec "$@"` so the
961        // shell replaces itself and the observed PID / lifecycle align with the
962        // intended workload, not the wrapper.
963        //
964        // On non-Unix platforms (Windows) there is no POSIX `exec`; the wrapper
965        // is invoked in shell-string mode with the argv joined into a single
966        // quoted command string, preserving the existing cmd/C semantics.
967        #[cfg(unix)]
968        {
969            // `--` serves as $0; argv elements become $1..$n so `$@` expands
970            // to the full workload argv.
971            child_cmd
972                .args(&opts.shell_wrapper[1..])
973                .arg("exec \"$@\"")
974                .arg("--")
975                .args(command);
976        }
977        #[cfg(not(unix))]
978        {
979            // Windows fallback: join argv into a shell-compatible string and
980            // pass it to the wrapper as a single command string (same as
981            // shell-string mode), so cmd /C semantics are preserved.
982            let joined = command
983                .iter()
984                .map(|a| {
985                    if a.contains(' ') {
986                        format!("\"{}\"", a)
987                    } else {
988                        a.clone()
989                    }
990                })
991                .collect::<Vec<_>>()
992                .join(" ");
993            child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
994        }
995    }
996
997    if opts.inherit_env {
998        // Start with the current environment (default).
999    } else {
1000        child_cmd.env_clear();
1001    }
1002
1003    // Apply env files in order.
1004    for env_file in &opts.env_files {
1005        let vars = load_env_file(env_file)?;
1006        for (k, v) in vars {
1007            child_cmd.env(&k, &v);
1008        }
1009    }
1010
1011    // Apply --env KEY=VALUE overrides (applied after env-files).
1012    for env_var in &opts.env_vars {
1013        let (k, v) = parse_env_var(env_var);
1014        child_cmd.env(&k, &v);
1015    }
1016
1017    // Set working directory if specified.
1018    if let Some(cwd) = opts.cwd {
1019        child_cmd.current_dir(cwd);
1020    }
1021
1022    // Put the child in its own process group so that timeout signals
1023    // (SIGTERM / SIGKILL) reach the entire process tree, not just the
1024    // shell wrapper.  Without this, `sh -lc "sleep 60"` would absorb
1025    // the signal while the grandchild (`sleep`) keeps running.
1026    #[cfg(unix)]
1027    {
1028        use std::os::unix::process::CommandExt;
1029        // SAFETY: setsid is async-signal-safe and called before exec.
1030        unsafe {
1031            child_cmd.pre_exec(|| {
1032                libc::setsid();
1033                Ok(())
1034            });
1035        }
1036    }
1037
1038    // Spawn the child with piped stdout/stderr so we can tee to logs.
1039    let mut child = child_cmd
1040        .stdin(std::process::Stdio::null())
1041        .stdout(std::process::Stdio::piped())
1042        .stderr(std::process::Stdio::piped())
1043        .spawn()
1044        .context("supervisor: spawn child")?;
1045
1046    let pid = child.id();
1047    info!(job_id, pid, "child process started");
1048
1049    // On Windows, assign child to a named Job Object for process-tree management.
1050    // The job name is derived from the job_id so that `kill` can look it up.
1051    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
1052    // kills the child process and updates state.json to "failed" before returning
1053    // an error, so that the run front-end (which may have already returned) can
1054    // detect the failure via state.json on next poll.
1055    #[cfg(windows)]
1056    let windows_job_name = {
1057        match assign_to_job_object(job_id, pid) {
1058            Ok(name) => Some(name),
1059            Err(e) => {
1060                // Job Object assignment failed. Per design.md this is a MUST
1061                // requirement on Windows. Kill the child process and update
1062                // state.json to "failed" so the run front-end can detect it.
1063                let kill_err = child.kill();
1064                let _ = child.wait(); // reap to avoid zombies
1065
1066                let failed_state = JobState {
1067                    job: JobStateJob {
1068                        id: job_id.to_string(),
1069                        status: JobStatus::Failed,
1070                        started_at: Some(started_at.clone()),
1071                    },
1072                    result: JobStateResult {
1073                        exit_code: None,
1074                        signal: None,
1075                        duration_ms: None,
1076                    },
1077                    pid: Some(pid),
1078                    finished_at: Some(now_rfc3339()),
1079                    updated_at: now_rfc3339(),
1080                    windows_job_name: None,
1081                };
1082                // Best-effort: if writing state fails, we still propagate the
1083                // original assignment error.
1084                let _ = job_dir.write_state(&failed_state);
1085
1086                // Dispatch completion event for the failed state if notifications are configured.
1087                // This mirrors the dispatch logic in the normal exit path so that callers
1088                // receive a job.finished event even when the supervisor fails early (Windows only).
1089                if opts.notify_command.is_some() || opts.notify_file.is_some() {
1090                    let finished_at_ts =
1091                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1092                    let stdout_log = job_dir.stdout_path().display().to_string();
1093                    let stderr_log = job_dir.stderr_path().display().to_string();
1094                    let fail_event = crate::schema::CompletionEvent {
1095                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1096                        event_type: "job.finished".to_string(),
1097                        job_id: job_id.to_string(),
1098                        state: JobStatus::Failed.as_str().to_string(),
1099                        command: meta.command.clone(),
1100                        cwd: meta.cwd.clone(),
1101                        started_at: started_at.clone(),
1102                        finished_at: finished_at_ts,
1103                        duration_ms: None,
1104                        exit_code: None,
1105                        signal: None,
1106                        stdout_log_path: stdout_log,
1107                        stderr_log_path: stderr_log,
1108                    };
1109                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1110                    let fail_event_path = job_dir.completion_event_path().display().to_string();
1111                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1112                        Vec::new();
1113                    if let Err(we) = job_dir.write_completion_event_atomic(
1114                        &crate::schema::CompletionEventRecord {
1115                            event: fail_event.clone(),
1116                            delivery_results: vec![],
1117                        },
1118                    ) {
1119                        warn!(
1120                            job_id,
1121                            error = %we,
1122                            "failed to write initial completion_event.json for failed job"
1123                        );
1124                    }
1125                    if let Some(ref shell_cmd) = opts.notify_command {
1126                        fail_delivery_results.push(dispatch_command_sink(
1127                            shell_cmd,
1128                            &fail_event_json,
1129                            job_id,
1130                            &fail_event_path,
1131                            &opts.shell_wrapper,
1132                            "job.finished",
1133                        ));
1134                    }
1135                    if let Some(ref file_path) = opts.notify_file {
1136                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1137                    }
1138                    if let Err(we) = job_dir.write_completion_event_atomic(
1139                        &crate::schema::CompletionEventRecord {
1140                            event: fail_event,
1141                            delivery_results: fail_delivery_results,
1142                        },
1143                    ) {
1144                        warn!(
1145                            job_id,
1146                            error = %we,
1147                            "failed to update completion_event.json with delivery results for failed job"
1148                        );
1149                    }
1150                }
1151
1152                if let Err(ke) = kill_err {
1153                    return Err(anyhow::anyhow!(
1154                        "supervisor: failed to assign pid {pid} to Job Object \
1155                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1156                    ));
1157                }
1158                return Err(anyhow::anyhow!(
1159                    "supervisor: failed to assign pid {pid} to Job Object \
1160                     (Windows MUST requirement); child process was killed; \
1161                     consider running outside a nested Job Object environment: {e}"
1162                ));
1163            }
1164        }
1165    };
1166    #[cfg(not(windows))]
1167    let windows_job_name: Option<String> = None;
1168
1169    // Update state.json with real child PID and Windows Job Object name.
1170    // On Windows, windows_job_name is always Some at this point (guaranteed
1171    // by the MUST requirement above), so state.json will always contain the
1172    // Job Object identifier while the job is running.
1173    let state = JobState {
1174        job: JobStateJob {
1175            id: job_id.to_string(),
1176            status: JobStatus::Running,
1177            started_at: Some(started_at.clone()),
1178        },
1179        result: JobStateResult {
1180            exit_code: None,
1181            signal: None,
1182            duration_ms: None,
1183        },
1184        pid: Some(pid),
1185        finished_at: None,
1186        updated_at: now_rfc3339(),
1187        windows_job_name,
1188    };
1189    job_dir.write_state(&state)?;
1190
1191    let child_start_time = std::time::Instant::now();
1192
1193    // Take stdout/stderr handles before moving child.
1194    let child_stdout = child.stdout.take().expect("child stdout piped");
1195    let child_stderr = child.stderr.take().expect("child stderr piped");
1196
1197    // Create shared output-match checker from the initial meta notification config.
1198    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1199        job_dir.path.clone(),
1200        opts.shell_wrapper.clone(),
1201        meta.notification.clone(),
1202    ));
1203
1204    // Completion channels for log threads: each thread sends `()` after stream_to_logs returns.
1205    // Used for bounded joins below (allows supervisor to exit promptly when descendants
1206    // hold inherited pipe ends open indefinitely).
1207    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1208    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1209
1210    // Thread: read stdout, write to stdout.log and full.log.
1211    let stdout_log_path = job_dir.stdout_path();
1212    let full_log_stdout = Arc::clone(&full_log);
1213    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1214    let t_stdout = std::thread::spawn(move || {
1215        stream_to_logs(
1216            child_stdout,
1217            &stdout_log_path,
1218            full_log_stdout,
1219            "STDOUT",
1220            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1221        );
1222        let _ = tx_stdout_done.send(());
1223    });
1224
1225    // Thread: read stderr, write to stderr.log and full.log.
1226    let stderr_log_path = job_dir.stderr_path();
1227    let full_log_stderr = Arc::clone(&full_log);
1228    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1229    let t_stderr = std::thread::spawn(move || {
1230        stream_to_logs(
1231            child_stderr,
1232            &stderr_log_path,
1233            full_log_stderr,
1234            "STDERR",
1235            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1236        );
1237        let _ = tx_stderr_done.send(());
1238    });
1239
1240    // Timeout / kill-after / progress-every handling.
1241    // We spawn a watcher thread to handle timeout and periodic state.json updates.
1242    let timeout_ms = opts.timeout_ms;
1243    let kill_after_ms = opts.kill_after_ms;
1244    let progress_every_ms = opts.progress_every_ms;
1245    let state_path = job_dir.state_path();
1246    let job_id_str = job_id.to_string();
1247
1248    // Use an atomic flag to signal the watcher thread when the child has exited.
1249    use std::sync::atomic::{AtomicBool, Ordering};
1250    let child_done = Arc::new(AtomicBool::new(false));
1251
1252    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1253        let state_path_clone = state_path.clone();
1254        let child_done_clone = Arc::clone(&child_done);
1255        Some(std::thread::spawn(move || {
1256            let start = std::time::Instant::now();
1257            let timeout_dur = if timeout_ms > 0 {
1258                Some(std::time::Duration::from_millis(timeout_ms))
1259            } else {
1260                None
1261            };
1262            let progress_dur = if progress_every_ms > 0 {
1263                Some(std::time::Duration::from_millis(progress_every_ms))
1264            } else {
1265                None
1266            };
1267
1268            let poll_interval = std::time::Duration::from_millis(100);
1269
1270            loop {
1271                std::thread::sleep(poll_interval);
1272
1273                // Exit the watcher loop if the child process has finished.
1274                if child_done_clone.load(Ordering::Relaxed) {
1275                    break;
1276                }
1277
1278                let elapsed = start.elapsed();
1279
1280                // Check for timeout.
1281                if let Some(td) = timeout_dur
1282                    && elapsed >= td
1283                {
1284                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1285                    // Send SIGTERM to the entire process group (negative PID).
1286                    // The child was placed in its own session/group via setsid.
1287                    #[cfg(unix)]
1288                    {
1289                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1290                    }
1291                    // If kill_after > 0, wait kill_after ms then SIGKILL.
1292                    if kill_after_ms > 0 {
1293                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1294                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1295                        #[cfg(unix)]
1296                        {
1297                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1298                        }
1299                    } else {
1300                        // Immediate SIGKILL to the process group.
1301                        #[cfg(unix)]
1302                        {
1303                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1304                        }
1305                    }
1306                    break;
1307                }
1308
1309                // Progress-every: update updated_at periodically.
1310                if let Some(pd) = progress_dur {
1311                    let elapsed_ms = elapsed.as_millis() as u64;
1312                    let pd_ms = pd.as_millis() as u64;
1313                    let poll_ms = poll_interval.as_millis() as u64;
1314                    if elapsed_ms % pd_ms < poll_ms {
1315                        // Read, update updated_at, write back.
1316                        if let Ok(raw) = std::fs::read(&state_path_clone)
1317                            && let Ok(mut st) =
1318                                serde_json::from_slice::<crate::schema::JobState>(&raw)
1319                        {
1320                            st.updated_at = now_rfc3339();
1321                            if let Ok(s) = serde_json::to_string_pretty(&st) {
1322                                let _ = std::fs::write(&state_path_clone, s);
1323                            }
1324                        }
1325                    }
1326                }
1327            }
1328        }))
1329    } else {
1330        None
1331    };
1332
1333    // Wait for child to finish.
1334    let exit_status = child.wait().context("wait for child")?;
1335
1336    // Signal the watcher that the child has finished so it can exit its loop.
1337    child_done.store(true, Ordering::Relaxed);
1338
1339    // Persist terminal state immediately after the wrapped root process exits.
1340    // This must happen BEFORE joining log threads, because log threads block on
1341    // EOF of stdout/stderr pipes and descendant processes that inherited those
1342    // pipes may keep them open indefinitely. Persisting state here ensures that
1343    // `status` and `wait` can observe the terminal state without waiting for
1344    // all descendants to close their inherited handles.
1345    let duration_ms = child_start_time.elapsed().as_millis() as u64;
1346    let exit_code = exit_status.code();
1347    let finished_at = now_rfc3339();
1348
1349    // Detect signal-killed processes on Unix for accurate state and completion event.
1350    #[cfg(unix)]
1351    let (terminal_status, signal_name) = {
1352        use std::os::unix::process::ExitStatusExt;
1353        if let Some(sig) = exit_status.signal() {
1354            (JobStatus::Killed, Some(sig.to_string()))
1355        } else {
1356            (JobStatus::Exited, None)
1357        }
1358    };
1359    #[cfg(not(unix))]
1360    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1361
1362    let state = JobState {
1363        job: JobStateJob {
1364            id: job_id.to_string(),
1365            status: terminal_status.clone(),
1366            started_at: Some(started_at.clone()),
1367        },
1368        result: JobStateResult {
1369            exit_code,
1370            signal: signal_name.clone(),
1371            duration_ms: Some(duration_ms),
1372        },
1373        pid: Some(pid),
1374        finished_at: Some(finished_at.clone()),
1375        updated_at: now_rfc3339(),
1376        windows_job_name: None, // not needed after process exits
1377    };
1378    job_dir.write_state(&state)?;
1379    info!(job_id, ?exit_code, "child process finished");
1380
1381    // Bounded join for log-reader threads.
1382    //
1383    // After the wrapped root process exits, we give log threads a short window
1384    // to drain any remaining output and fire output-match callbacks (which is
1385    // the common case: no descendants, pipe write-end closes promptly).
1386    //
1387    // If a thread does not complete within the window, it is detached (dropped).
1388    // Detaching means the thread will be killed when the supervisor process exits,
1389    // which is the correct trade-off: supervisor must not linger indefinitely
1390    // because a descendant holds an inherited pipe write-end open.
1391    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1392    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1393
1394    let remaining = drain_deadline
1395        .checked_duration_since(std::time::Instant::now())
1396        .unwrap_or(std::time::Duration::ZERO);
1397    if rx_stdout_done.recv_timeout(remaining).is_ok() {
1398        let _ = t_stdout.join();
1399    } else {
1400        drop(t_stdout); // detach: descendant holds the pipe open
1401    }
1402
1403    let remaining = drain_deadline
1404        .checked_duration_since(std::time::Instant::now())
1405        .unwrap_or(std::time::Duration::ZERO);
1406    if rx_stderr_done.recv_timeout(remaining).is_ok() {
1407        let _ = t_stderr.join();
1408    } else {
1409        drop(t_stderr); // detach: descendant holds the pipe open
1410    }
1411
1412    // Join watcher if present; it exits promptly once child_done is set.
1413    if let Some(w) = watcher {
1414        let _ = w.join();
1415    }
1416
1417    // Reload the latest notification config from meta.json to pick up any post-creation
1418    // updates (e.g. from `notify set` invoked after the job was launched).
1419    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1420    let (current_notify_command, current_notify_file) = match &latest_notification {
1421        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1422        None => (None, None),
1423    };
1424
1425    // Dispatch completion event to configured notification sinks.
1426    // Failure here must not alter job state (delivery result is recorded separately).
1427    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1428    if has_notification {
1429        let stdout_log = job_dir.stdout_path().display().to_string();
1430        let stderr_log = job_dir.stderr_path().display().to_string();
1431        let event = crate::schema::CompletionEvent {
1432            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1433            event_type: "job.finished".to_string(),
1434            job_id: job_id.to_string(),
1435            state: terminal_status.as_str().to_string(),
1436            command: meta.command.clone(),
1437            cwd: meta.cwd.clone(),
1438            started_at,
1439            finished_at,
1440            duration_ms: Some(duration_ms),
1441            exit_code,
1442            signal: signal_name,
1443            stdout_log_path: stdout_log,
1444            stderr_log_path: stderr_log,
1445        };
1446
1447        let event_json = serde_json::to_string(&event).unwrap_or_default();
1448        let event_path = job_dir.completion_event_path().display().to_string();
1449        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1450
1451        // Write initial completion_event.json before dispatching sinks.
1452        if let Err(e) =
1453            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1454                event: event.clone(),
1455                delivery_results: vec![],
1456            })
1457        {
1458            warn!(job_id, error = %e, "failed to write initial completion_event.json");
1459        }
1460
1461        if let Some(ref shell_cmd) = current_notify_command {
1462            delivery_results.push(dispatch_command_sink(
1463                shell_cmd,
1464                &event_json,
1465                job_id,
1466                &event_path,
1467                &opts.shell_wrapper,
1468                "job.finished",
1469            ));
1470        }
1471        if let Some(ref file_path) = current_notify_file {
1472            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1473        }
1474
1475        // Update completion_event.json with delivery results.
1476        if let Err(e) =
1477            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1478                event,
1479                delivery_results,
1480            })
1481        {
1482            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1483        }
1484    }
1485
1486    Ok(())
1487}
1488
1489/// Dispatch the command sink: execute the shell command string via the configured shell wrapper,
1490/// pass event JSON via stdin, and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID /
1491/// AGENT_EXEC_EVENT_TYPE env vars.
1492///
1493/// The shell wrapper argv (e.g. `["sh", "-lc"]`) is provided by the caller.
1494/// The command string is appended as the final argument to the wrapper.
1495fn dispatch_command_sink(
1496    shell_cmd: &str,
1497    event_json: &str,
1498    job_id: &str,
1499    event_path: &str,
1500    shell_wrapper: &[String],
1501    event_type: &str,
1502) -> crate::schema::SinkDeliveryResult {
1503    use std::io::Write;
1504    let attempted_at = now_rfc3339();
1505    let target = shell_cmd.to_string();
1506
1507    if shell_cmd.trim().is_empty() {
1508        return crate::schema::SinkDeliveryResult {
1509            sink_type: "command".to_string(),
1510            target,
1511            success: false,
1512            error: Some("empty shell command".to_string()),
1513            attempted_at,
1514        };
1515    }
1516
1517    if shell_wrapper.is_empty() {
1518        return crate::schema::SinkDeliveryResult {
1519            sink_type: "command".to_string(),
1520            target,
1521            success: false,
1522            error: Some("shell wrapper must not be empty".to_string()),
1523            attempted_at,
1524        };
1525    }
1526
1527    let mut cmd = Command::new(&shell_wrapper[0]);
1528    cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1529
1530    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1531    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1532    cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1533    cmd.stdin(std::process::Stdio::piped());
1534    cmd.stdout(std::process::Stdio::null());
1535    cmd.stderr(std::process::Stdio::null());
1536
1537    match cmd.spawn() {
1538        Ok(mut child) => {
1539            if let Some(mut stdin) = child.stdin.take() {
1540                let _ = stdin.write_all(event_json.as_bytes());
1541            }
1542            match child.wait() {
1543                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1544                    sink_type: "command".to_string(),
1545                    target,
1546                    success: true,
1547                    error: None,
1548                    attempted_at,
1549                },
1550                Ok(status) => crate::schema::SinkDeliveryResult {
1551                    sink_type: "command".to_string(),
1552                    target,
1553                    success: false,
1554                    error: Some(format!("exited with status {status}")),
1555                    attempted_at,
1556                },
1557                Err(e) => crate::schema::SinkDeliveryResult {
1558                    sink_type: "command".to_string(),
1559                    target,
1560                    success: false,
1561                    error: Some(format!("wait error: {e}")),
1562                    attempted_at,
1563                },
1564            }
1565        }
1566        Err(e) => crate::schema::SinkDeliveryResult {
1567            sink_type: "command".to_string(),
1568            target,
1569            success: false,
1570            error: Some(format!("spawn error: {e}")),
1571            attempted_at,
1572        },
1573    }
1574}
1575
1576/// Dispatch the file sink: append event JSON as a single NDJSON line.
1577/// Creates parent directories automatically.
1578fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1579    use std::io::Write;
1580    let attempted_at = now_rfc3339();
1581    let path = std::path::Path::new(file_path);
1582
1583    if let Some(parent) = path.parent()
1584        && let Err(e) = std::fs::create_dir_all(parent)
1585    {
1586        return crate::schema::SinkDeliveryResult {
1587            sink_type: "file".to_string(),
1588            target: file_path.to_string(),
1589            success: false,
1590            error: Some(format!("create parent dir: {e}")),
1591            attempted_at,
1592        };
1593    }
1594
1595    match std::fs::OpenOptions::new()
1596        .create(true)
1597        .append(true)
1598        .open(path)
1599    {
1600        Ok(mut f) => match writeln!(f, "{event_json}") {
1601            Ok(_) => crate::schema::SinkDeliveryResult {
1602                sink_type: "file".to_string(),
1603                target: file_path.to_string(),
1604                success: true,
1605                error: None,
1606                attempted_at,
1607            },
1608            Err(e) => crate::schema::SinkDeliveryResult {
1609                sink_type: "file".to_string(),
1610                target: file_path.to_string(),
1611                success: false,
1612                error: Some(format!("write error: {e}")),
1613                attempted_at,
1614            },
1615        },
1616        Err(e) => crate::schema::SinkDeliveryResult {
1617            sink_type: "file".to_string(),
1618            target: file_path.to_string(),
1619            success: false,
1620            error: Some(format!("open error: {e}")),
1621            attempted_at,
1622        },
1623    }
1624}
1625
1626/// Public alias so other modules can call the timestamp helper.
1627pub fn now_rfc3339_pub() -> String {
1628    now_rfc3339()
1629}
1630
1631fn now_rfc3339() -> String {
1632    // Use a simple approach that works without chrono.
1633    let d = std::time::SystemTime::now()
1634        .duration_since(std::time::UNIX_EPOCH)
1635        .unwrap_or_default();
1636    format_rfc3339(d.as_secs())
1637}
1638
1639fn format_rfc3339(secs: u64) -> String {
1640    // Manual conversion of Unix timestamp to UTC date-time string.
1641    let mut s = secs;
1642    let seconds = s % 60;
1643    s /= 60;
1644    let minutes = s % 60;
1645    s /= 60;
1646    let hours = s % 24;
1647    s /= 24;
1648
1649    // Days since 1970-01-01
1650    let mut days = s;
1651    let mut year = 1970u64;
1652    loop {
1653        let days_in_year = if is_leap(year) { 366 } else { 365 };
1654        if days < days_in_year {
1655            break;
1656        }
1657        days -= days_in_year;
1658        year += 1;
1659    }
1660
1661    let leap = is_leap(year);
1662    let month_days: [u64; 12] = [
1663        31,
1664        if leap { 29 } else { 28 },
1665        31,
1666        30,
1667        31,
1668        30,
1669        31,
1670        31,
1671        30,
1672        31,
1673        30,
1674        31,
1675    ];
1676    let mut month = 0usize;
1677    for (i, &d) in month_days.iter().enumerate() {
1678        if days < d {
1679            month = i;
1680            break;
1681        }
1682        days -= d;
1683    }
1684    let day = days + 1;
1685
1686    format!(
1687        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1688        year,
1689        month + 1,
1690        day,
1691        hours,
1692        minutes,
1693        seconds
1694    )
1695}
1696
/// Gregorian leap-year rule: century years are leap only when divisible by 400,
/// every other year is leap when divisible by 4.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(100) {
        year.is_multiple_of(400)
    } else {
        year.is_multiple_of(4)
    }
}
1700
/// Windows-only: create a named Job Object and assign the given child process
/// to it so that the entire process tree can be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. This name is stored in
/// `state.json` so that future `kill` invocations can open the same Job Object
/// by name and call `TerminateJobObject` to stop the whole tree.
///
/// The Job Object handle created here is deliberately never closed: it stays open
/// for the rest of the supervisor's lifetime so the Job Object remains valid, and
/// the OS reclaims it when the supervisor process exits.
///
/// # Errors
///
/// Returns `Err` if the process cannot be opened, the Job Object cannot be created,
/// or the assignment fails (for example because the process already belongs to
/// another Job Object). Any handle opened before the failure is closed first.
/// The caller (`supervise`) treats failure as a fatal error because reliable
/// process-tree management is a Windows MUST requirement (design.md).
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: every handle passed to a Win32 call below was returned by a successful
    // `OpenProcess` / `CreateJobObjectW` call in this block, `hname` outlives the call
    // that borrows it, and each handle is closed at most once on every path.
    unsafe {
        // Open the child process handle (needed for AssignProcessToJobObject).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create a named Job Object.
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        // Assign the child process to the Job Object.
        // This can fail if the process is already in another job (e.g. CI/nested).
        // Per design.md, assignment is a MUST on Windows — failure is a fatal error.
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle was only needed for the assignment.
        let _ = CloseHandle(proc_handle);

        // `job` is intentionally NOT closed: the handle is a plain `Copy` value with no
        // destructor, so simply never calling `CloseHandle` on it keeps the Job Object
        // alive until the supervisor exits. (`std::mem::forget` on a `Copy` value would
        // be a no-op, so it is not used here.)
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1766
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp zero must render as the Unix epoch itself.
    #[test]
    fn rfc3339_epoch() {
        let rendered = format_rfc3339(0);
        assert_eq!(rendered, "1970-01-01T00:00:00Z");
    }

    /// A fixed, independently known instant: midnight UTC on 2024-01-01.
    #[test]
    fn rfc3339_known_date() {
        const NEW_YEAR_2024: u64 = 1_704_067_200;
        let rendered = format_rfc3339(NEW_YEAR_2024);
        assert_eq!(rendered, "2024-01-01T00:00:00Z");
    }
}