Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
6//! - If `--snapshot-after` elapses before `run` returns, a snapshot is
7//!   included in the JSON response.
8
9use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18    Snapshot,
19};
20use crate::tag::dedup_tags;
21
/// Caller-supplied settings for the `run` sub-command.
///
/// # Definition-time option alignment rule
///
/// `run` and `create` both persist the same job definition into `meta.json`, so every
/// definition-time option accepted here MUST also be accepted by `create` (and vice versa).
/// A newly persisted metadata field has to be wired through both commands unless the spec
/// explicitly documents it as launch-only (e.g. snapshot timing, tail sizing, --wait).
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Program to execute followed by its arguments.
    pub command: Vec<String>,
    /// Jobs root directory override.
    pub root: Option<&'a str>,
    /// How long (ms) to wait before returning; 0 means return immediately.
    pub snapshot_after: u64,
    /// How many trailing lines a snapshot includes.
    pub tail_lines: u64,
    /// Byte cap for the tail.
    pub max_bytes: u64,
    /// Job timeout in milliseconds; 0 disables the timeout.
    pub timeout_ms: u64,
    /// Grace period (ms) between SIGTERM and SIGKILL; 0 means SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Directory the command runs in.
    pub cwd: Option<&'a str>,
    /// Environment variables given as `KEY=VALUE` strings.
    pub env_vars: Vec<String>,
    /// Env-file paths; they are applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the current process environment is inherited (default: true).
    pub inherit_env: bool,
    /// Keys whose values are replaced with "***" in JSON output.
    pub mask: Vec<String>,
    /// User-defined job tags (deduplicated, first-seen order preserved).
    pub tags: Vec<String>,
    /// Alternative path for full.log; `None` keeps it inside the job dir.
    pub log: Option<&'a str>,
    /// Refresh interval (ms) for `updated_at` in state.json; 0 disables the refresh.
    pub progress_every_ms: u64,
    /// When true, do not return until the job reaches a terminal state.
    /// The response then carries exit_code, finished_at, and final_snapshot.
    pub wait: bool,
    /// Polling period in milliseconds used while `wait` is true.
    pub wait_poll_ms: u64,
    /// Shell command string for the command notification sink; run via the platform shell.
    /// `None` means no command sink.
    pub notify_command: Option<String>,
    /// Destination file for the NDJSON notification sink; `None` means no file sink.
    pub notify_file: Option<String>,
    /// Pattern tested against output lines (output-match notification).
    pub output_pattern: Option<String>,
    /// How the output-match pattern is interpreted: "contains" or "regex".
    pub output_match_type: Option<String>,
    /// Which stream is inspected: "stdout", "stderr", or "either".
    pub output_stream: Option<String>,
    /// Shell command string for the output-match command sink.
    pub output_command: Option<String>,
    /// Destination file for the output-match NDJSON file sink.
    pub output_file: Option<String>,
    /// Resolved shell wrapper argv used to run command strings,
    /// e.g. `["sh", "-lc"]` or `["bash", "-lc"]`.
    pub shell_wrapper: Vec<String>,
}
86
87impl<'a> Default for RunOpts<'a> {
88    fn default() -> Self {
89        RunOpts {
90            command: vec![],
91            root: None,
92            snapshot_after: 10_000,
93            tail_lines: 50,
94            max_bytes: 65536,
95            timeout_ms: 0,
96            kill_after_ms: 0,
97            cwd: None,
98            env_vars: vec![],
99            env_files: vec![],
100            inherit_env: true,
101            mask: vec![],
102            tags: vec![],
103            log: None,
104            progress_every_ms: 0,
105            wait: false,
106            wait_poll_ms: 200,
107            notify_command: None,
108            notify_file: None,
109            output_pattern: None,
110            output_match_type: None,
111            output_stream: None,
112            output_command: None,
113            output_file: None,
114            shell_wrapper: crate::config::default_shell_wrapper(),
115        }
116    }
117}
118
/// Upper bound, in milliseconds, applied to `snapshot_after` (10 seconds).
const MAX_SNAPSHOT_AFTER_MS: u64 = 10 * 1_000;
121
/// Everything needed to launch a `_supervise` child process.
///
/// Shared by `run::execute` and `start::execute`.
pub struct SpawnSupervisorParams {
    /// Identifier of the job being supervised (forwarded as `--job-id`).
    pub job_id: String,
    /// Jobs root directory (forwarded as `--supervise-root`).
    pub root: std::path::PathBuf,
    /// Path of the combined log (forwarded as `--full-log`).
    pub full_log_path: String,
    /// Timeout in milliseconds; only forwarded when greater than 0.
    pub timeout_ms: u64,
    /// Kill-after delay in milliseconds; only forwarded when greater than 0.
    pub kill_after_ms: u64,
    /// Working directory for the command, if one was requested.
    pub cwd: Option<String>,
    /// Real (unmasked) KEY=VALUE env var pairs.
    pub env_vars: Vec<String>,
    /// Env-file paths, forwarded one `--env-file` per entry.
    pub env_files: Vec<String>,
    /// When false, `--no-inherit-env` is passed to the supervisor.
    pub inherit_env: bool,
    /// Progress refresh interval in milliseconds; only forwarded when greater than 0.
    pub progress_every_ms: u64,
    /// Command notification sink, if configured.
    pub notify_command: Option<String>,
    /// File notification sink, if configured.
    pub notify_file: Option<String>,
    /// Shell wrapper argv; serialized to JSON for `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// Command and arguments, placed after the `--` separator.
    pub command: Vec<String>,
}
142
143/// Spawn the supervisor process and write the initial running state to `state.json`.
144///
145/// Returns the supervisor PID and the actual `started_at` timestamp.
146/// Also handles the Windows Job Object handshake before returning.
147pub fn spawn_supervisor_process(
148    job_dir: &JobDir,
149    params: SpawnSupervisorParams,
150) -> Result<(u32, String)> {
151    let started_at = now_rfc3339();
152
153    let exe = std::env::current_exe().context("resolve current exe")?;
154    let mut supervisor_cmd = Command::new(&exe);
155    supervisor_cmd
156        .arg("_supervise")
157        .arg("--job-id")
158        .arg(&params.job_id)
159        .arg("--supervise-root")
160        .arg(params.root.display().to_string())
161        .arg("--full-log")
162        .arg(&params.full_log_path);
163
164    if params.timeout_ms > 0 {
165        supervisor_cmd
166            .arg("--timeout")
167            .arg(params.timeout_ms.to_string());
168    }
169    if params.kill_after_ms > 0 {
170        supervisor_cmd
171            .arg("--kill-after")
172            .arg(params.kill_after_ms.to_string());
173    }
174    if let Some(ref cwd) = params.cwd {
175        supervisor_cmd.arg("--cwd").arg(cwd);
176    }
177    for env_file in &params.env_files {
178        supervisor_cmd.arg("--env-file").arg(env_file);
179    }
180    for env_var in &params.env_vars {
181        supervisor_cmd.arg("--env").arg(env_var);
182    }
183    if !params.inherit_env {
184        supervisor_cmd.arg("--no-inherit-env");
185    }
186    if params.progress_every_ms > 0 {
187        supervisor_cmd
188            .arg("--progress-every")
189            .arg(params.progress_every_ms.to_string());
190    }
191    if let Some(ref nc) = params.notify_command {
192        supervisor_cmd.arg("--notify-command").arg(nc);
193    }
194    if let Some(ref nf) = params.notify_file {
195        supervisor_cmd.arg("--notify-file").arg(nf);
196    }
197    let wrapper_json =
198        serde_json::to_string(&params.shell_wrapper).context("serialize shell wrapper")?;
199    supervisor_cmd
200        .arg("--shell-wrapper-resolved")
201        .arg(&wrapper_json);
202
203    supervisor_cmd
204        .arg("--")
205        .args(&params.command)
206        .stdin(std::process::Stdio::null())
207        .stdout(std::process::Stdio::null())
208        .stderr(std::process::Stdio::null());
209
210    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
211    let supervisor_pid = supervisor.id();
212    debug!(supervisor_pid, "supervisor spawned");
213
214    // Write initial running state.
215    job_dir.init_state(supervisor_pid, &started_at)?;
216
217    // Windows Job Object handshake.
218    #[cfg(windows)]
219    {
220        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
221        loop {
222            std::thread::sleep(std::time::Duration::from_millis(10));
223            if let Ok(current_state) = job_dir.read_state() {
224                let supervisor_updated = current_state
225                    .pid
226                    .map(|p| p != supervisor_pid)
227                    .unwrap_or(false)
228                    || *current_state.status() == crate::schema::JobStatus::Failed;
229                if supervisor_updated {
230                    if *current_state.status() == crate::schema::JobStatus::Failed {
231                        anyhow::bail!(
232                            "supervisor failed to assign child process to Job Object \
233                             (Windows MUST requirement); see stderr for details"
234                        );
235                    }
236                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
237                    break;
238                }
239            }
240            if std::time::Instant::now() >= handshake_deadline {
241                debug!("supervisor handshake timed out; proceeding with initial state");
242                break;
243            }
244        }
245    }
246
247    Ok((supervisor_pid, started_at))
248}
249
250/// Pre-create empty log files (stdout.log, stderr.log, full.log) so they exist
251/// immediately after job creation, before the supervisor starts writing.
252pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
253    for log_path in [
254        job_dir.stdout_path(),
255        job_dir.stderr_path(),
256        job_dir.full_log_path(),
257    ] {
258        std::fs::OpenOptions::new()
259            .create(true)
260            .append(true)
261            .open(&log_path)
262            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
263    }
264    Ok(())
265}
266
/// Options controlling the snapshot/wait phase after job launch.
///
/// All fields are plain scalars, so the type is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnapshotWaitOpts {
    /// Milliseconds to poll before taking the snapshot; 0 = no snapshot.
    pub snapshot_after: u64,
    /// Number of tail lines to include in a snapshot.
    pub tail_lines: u64,
    /// Max bytes for the tail.
    pub max_bytes: u64,
    /// If true, block until the job reaches a terminal state.
    pub wait: bool,
    /// Poll interval in milliseconds used while `wait` is true.
    pub wait_poll_ms: u64,
}
275
276/// Run the snapshot/wait phase and return (final_state_str, exit_code, finished_at,
277/// snapshot, final_snapshot, waited_ms).
278pub fn run_snapshot_wait(
279    job_dir: &JobDir,
280    opts: &SnapshotWaitOpts,
281) -> (
282    String,
283    Option<i32>,
284    Option<String>,
285    Option<Snapshot>,
286    Option<Snapshot>,
287    u64,
288) {
289    use crate::schema::JobStatus;
290
291    let effective_snapshot_after = if opts.wait {
292        0
293    } else {
294        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
295    };
296
297    let wait_start = std::time::Instant::now();
298
299    let snapshot = if effective_snapshot_after > 0 {
300        debug!(ms = effective_snapshot_after, "polling for snapshot");
301        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
302        let poll_interval = std::time::Duration::from_millis(15);
303        loop {
304            std::thread::sleep(poll_interval);
305            if let Ok(st) = job_dir.read_state()
306                && !st.status().is_non_terminal()
307            {
308                debug!("snapshot poll: job no longer running/created, exiting early");
309                break;
310            }
311            if std::time::Instant::now() >= deadline {
312                debug!("snapshot poll: deadline reached");
313                break;
314            }
315        }
316        Some(build_snapshot(job_dir, opts.tail_lines, opts.max_bytes))
317    } else {
318        None
319    };
320
321    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
322        debug!("--wait: polling for terminal state");
323        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
324        loop {
325            std::thread::sleep(poll);
326            if let Ok(st) = job_dir.read_state()
327                && !st.status().is_non_terminal()
328            {
329                let snap = build_snapshot(job_dir, opts.tail_lines, opts.max_bytes);
330                let ec = st.exit_code();
331                let fa = st.finished_at.clone();
332                let state_str = st.status().as_str().to_string();
333                break (state_str, ec, fa, Some(snap));
334            }
335        }
336    } else {
337        (JobStatus::Running.as_str().to_string(), None, None, None)
338    };
339
340    let waited_ms = wait_start.elapsed().as_millis() as u64;
341    (
342        final_state,
343        exit_code_opt,
344        finished_at_opt,
345        snapshot,
346        final_snapshot_opt,
347        waited_ms,
348    )
349}
350
351/// Execute `run`: spawn job, possibly wait for snapshot, return JSON.
352pub fn execute(opts: RunOpts) -> Result<()> {
353    if opts.command.is_empty() {
354        anyhow::bail!("no command specified for run");
355    }
356
357    let elapsed_start = std::time::Instant::now();
358
359    let root = resolve_root(opts.root);
360    std::fs::create_dir_all(&root)
361        .with_context(|| format!("create jobs root {}", root.display()))?;
362
363    let job_id = Ulid::new().to_string();
364    let created_at = now_rfc3339();
365
366    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
367    let env_keys: Vec<String> = opts
368        .env_vars
369        .iter()
370        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
371        .collect();
372
373    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
374    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
375
376    // Resolve the effective working directory for this job.
377    // If --cwd was specified, use that path; otherwise use the current process's working directory.
378    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
379    let effective_cwd = resolve_effective_cwd(opts.cwd);
380
381    // Build output-match config from definition-time options (same logic as `create` and `notify set`).
382    let on_output_match = crate::notify::build_output_match_config(
383        opts.output_pattern,
384        opts.output_match_type,
385        opts.output_stream,
386        opts.output_command,
387        opts.output_file,
388        None,
389    );
390
391    let notification =
392        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
393        {
394            Some(crate::schema::NotificationConfig {
395                notify_command: opts.notify_command.clone(),
396                notify_file: opts.notify_file.clone(),
397                on_output_match,
398            })
399        } else {
400            None
401        };
402
403    // Validate and deduplicate tags (preserving first-seen order).
404    let tags = dedup_tags(opts.tags)?;
405
406    let meta = JobMeta {
407        job: JobMetaJob { id: job_id.clone() },
408        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
409        command: opts.command.clone(),
410        created_at: created_at.clone(),
411        root: root.display().to_string(),
412        env_keys,
413        env_vars: masked_env_vars.clone(),
414        // For `run`, env_vars_runtime is not populated because the supervisor
415        // is spawned immediately with the real values; no deferred start needed.
416        env_vars_runtime: vec![],
417        mask: opts.mask.clone(),
418        cwd: Some(effective_cwd),
419        notification,
420        // Execution-definition fields (used by start if ever applicable).
421        inherit_env: opts.inherit_env,
422        env_files: opts.env_files.clone(),
423        timeout_ms: opts.timeout_ms,
424        kill_after_ms: opts.kill_after_ms,
425        progress_every_ms: opts.progress_every_ms,
426        shell_wrapper: Some(opts.shell_wrapper.clone()),
427        tags: tags.clone(),
428    };
429
430    let job_dir = JobDir::create(&root, &job_id, &meta)?;
431    info!(job_id = %job_id, "created job directory");
432
433    // Determine the full.log path (may be overridden by --log).
434    let full_log_path = if let Some(log) = opts.log {
435        log.to_string()
436    } else {
437        job_dir.full_log_path().display().to_string()
438    };
439
440    // Pre-create empty log files so they exist before the supervisor starts.
441    pre_create_log_files(&job_dir)?;
442
443    // Spawn the supervisor using the shared helper.
444    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
445    // receives the real env var values so the child process can use them as intended.
446    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
447        &job_dir,
448        SpawnSupervisorParams {
449            job_id: job_id.clone(),
450            root: root.clone(),
451            full_log_path: full_log_path.clone(),
452            timeout_ms: opts.timeout_ms,
453            kill_after_ms: opts.kill_after_ms,
454            cwd: opts.cwd.map(|s| s.to_string()),
455            env_vars: opts.env_vars.clone(),
456            env_files: opts.env_files.clone(),
457            inherit_env: opts.inherit_env,
458            progress_every_ms: opts.progress_every_ms,
459            notify_command: opts.notify_command.clone(),
460            notify_file: opts.notify_file.clone(),
461            shell_wrapper: opts.shell_wrapper.clone(),
462            command: opts.command.clone(),
463        },
464    )?;
465
466    // Compute absolute paths for stdout.log and stderr.log.
467    let stdout_log_path = job_dir.stdout_path().display().to_string();
468    let stderr_log_path = job_dir.stderr_path().display().to_string();
469
470    let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
471        run_snapshot_wait(
472            &job_dir,
473            &SnapshotWaitOpts {
474                snapshot_after: opts.snapshot_after,
475                tail_lines: opts.tail_lines,
476                max_bytes: opts.max_bytes,
477                wait: opts.wait,
478                wait_poll_ms: opts.wait_poll_ms,
479            },
480        );
481
482    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
483
484    let response = Response::new(
485        "run",
486        RunData {
487            job_id,
488            state: final_state,
489            tags,
490            // Include masked env_vars in the JSON response so callers can inspect
491            // which variables were set (with secret values replaced by "***").
492            env_vars: masked_env_vars,
493            snapshot,
494            stdout_log_path,
495            stderr_log_path,
496            waited_ms,
497            elapsed_ms,
498            exit_code: exit_code_opt,
499            finished_at: finished_at_opt,
500            final_snapshot: final_snapshot_opt,
501        },
502    );
503    response.print();
504    Ok(())
505}
506
507fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
508    let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
509    let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
510    Snapshot {
511        truncated: stdout.truncated || stderr.truncated,
512        encoding: "utf-8-lossy".to_string(),
513        stdout_observed_bytes: stdout.observed_bytes,
514        stderr_observed_bytes: stderr.observed_bytes,
515        stdout_included_bytes: stdout.included_bytes,
516        stderr_included_bytes: stderr.included_bytes,
517        stdout_tail: stdout.tail,
518        stderr_tail: stderr.tail,
519    }
520}
521
/// Settings for the internal `_supervise` sub-command.
///
/// `run` owns masking: it writes masked values to meta.json and puts them in the JSON
/// response. The supervisor therefore only carries the real environment variable
/// values it needs to launch the child process correctly.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the supervised job.
    pub job_id: &'a str,
    /// Jobs root directory.
    pub root: &'a Path,
    /// Command and arguments to execute.
    pub command: &'a [String],
    /// Alternative full.log path; `None` selects the job dir default.
    pub full_log: Option<&'a str>,
    /// Job timeout in milliseconds; 0 disables the timeout.
    pub timeout_ms: u64,
    /// Grace period (ms) between SIGTERM and SIGKILL; 0 means SIGKILL immediately.
    pub kill_after_ms: u64,
    /// Directory the child process runs in.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` environment entries carrying the real (unmasked) values.
    pub env_vars: Vec<String>,
    /// Env-file paths; they are applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the current process environment is inherited.
    pub inherit_env: bool,
    /// Refresh interval (ms) for `updated_at` in state.json; 0 disables the refresh.
    pub progress_every_ms: u64,
    /// Shell command string for the command notification sink; run via the platform shell.
    /// `None` means no command sink.
    pub notify_command: Option<String>,
    /// Destination file for the NDJSON notification sink; `None` means no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to run command strings.
    pub shell_wrapper: Vec<String>,
}
556
/// Work out the working directory a job will effectively use.
///
/// The starting point is `cwd_override` when given, otherwise the working directory of
/// the current process (or `"."` if that cannot be determined). The canonical form of
/// that path (symlinks resolved) is returned when canonicalization succeeds. When it
/// fails, an absolute path is returned unchanged and a relative one is joined onto the
/// process working directory on a best-effort basis.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    /// Process working directory, degrading to "." when it is unavailable.
    fn process_cwd() -> std::path::PathBuf {
        std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
    }

    let requested = cwd_override.map_or_else(process_cwd, |dir| std::path::PathBuf::from(dir));

    // Happy path: the canonical form is available.
    if let Ok(resolved) = requested.canonicalize() {
        return resolved.display().to_string();
    }

    // Fallback: make sure the result is at least anchored somewhere.
    let anchored = if requested.is_absolute() {
        requested
    } else {
        process_cwd().join(requested)
    };
    anchored.display().to_string()
}
584
585/// Mask the values of specified keys in a list of KEY=VALUE strings.
586/// Keys listed in `mask_keys` will have their value replaced with "***".
587pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
588    if mask_keys.is_empty() {
589        return env_vars.to_vec();
590    }
591    env_vars
592        .iter()
593        .map(|s| {
594            let (key, _val) = parse_env_var(s);
595            if mask_keys.iter().any(|k| k == &key) {
596                format!("{key}=***")
597            } else {
598                s.clone()
599            }
600        })
601        .collect()
602}
603
/// Split a `KEY=VALUE` (or `KEY=`) string at its first `=` into `(key, value)`.
///
/// A string without any `=` is treated as a key with an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
612
613/// Load environment variables from a .env-style file.
614/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
615fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
616    let contents =
617        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
618    let mut vars = Vec::new();
619    for line in contents.lines() {
620        let line = line.trim();
621        if line.is_empty() || line.starts_with('#') {
622            continue;
623        }
624        vars.push(parse_env_var(line));
625    }
626    Ok(vars)
627}
628
629/// Shared state for output-match checking, used by streaming threads in `supervise`.
630///
631/// Reloads `meta.json` on every observed line so that a `notify set` update is
632/// visible for the very next line, regardless of how recently the last reload
633/// occurred.  Multiple streaming threads share the same checker via `Arc`; the
634/// internal `Mutex` serialises access.
635struct OutputMatchChecker {
636    job_dir_path: std::path::PathBuf,
637    shell_wrapper: Vec<String>,
638    inner: std::sync::Mutex<OutputMatchInner>,
639}
640
641struct OutputMatchInner {
642    config: Option<crate::schema::NotificationConfig>,
643}
644
645impl OutputMatchChecker {
646    fn new(
647        job_dir_path: std::path::PathBuf,
648        shell_wrapper: Vec<String>,
649        initial_config: Option<crate::schema::NotificationConfig>,
650    ) -> Self {
651        Self {
652            job_dir_path,
653            shell_wrapper,
654            inner: std::sync::Mutex::new(OutputMatchInner {
655                config: initial_config,
656            }),
657        }
658    }
659
660    /// Check a newly observed output line for a configured match.
661    ///
662    /// Reloads `meta.json` on every call so that `notify set` updates are
663    /// visible for the next line without any delay.
664    /// Dispatches `job.output.matched` events outside the lock to avoid blocking
665    /// other streaming threads.
666    fn check_line(&self, line: &str, stream: &str) {
667        use crate::schema::{OutputMatchStream, OutputMatchType};
668
669        // Lock, reload, evaluate match, then release before dispatching.
670        let match_info: Option<crate::schema::OutputMatchConfig> = {
671            let mut inner = self.inner.lock().unwrap();
672
673            // Reload config on every line to pick up `notify set` updates immediately.
674            {
675                let meta_path = self.job_dir_path.join("meta.json");
676                if let Ok(raw) = std::fs::read(&meta_path)
677                    && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
678                {
679                    inner.config = meta.notification;
680                }
681            }
682
683            let Some(ref notification) = inner.config else {
684                return;
685            };
686            let Some(ref match_cfg) = notification.on_output_match else {
687                return;
688            };
689
690            // Check stream filter.
691            let stream_matches = match match_cfg.stream {
692                OutputMatchStream::Stdout => stream == "stdout",
693                OutputMatchStream::Stderr => stream == "stderr",
694                OutputMatchStream::Either => true,
695            };
696            if !stream_matches {
697                return;
698            }
699
700            // Check pattern.
701            let matched = match &match_cfg.match_type {
702                OutputMatchType::Contains => line.contains(&match_cfg.pattern),
703                OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
704                    .map(|re| re.is_match(line))
705                    .unwrap_or(false),
706            };
707
708            if matched {
709                Some(match_cfg.clone())
710            } else {
711                None
712            }
713        }; // Lock released.
714
715        if let Some(match_cfg) = match_info {
716            self.dispatch_match(line, stream, &match_cfg);
717        }
718    }
719
720    /// Dispatch a `job.output.matched` event and append a delivery record to
721    /// `notification_events.ndjson`.  Failures are non-fatal.
722    fn dispatch_match(
723        &self,
724        line: &str,
725        stream: &str,
726        match_cfg: &crate::schema::OutputMatchConfig,
727    ) {
728        use std::io::Write;
729
730        let job_id = self
731            .job_dir_path
732            .file_name()
733            .and_then(|n| n.to_str())
734            .unwrap_or("unknown");
735
736        let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
737        let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
738        let events_path = self.job_dir_path.join("notification_events.ndjson");
739        let events_path_str = events_path.display().to_string();
740
741        let match_type_str = match &match_cfg.match_type {
742            crate::schema::OutputMatchType::Contains => "contains",
743            crate::schema::OutputMatchType::Regex => "regex",
744        };
745
746        let event = crate::schema::OutputMatchEvent {
747            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
748            event_type: "job.output.matched".to_string(),
749            job_id: job_id.to_string(),
750            pattern: match_cfg.pattern.clone(),
751            match_type: match_type_str.to_string(),
752            stream: stream.to_string(),
753            line: line.to_string(),
754            stdout_log_path,
755            stderr_log_path,
756        };
757
758        let event_json = serde_json::to_string(&event).unwrap_or_default();
759        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
760
761        if let Some(ref cmd) = match_cfg.command {
762            delivery_results.push(dispatch_command_sink(
763                cmd,
764                &event_json,
765                job_id,
766                &events_path_str,
767                &self.shell_wrapper,
768                "job.output.matched",
769            ));
770        }
771        if let Some(ref file_path) = match_cfg.file {
772            delivery_results.push(dispatch_file_sink(file_path, &event_json));
773        }
774
775        // Append delivery record to notification_events.ndjson.
776        let record = crate::schema::OutputMatchEventRecord {
777            event,
778            delivery_results,
779        };
780        if let Ok(record_json) = serde_json::to_string(&record)
781            && let Ok(mut f) = std::fs::OpenOptions::new()
782                .create(true)
783                .append(true)
784                .open(&events_path)
785        {
786            let _ = writeln!(f, "{record_json}");
787        }
788    }
789}
790
791/// Stream bytes from a child process output pipe to an individual log file and
792/// to the shared `full.log`.
793///
794/// Reads byte chunks (not lines) so that output without a trailing newline is
795/// still captured in the individual log immediately.  The `full.log` format
796/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
797/// bytes are appended to the buffer until a newline is found, at which point a
798/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
799/// flushed as a final line.
800///
801/// The optional `on_line` callback is invoked for each complete line (without
802/// the trailing newline) and is used to drive output-match checking.
803///
804/// This helper is used by both the stdout and stderr monitoring threads inside
805/// [`supervise`], replacing the previously duplicated per-stream implementations.
806/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
807fn stream_to_logs<R, F>(
808    stream: R,
809    log_path: &std::path::Path,
810    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
811    label: &str,
812    on_line: Option<F>,
813) where
814    R: std::io::Read,
815    F: Fn(&str),
816{
817    use std::io::Write;
818    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
819    let mut stream = stream;
820    let mut buf = [0u8; 8192];
821    // Incomplete-line buffer for full.log formatting.
822    let mut line_buf: Vec<u8> = Vec::new();
823    loop {
824        match stream.read(&mut buf) {
825            Ok(0) => break, // EOF
826            Ok(n) => {
827                let chunk = &buf[..n];
828                // Write raw bytes to the individual log (captures partial lines too).
829                let _ = log_file.write_all(chunk);
830                // Accumulate bytes for full.log line formatting.
831                for &b in chunk {
832                    if b == b'\n' {
833                        let line = String::from_utf8_lossy(&line_buf);
834                        if let Ok(mut fl) = full_log.lock() {
835                            let ts = now_rfc3339();
836                            let _ = writeln!(fl, "{ts} [{label}] {line}");
837                        }
838                        if let Some(ref f) = on_line {
839                            f(&line);
840                        }
841                        line_buf.clear();
842                    } else {
843                        line_buf.push(b);
844                    }
845                }
846            }
847            Err(_) => break,
848        }
849    }
850    // Flush any remaining incomplete line to full.log and trigger callback.
851    if !line_buf.is_empty() {
852        let line = String::from_utf8_lossy(&line_buf);
853        if let Ok(mut fl) = full_log.lock() {
854            let ts = now_rfc3339();
855            let _ = writeln!(fl, "{ts} [{label}] {line}");
856        }
857        if let Some(ref f) = on_line {
858            f(&line);
859        }
860    }
861}
862
863/// Internal supervisor sub-command.
864///
865/// Runs the target command, streams stdout/stderr to individual log files
866/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
867/// updates `state.json` when the process finishes.
868///
869/// On Windows, the child process is assigned to a named Job Object so that
870/// the entire process tree can be terminated with a single `kill` call.
871/// The Job Object name is recorded in `state.json` as `windows_job_name`.
872pub fn supervise(opts: SuperviseOpts) -> Result<()> {
873    use std::sync::{Arc, Mutex};
874
875    let job_id = opts.job_id;
876    let root = opts.root;
877    let command = opts.command;
878
879    if command.is_empty() {
880        anyhow::bail!("supervisor: no command");
881    }
882
883    let job_dir = JobDir::open(root, job_id)?;
884
885    // Read meta.json for notification config and cwd (used in completion event).
886    let meta = job_dir.read_meta()?;
887    // Use the actual execution start time, not meta.created_at.
888    // For `run`, these are nearly identical. For `start`, the job may have
889    // been created long before it was started.
890    let started_at = now_rfc3339();
891
892    // Determine full.log path.
893    let full_log_path = if let Some(p) = opts.full_log {
894        std::path::PathBuf::from(p)
895    } else {
896        job_dir.full_log_path()
897    };
898
899    // Create the full.log file (shared between stdout/stderr threads).
900    // Ensure parent directories exist for custom paths.
901    if let Some(parent) = full_log_path.parent() {
902        std::fs::create_dir_all(parent)
903            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
904    }
905    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
906    let full_log = Arc::new(Mutex::new(full_log_file));
907
908    // Execute command through the shell wrapper (same launcher as --notify-command).
909    //
910    // Single-element commands are treated as shell command strings and passed
911    // as-is (e.g. `"echo hello && ls"` preserves shell operators).
912    // Multi-element argv have each element POSIX-single-quoted before joining so
913    // that arguments with spaces, `$`, or special characters are preserved through
914    // the shell layer (e.g. `["sh", "-c", "exit 42"]` → `'sh' '-c' 'exit 42'`).
915    let cmd_str = build_cmd_str(command);
916    if opts.shell_wrapper.is_empty() {
917        anyhow::bail!("supervisor: shell wrapper must not be empty");
918    }
919    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
920    child_cmd.args(&opts.shell_wrapper[1..]).arg(&cmd_str);
921
922    if opts.inherit_env {
923        // Start with the current environment (default).
924    } else {
925        child_cmd.env_clear();
926    }
927
928    // Apply env files in order.
929    for env_file in &opts.env_files {
930        let vars = load_env_file(env_file)?;
931        for (k, v) in vars {
932            child_cmd.env(&k, &v);
933        }
934    }
935
936    // Apply --env KEY=VALUE overrides (applied after env-files).
937    for env_var in &opts.env_vars {
938        let (k, v) = parse_env_var(env_var);
939        child_cmd.env(&k, &v);
940    }
941
942    // Set working directory if specified.
943    if let Some(cwd) = opts.cwd {
944        child_cmd.current_dir(cwd);
945    }
946
947    // Put the child in its own process group so that timeout signals
948    // (SIGTERM / SIGKILL) reach the entire process tree, not just the
949    // shell wrapper.  Without this, `sh -lc "sleep 60"` would absorb
950    // the signal while the grandchild (`sleep`) keeps running.
951    #[cfg(unix)]
952    {
953        use std::os::unix::process::CommandExt;
954        // SAFETY: setsid is async-signal-safe and called before exec.
955        unsafe {
956            child_cmd.pre_exec(|| {
957                libc::setsid();
958                Ok(())
959            });
960        }
961    }
962
963    // Spawn the child with piped stdout/stderr so we can tee to logs.
964    let mut child = child_cmd
965        .stdin(std::process::Stdio::null())
966        .stdout(std::process::Stdio::piped())
967        .stderr(std::process::Stdio::piped())
968        .spawn()
969        .context("supervisor: spawn child")?;
970
971    let pid = child.id();
972    info!(job_id, pid, "child process started");
973
974    // On Windows, assign child to a named Job Object for process-tree management.
975    // The job name is derived from the job_id so that `kill` can look it up.
976    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
977    // kills the child process and updates state.json to "failed" before returning
978    // an error, so that the run front-end (which may have already returned) can
979    // detect the failure via state.json on next poll.
980    #[cfg(windows)]
981    let windows_job_name = {
982        match assign_to_job_object(job_id, pid) {
983            Ok(name) => Some(name),
984            Err(e) => {
985                // Job Object assignment failed. Per design.md this is a MUST
986                // requirement on Windows. Kill the child process and update
987                // state.json to "failed" so the run front-end can detect it.
988                let kill_err = child.kill();
989                let _ = child.wait(); // reap to avoid zombies
990
991                let failed_state = JobState {
992                    job: JobStateJob {
993                        id: job_id.to_string(),
994                        status: JobStatus::Failed,
995                        started_at: Some(started_at.clone()),
996                    },
997                    result: JobStateResult {
998                        exit_code: None,
999                        signal: None,
1000                        duration_ms: None,
1001                    },
1002                    pid: Some(pid),
1003                    finished_at: Some(now_rfc3339()),
1004                    updated_at: now_rfc3339(),
1005                    windows_job_name: None,
1006                };
1007                // Best-effort: if writing state fails, we still propagate the
1008                // original assignment error.
1009                let _ = job_dir.write_state(&failed_state);
1010
1011                // Dispatch completion event for the failed state if notifications are configured.
1012                // This mirrors the dispatch logic in the normal exit path so that callers
1013                // receive a job.finished event even when the supervisor fails early (Windows only).
1014                if opts.notify_command.is_some() || opts.notify_file.is_some() {
1015                    let finished_at_ts =
1016                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1017                    let stdout_log = job_dir.stdout_path().display().to_string();
1018                    let stderr_log = job_dir.stderr_path().display().to_string();
1019                    let fail_event = crate::schema::CompletionEvent {
1020                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1021                        event_type: "job.finished".to_string(),
1022                        job_id: job_id.to_string(),
1023                        state: JobStatus::Failed.as_str().to_string(),
1024                        command: meta.command.clone(),
1025                        cwd: meta.cwd.clone(),
1026                        started_at: started_at.clone(),
1027                        finished_at: finished_at_ts,
1028                        duration_ms: None,
1029                        exit_code: None,
1030                        signal: None,
1031                        stdout_log_path: stdout_log,
1032                        stderr_log_path: stderr_log,
1033                    };
1034                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1035                    let fail_event_path = job_dir.completion_event_path().display().to_string();
1036                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1037                        Vec::new();
1038                    if let Err(we) = job_dir.write_completion_event_atomic(
1039                        &crate::schema::CompletionEventRecord {
1040                            event: fail_event.clone(),
1041                            delivery_results: vec![],
1042                        },
1043                    ) {
1044                        warn!(
1045                            job_id,
1046                            error = %we,
1047                            "failed to write initial completion_event.json for failed job"
1048                        );
1049                    }
1050                    if let Some(ref shell_cmd) = opts.notify_command {
1051                        fail_delivery_results.push(dispatch_command_sink(
1052                            shell_cmd,
1053                            &fail_event_json,
1054                            job_id,
1055                            &fail_event_path,
1056                            &opts.shell_wrapper,
1057                            "job.finished",
1058                        ));
1059                    }
1060                    if let Some(ref file_path) = opts.notify_file {
1061                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1062                    }
1063                    if let Err(we) = job_dir.write_completion_event_atomic(
1064                        &crate::schema::CompletionEventRecord {
1065                            event: fail_event,
1066                            delivery_results: fail_delivery_results,
1067                        },
1068                    ) {
1069                        warn!(
1070                            job_id,
1071                            error = %we,
1072                            "failed to update completion_event.json with delivery results for failed job"
1073                        );
1074                    }
1075                }
1076
1077                if let Err(ke) = kill_err {
1078                    return Err(anyhow::anyhow!(
1079                        "supervisor: failed to assign pid {pid} to Job Object \
1080                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1081                    ));
1082                }
1083                return Err(anyhow::anyhow!(
1084                    "supervisor: failed to assign pid {pid} to Job Object \
1085                     (Windows MUST requirement); child process was killed; \
1086                     consider running outside a nested Job Object environment: {e}"
1087                ));
1088            }
1089        }
1090    };
1091    #[cfg(not(windows))]
1092    let windows_job_name: Option<String> = None;
1093
1094    // Update state.json with real child PID and Windows Job Object name.
1095    // On Windows, windows_job_name is always Some at this point (guaranteed
1096    // by the MUST requirement above), so state.json will always contain the
1097    // Job Object identifier while the job is running.
1098    let state = JobState {
1099        job: JobStateJob {
1100            id: job_id.to_string(),
1101            status: JobStatus::Running,
1102            started_at: Some(started_at.clone()),
1103        },
1104        result: JobStateResult {
1105            exit_code: None,
1106            signal: None,
1107            duration_ms: None,
1108        },
1109        pid: Some(pid),
1110        finished_at: None,
1111        updated_at: now_rfc3339(),
1112        windows_job_name,
1113    };
1114    job_dir.write_state(&state)?;
1115
1116    let child_start_time = std::time::Instant::now();
1117
1118    // Take stdout/stderr handles before moving child.
1119    let child_stdout = child.stdout.take().expect("child stdout piped");
1120    let child_stderr = child.stderr.take().expect("child stderr piped");
1121
1122    // Create shared output-match checker from the initial meta notification config.
1123    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1124        job_dir.path.clone(),
1125        opts.shell_wrapper.clone(),
1126        meta.notification.clone(),
1127    ));
1128
1129    // Completion channels for log threads: each thread sends `()` after stream_to_logs returns.
1130    // Used for bounded joins below (allows supervisor to exit promptly when descendants
1131    // hold inherited pipe ends open indefinitely).
1132    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1133    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1134
1135    // Thread: read stdout, write to stdout.log and full.log.
1136    let stdout_log_path = job_dir.stdout_path();
1137    let full_log_stdout = Arc::clone(&full_log);
1138    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1139    let t_stdout = std::thread::spawn(move || {
1140        stream_to_logs(
1141            child_stdout,
1142            &stdout_log_path,
1143            full_log_stdout,
1144            "STDOUT",
1145            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1146        );
1147        let _ = tx_stdout_done.send(());
1148    });
1149
1150    // Thread: read stderr, write to stderr.log and full.log.
1151    let stderr_log_path = job_dir.stderr_path();
1152    let full_log_stderr = Arc::clone(&full_log);
1153    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1154    let t_stderr = std::thread::spawn(move || {
1155        stream_to_logs(
1156            child_stderr,
1157            &stderr_log_path,
1158            full_log_stderr,
1159            "STDERR",
1160            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1161        );
1162        let _ = tx_stderr_done.send(());
1163    });
1164
1165    // Timeout / kill-after / progress-every handling.
1166    // We spawn a watcher thread to handle timeout and periodic state.json updates.
1167    let timeout_ms = opts.timeout_ms;
1168    let kill_after_ms = opts.kill_after_ms;
1169    let progress_every_ms = opts.progress_every_ms;
1170    let state_path = job_dir.state_path();
1171    let job_id_str = job_id.to_string();
1172
1173    // Use an atomic flag to signal the watcher thread when the child has exited.
1174    use std::sync::atomic::{AtomicBool, Ordering};
1175    let child_done = Arc::new(AtomicBool::new(false));
1176
1177    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1178        let state_path_clone = state_path.clone();
1179        let child_done_clone = Arc::clone(&child_done);
1180        Some(std::thread::spawn(move || {
1181            let start = std::time::Instant::now();
1182            let timeout_dur = if timeout_ms > 0 {
1183                Some(std::time::Duration::from_millis(timeout_ms))
1184            } else {
1185                None
1186            };
1187            let progress_dur = if progress_every_ms > 0 {
1188                Some(std::time::Duration::from_millis(progress_every_ms))
1189            } else {
1190                None
1191            };
1192
1193            let poll_interval = std::time::Duration::from_millis(100);
1194
1195            loop {
1196                std::thread::sleep(poll_interval);
1197
1198                // Exit the watcher loop if the child process has finished.
1199                if child_done_clone.load(Ordering::Relaxed) {
1200                    break;
1201                }
1202
1203                let elapsed = start.elapsed();
1204
1205                // Check for timeout.
1206                if let Some(td) = timeout_dur
1207                    && elapsed >= td
1208                {
1209                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1210                    // Send SIGTERM to the entire process group (negative PID).
1211                    // The child was placed in its own session/group via setsid.
1212                    #[cfg(unix)]
1213                    {
1214                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1215                    }
1216                    // If kill_after > 0, wait kill_after ms then SIGKILL.
1217                    if kill_after_ms > 0 {
1218                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1219                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1220                        #[cfg(unix)]
1221                        {
1222                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1223                        }
1224                    } else {
1225                        // Immediate SIGKILL to the process group.
1226                        #[cfg(unix)]
1227                        {
1228                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1229                        }
1230                    }
1231                    break;
1232                }
1233
1234                // Progress-every: update updated_at periodically.
1235                if let Some(pd) = progress_dur {
1236                    let elapsed_ms = elapsed.as_millis() as u64;
1237                    let pd_ms = pd.as_millis() as u64;
1238                    let poll_ms = poll_interval.as_millis() as u64;
1239                    if elapsed_ms % pd_ms < poll_ms {
1240                        // Read, update updated_at, write back.
1241                        if let Ok(raw) = std::fs::read(&state_path_clone)
1242                            && let Ok(mut st) =
1243                                serde_json::from_slice::<crate::schema::JobState>(&raw)
1244                        {
1245                            st.updated_at = now_rfc3339();
1246                            if let Ok(s) = serde_json::to_string_pretty(&st) {
1247                                let _ = std::fs::write(&state_path_clone, s);
1248                            }
1249                        }
1250                    }
1251                }
1252            }
1253        }))
1254    } else {
1255        None
1256    };
1257
1258    // Wait for child to finish.
1259    let exit_status = child.wait().context("wait for child")?;
1260
1261    // Signal the watcher that the child has finished so it can exit its loop.
1262    child_done.store(true, Ordering::Relaxed);
1263
1264    // Persist terminal state immediately after the wrapped root process exits.
1265    // This must happen BEFORE joining log threads, because log threads block on
1266    // EOF of stdout/stderr pipes and descendant processes that inherited those
1267    // pipes may keep them open indefinitely. Persisting state here ensures that
1268    // `status` and `wait` can observe the terminal state without waiting for
1269    // all descendants to close their inherited handles.
1270    let duration_ms = child_start_time.elapsed().as_millis() as u64;
1271    let exit_code = exit_status.code();
1272    let finished_at = now_rfc3339();
1273
1274    // Detect signal-killed processes on Unix for accurate state and completion event.
1275    #[cfg(unix)]
1276    let (terminal_status, signal_name) = {
1277        use std::os::unix::process::ExitStatusExt;
1278        if let Some(sig) = exit_status.signal() {
1279            (JobStatus::Killed, Some(sig.to_string()))
1280        } else {
1281            (JobStatus::Exited, None)
1282        }
1283    };
1284    #[cfg(not(unix))]
1285    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1286
1287    let state = JobState {
1288        job: JobStateJob {
1289            id: job_id.to_string(),
1290            status: terminal_status.clone(),
1291            started_at: Some(started_at.clone()),
1292        },
1293        result: JobStateResult {
1294            exit_code,
1295            signal: signal_name.clone(),
1296            duration_ms: Some(duration_ms),
1297        },
1298        pid: Some(pid),
1299        finished_at: Some(finished_at.clone()),
1300        updated_at: now_rfc3339(),
1301        windows_job_name: None, // not needed after process exits
1302    };
1303    job_dir.write_state(&state)?;
1304    info!(job_id, ?exit_code, "child process finished");
1305
1306    // Bounded join for log-reader threads.
1307    //
1308    // After the wrapped root process exits, we give log threads a short window
1309    // to drain any remaining output and fire output-match callbacks (which is
1310    // the common case: no descendants, pipe write-end closes promptly).
1311    //
1312    // If a thread does not complete within the window, it is detached (dropped).
1313    // Detaching means the thread will be killed when the supervisor process exits,
1314    // which is the correct trade-off: supervisor must not linger indefinitely
1315    // because a descendant holds an inherited pipe write-end open.
1316    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1317    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1318
1319    let remaining = drain_deadline
1320        .checked_duration_since(std::time::Instant::now())
1321        .unwrap_or(std::time::Duration::ZERO);
1322    if rx_stdout_done.recv_timeout(remaining).is_ok() {
1323        let _ = t_stdout.join();
1324    } else {
1325        drop(t_stdout); // detach: descendant holds the pipe open
1326    }
1327
1328    let remaining = drain_deadline
1329        .checked_duration_since(std::time::Instant::now())
1330        .unwrap_or(std::time::Duration::ZERO);
1331    if rx_stderr_done.recv_timeout(remaining).is_ok() {
1332        let _ = t_stderr.join();
1333    } else {
1334        drop(t_stderr); // detach: descendant holds the pipe open
1335    }
1336
1337    // Join watcher if present; it exits promptly once child_done is set.
1338    if let Some(w) = watcher {
1339        let _ = w.join();
1340    }
1341
1342    // Reload the latest notification config from meta.json to pick up any post-creation
1343    // updates (e.g. from `notify set` invoked after the job was launched).
1344    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1345    let (current_notify_command, current_notify_file) = match &latest_notification {
1346        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1347        None => (None, None),
1348    };
1349
1350    // Dispatch completion event to configured notification sinks.
1351    // Failure here must not alter job state (delivery result is recorded separately).
1352    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1353    if has_notification {
1354        let stdout_log = job_dir.stdout_path().display().to_string();
1355        let stderr_log = job_dir.stderr_path().display().to_string();
1356        let event = crate::schema::CompletionEvent {
1357            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1358            event_type: "job.finished".to_string(),
1359            job_id: job_id.to_string(),
1360            state: terminal_status.as_str().to_string(),
1361            command: meta.command.clone(),
1362            cwd: meta.cwd.clone(),
1363            started_at,
1364            finished_at,
1365            duration_ms: Some(duration_ms),
1366            exit_code,
1367            signal: signal_name,
1368            stdout_log_path: stdout_log,
1369            stderr_log_path: stderr_log,
1370        };
1371
1372        let event_json = serde_json::to_string(&event).unwrap_or_default();
1373        let event_path = job_dir.completion_event_path().display().to_string();
1374        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1375
1376        // Write initial completion_event.json before dispatching sinks.
1377        if let Err(e) =
1378            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1379                event: event.clone(),
1380                delivery_results: vec![],
1381            })
1382        {
1383            warn!(job_id, error = %e, "failed to write initial completion_event.json");
1384        }
1385
1386        if let Some(ref shell_cmd) = current_notify_command {
1387            delivery_results.push(dispatch_command_sink(
1388                shell_cmd,
1389                &event_json,
1390                job_id,
1391                &event_path,
1392                &opts.shell_wrapper,
1393                "job.finished",
1394            ));
1395        }
1396        if let Some(ref file_path) = current_notify_file {
1397            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1398        }
1399
1400        // Update completion_event.json with delivery results.
1401        if let Err(e) =
1402            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1403                event,
1404                delivery_results,
1405            })
1406        {
1407            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1408        }
1409    }
1410
1411    Ok(())
1412}
1413
1414/// Dispatch the command sink: execute the shell command string via the configured shell wrapper,
1415/// pass event JSON via stdin, and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID /
1416/// AGENT_EXEC_EVENT_TYPE env vars.
1417///
1418/// The shell wrapper argv (e.g. `["sh", "-lc"]`) is provided by the caller.
1419/// The command string is appended as the final argument to the wrapper.
1420fn dispatch_command_sink(
1421    shell_cmd: &str,
1422    event_json: &str,
1423    job_id: &str,
1424    event_path: &str,
1425    shell_wrapper: &[String],
1426    event_type: &str,
1427) -> crate::schema::SinkDeliveryResult {
1428    use std::io::Write;
1429    let attempted_at = now_rfc3339();
1430    let target = shell_cmd.to_string();
1431
1432    if shell_cmd.trim().is_empty() {
1433        return crate::schema::SinkDeliveryResult {
1434            sink_type: "command".to_string(),
1435            target,
1436            success: false,
1437            error: Some("empty shell command".to_string()),
1438            attempted_at,
1439        };
1440    }
1441
1442    if shell_wrapper.is_empty() {
1443        return crate::schema::SinkDeliveryResult {
1444            sink_type: "command".to_string(),
1445            target,
1446            success: false,
1447            error: Some("shell wrapper must not be empty".to_string()),
1448            attempted_at,
1449        };
1450    }
1451
1452    let mut cmd = Command::new(&shell_wrapper[0]);
1453    cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1454
1455    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1456    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1457    cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1458    cmd.stdin(std::process::Stdio::piped());
1459    cmd.stdout(std::process::Stdio::null());
1460    cmd.stderr(std::process::Stdio::null());
1461
1462    match cmd.spawn() {
1463        Ok(mut child) => {
1464            if let Some(mut stdin) = child.stdin.take() {
1465                let _ = stdin.write_all(event_json.as_bytes());
1466            }
1467            match child.wait() {
1468                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1469                    sink_type: "command".to_string(),
1470                    target,
1471                    success: true,
1472                    error: None,
1473                    attempted_at,
1474                },
1475                Ok(status) => crate::schema::SinkDeliveryResult {
1476                    sink_type: "command".to_string(),
1477                    target,
1478                    success: false,
1479                    error: Some(format!("exited with status {status}")),
1480                    attempted_at,
1481                },
1482                Err(e) => crate::schema::SinkDeliveryResult {
1483                    sink_type: "command".to_string(),
1484                    target,
1485                    success: false,
1486                    error: Some(format!("wait error: {e}")),
1487                    attempted_at,
1488                },
1489            }
1490        }
1491        Err(e) => crate::schema::SinkDeliveryResult {
1492            sink_type: "command".to_string(),
1493            target,
1494            success: false,
1495            error: Some(format!("spawn error: {e}")),
1496            attempted_at,
1497        },
1498    }
1499}
1500
1501/// Dispatch the file sink: append event JSON as a single NDJSON line.
1502/// Creates parent directories automatically.
1503fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1504    use std::io::Write;
1505    let attempted_at = now_rfc3339();
1506    let path = std::path::Path::new(file_path);
1507
1508    if let Some(parent) = path.parent()
1509        && let Err(e) = std::fs::create_dir_all(parent)
1510    {
1511        return crate::schema::SinkDeliveryResult {
1512            sink_type: "file".to_string(),
1513            target: file_path.to_string(),
1514            success: false,
1515            error: Some(format!("create parent dir: {e}")),
1516            attempted_at,
1517        };
1518    }
1519
1520    match std::fs::OpenOptions::new()
1521        .create(true)
1522        .append(true)
1523        .open(path)
1524    {
1525        Ok(mut f) => match writeln!(f, "{event_json}") {
1526            Ok(_) => crate::schema::SinkDeliveryResult {
1527                sink_type: "file".to_string(),
1528                target: file_path.to_string(),
1529                success: true,
1530                error: None,
1531                attempted_at,
1532            },
1533            Err(e) => crate::schema::SinkDeliveryResult {
1534                sink_type: "file".to_string(),
1535                target: file_path.to_string(),
1536                success: false,
1537                error: Some(format!("write error: {e}")),
1538                attempted_at,
1539            },
1540        },
1541        Err(e) => crate::schema::SinkDeliveryResult {
1542            sink_type: "file".to_string(),
1543            target: file_path.to_string(),
1544            success: false,
1545            error: Some(format!("open error: {e}")),
1546            attempted_at,
1547        },
1548    }
1549}
1550
1551/// Public alias so other modules can call the timestamp helper.
1552pub fn now_rfc3339_pub() -> String {
1553    now_rfc3339()
1554}
1555
1556fn now_rfc3339() -> String {
1557    // Use a simple approach that works without chrono.
1558    let d = std::time::SystemTime::now()
1559        .duration_since(std::time::UNIX_EPOCH)
1560        .unwrap_or_default();
1561    format_rfc3339(d.as_secs())
1562}
1563
1564fn format_rfc3339(secs: u64) -> String {
1565    // Manual conversion of Unix timestamp to UTC date-time string.
1566    let mut s = secs;
1567    let seconds = s % 60;
1568    s /= 60;
1569    let minutes = s % 60;
1570    s /= 60;
1571    let hours = s % 24;
1572    s /= 24;
1573
1574    // Days since 1970-01-01
1575    let mut days = s;
1576    let mut year = 1970u64;
1577    loop {
1578        let days_in_year = if is_leap(year) { 366 } else { 365 };
1579        if days < days_in_year {
1580            break;
1581        }
1582        days -= days_in_year;
1583        year += 1;
1584    }
1585
1586    let leap = is_leap(year);
1587    let month_days: [u64; 12] = [
1588        31,
1589        if leap { 29 } else { 28 },
1590        31,
1591        30,
1592        31,
1593        30,
1594        31,
1595        31,
1596        30,
1597        31,
1598        30,
1599        31,
1600    ];
1601    let mut month = 0usize;
1602    for (i, &d) in month_days.iter().enumerate() {
1603        if days < d {
1604            month = i;
1605            break;
1606        }
1607        days -= d;
1608    }
1609    let day = days + 1;
1610
1611    format!(
1612        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1613        year,
1614        month + 1,
1615        day,
1616        hours,
1617        minutes,
1618        seconds
1619    )
1620}
1621
/// Gregorian leap-year rule: every fourth year, except century years that
/// are not divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        true
    } else if year.is_multiple_of(100) {
        false
    } else {
        year.is_multiple_of(4)
    }
}
1625
/// Windows-only: create a named Job Object and assign the given child process
/// to it so that the entire process tree can be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. This name is stored in
/// `state.json` so that future `kill` invocations can open the same Job Object
/// by name and call `TerminateJobObject` to stop the whole tree.
///
/// The process handle opened for the assignment is closed on every path. The
/// Job Object handle is closed only on failure; on success it is deliberately
/// left open so the Job Object stays valid for as long as the supervisor
/// runs, and is released by the OS when the supervisor exits.
///
/// # Errors
///
/// Returns `Err` if `OpenProcess`, `CreateJobObjectW` or
/// `AssignProcessToJobObject` fails. The caller (`supervise`) treats failure
/// as a fatal error because reliable process-tree management is a Windows
/// MUST requirement (design.md).
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: every handle passed to a Win32 call below was returned by a
    // successful call earlier in this block, `hname` outlives the
    // `CreateJobObjectW` call that borrows it, and each handle is closed at
    // most once on any path.
    unsafe {
        // Open the child process handle (needed for AssignProcessToJobObject).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create a named Job Object.
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        // Assign the child process to the Job Object.
        // This can fail if the process is already in another job (e.g. CI/nested).
        // Per design.md, assignment is a MUST on Windows — failure is a fatal error.
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle was only needed for the assignment.
        let _ = CloseHandle(proc_handle);

        // `job` is intentionally NOT closed: keeping the handle open keeps the
        // Job Object alive for the lifetime of the supervisor. The handle is a
        // plain copyable value with no destructor, so simply not calling
        // `CloseHandle` is sufficient (a `mem::forget` would be a no-op); the
        // OS reclaims it when the supervisor process exits.
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1691
1692/// Build the shell command string passed to the shell wrapper.
1693///
1694/// - Single-element commands are treated as shell command strings and passed
1695///   as-is, preserving shell operators like `&&`, pipes, etc.
1696/// - Multi-element commands have each element POSIX-single-quoted before
1697///   joining so that argv semantics survive the shell layer, including
1698///   arguments that contain spaces, `$`, quotes, or other special characters.
1699fn build_cmd_str(command: &[String]) -> String {
1700    if command.len() == 1 {
1701        command[0].clone()
1702    } else {
1703        command
1704            .iter()
1705            .map(|s| posix_single_quote(s))
1706            .collect::<Vec<_>>()
1707            .join(" ")
1708    }
1709}
1710
/// Quote `s` for a POSIX shell using single quotes.
///
/// Each embedded `'` is emitted as `'\''` (close the quote, add an escaped
/// quote, reopen the quote); all other characters are copied verbatim.
fn posix_single_quote(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
1715
/// Unit tests for the pure helpers in this module: timestamp formatting,
/// the leap-year rule, and shell command-string construction.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        // 2024-01-01T00:00:00Z = 1704067200
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_last_second_of_year() {
        // One second before 2024-01-01T00:00:00Z.
        assert_eq!(format_rfc3339(1704067199), "2023-12-31T23:59:59Z");
    }

    #[test]
    fn rfc3339_leap_day_with_time_of_day() {
        // 2024-01-01 + 59 days = 2024-02-29; plus 12:34:56.
        assert_eq!(format_rfc3339(1709210096), "2024-02-29T12:34:56Z");
        // The following midnight rolls over to March.
        assert_eq!(format_rfc3339(1709251200), "2024-03-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_century_leap_year() {
        // 2000 is divisible by 400, so it has a February 29th.
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
    }

    #[test]
    fn leap_year_rules() {
        assert!(is_leap(2024));
        assert!(is_leap(2000));
        assert!(!is_leap(1900));
        assert!(!is_leap(2023));
    }

    #[test]
    fn single_quote_escapes_embedded_quotes() {
        assert_eq!(posix_single_quote(""), "''");
        assert_eq!(posix_single_quote("plain"), "'plain'");
        assert_eq!(posix_single_quote("it's"), "'it'\\''s'");
    }

    #[test]
    fn cmd_str_single_element_is_passed_through() {
        let cmd = vec!["echo a && echo b".to_string()];
        assert_eq!(build_cmd_str(&cmd), "echo a && echo b");
    }

    #[test]
    fn cmd_str_multiple_elements_are_quoted() {
        let cmd = vec![
            "echo".to_string(),
            "hello world".to_string(),
            "it's".to_string(),
        ];
        assert_eq!(build_cmd_str(&cmd), "'echo' 'hello world' 'it'\\''s'");
    }
}