Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
6//! - `run` returns launch metadata immediately; observation is delegated to
7//!   `status` / `wait` / `tail`.
8
9use anyhow::{Context, Result};
10use std::io::IsTerminal;
11use std::path::Path;
12use std::process::Command;
13use tracing::{debug, info, warn};
14
15use crate::jobstore::{JobDir, generate_job_id, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18};
19
/// Snapshot of a job's state and the head of its output streams.
///
/// Produced by `observe_inline_output` and copied into the `run` response.
#[derive(Clone, Debug)]
pub struct InlineObservation {
    /// Milliseconds spent in the observation wait loop (0 when waiting was disabled).
    pub waited_ms: u64,
    /// Head of `stdout.log`, limited by the caller's byte budget.
    pub stdout: String,
    /// Head of `stderr.log`, limited by the caller's byte budget.
    pub stderr: String,
    /// Range reported by the head read of `stdout.log`.
    pub stdout_range: [u64; 2],
    /// Range reported by the head read of `stderr.log`.
    pub stderr_range: [u64; 2],
    /// Observed byte count reported for `stdout.log`.
    pub stdout_total_bytes: u64,
    /// Observed byte count reported for `stderr.log`.
    pub stderr_total_bytes: u64,
    /// Name of the text encoding used for `stdout` / `stderr`.
    pub encoding: String,
    /// Job status string as read from the job state.
    pub state: String,
    /// Exit code from the job state, if one is recorded.
    pub exit_code: Option<i32>,
    /// Finish timestamp from the job state, if one is recorded.
    pub finished_at: Option<String>,
    /// Signal from the job state, if one is recorded.
    pub signal: Option<String>,
    /// Duration in milliseconds from the job state, if one is recorded.
    pub duration_ms: Option<u64>,
}
36use crate::tag::dedup_tags;
37
38/// Options for the `run` sub-command.
39///
40/// # Definition-time option alignment rule
41///
42/// Every definition-time option accepted here MUST also be accepted by `create` (and vice versa),
43/// since both commands write the same persisted job definition to `meta.json`. When adding a
44/// new persisted metadata field, wire it through both `run` and `create` unless the spec
45/// explicitly documents it as launch-only (e.g. snapshot timing, tail sizing, --wait).
46#[derive(Debug)]
47pub struct RunOpts<'a> {
48    /// Command and arguments to execute.
49    pub command: Vec<String>,
50    /// Override for jobs root directory.
51    pub root: Option<&'a str>,
52    /// Disable best-effort auto-GC for this invocation.
53    pub no_auto_gc: bool,
54    /// Optional auto-GC retention override.
55    pub auto_gc_older_than: Option<String>,
56    /// Optional auto-GC max-jobs override.
57    pub auto_gc_max_jobs: Option<u64>,
58    /// Optional auto-GC max-bytes override.
59    pub auto_gc_max_bytes: Option<u64>,
60    /// Base auto-GC settings resolved from config/defaults.
61    pub auto_gc_config: crate::gc::AutoGcConfig,
62    /// Wait for inline output observation before returning.
63    pub wait: bool,
64    /// Maximum wait duration in seconds for inline observation.
65    pub until_seconds: u64,
66    /// Wait indefinitely for terminal state / observation budget.
67    pub forever: bool,
68    /// Maximum bytes to include from the head of each stream.
69    pub max_bytes: u64,
70    pub compression_mode: crate::compress::CompressionMode,
71    /// Timeout in milliseconds; 0 = no timeout.
72    pub timeout_ms: u64,
73    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
74    pub kill_after_ms: u64,
75    /// Working directory for the command.
76    pub cwd: Option<&'a str>,
77    /// Environment variables as KEY=VALUE strings.
78    pub env_vars: Vec<String>,
79    /// Paths to env files, applied in order.
80    pub env_files: Vec<String>,
81    /// Whether to inherit the current process environment (default: true).
82    pub inherit_env: bool,
83    /// Keys to mask in JSON output (values replaced with "***").
84    pub mask: Vec<String>,
85    /// Optional stdin source definition persisted in meta and materialized into stdin.bin.
86    pub stdin: Option<StdinSource>,
87    /// Maximum bytes allowed for materialized stdin.bin (default: 64 MiB).
88    pub stdin_max_bytes: u64,
89    /// User-defined tags for this job (deduplicated preserving first-seen order).
90    pub tags: Vec<String>,
91    /// Override full.log path; None = use job dir.
92    pub log: Option<&'a str>,
93    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
94    pub progress_every_ms: u64,
95    /// Shell command string for command notification sink; executed via platform shell.
96    /// None = no command sink.
97    pub notify_command: Option<String>,
98    /// File path for NDJSON notification sink; None = no file sink.
99    pub notify_file: Option<String>,
100    /// Pattern to match against output lines (output-match notification).
101    pub output_pattern: Option<String>,
102    /// Match type for output-match: "contains" or "regex".
103    pub output_match_type: Option<String>,
104    /// Stream selector: "stdout", "stderr", or "either".
105    pub output_stream: Option<String>,
106    /// Shell command string for output-match command sink.
107    pub output_command: Option<String>,
108    /// File path for output-match NDJSON file sink.
109    pub output_file: Option<String>,
110    /// Resolved shell wrapper argv used to execute command strings.
111    /// e.g. `["sh", "-lc"]` or `["bash", "-lc"]`.
112    pub shell_wrapper: Vec<String>,
113}
114
115impl<'a> Default for RunOpts<'a> {
116    fn default() -> Self {
117        RunOpts {
118            command: vec![],
119            root: None,
120            no_auto_gc: false,
121            auto_gc_older_than: None,
122            auto_gc_max_jobs: None,
123            auto_gc_max_bytes: None,
124            auto_gc_config: crate::gc::AutoGcConfig::default(),
125            wait: true,
126            until_seconds: 10,
127            forever: false,
128            max_bytes: 65536,
129            compression_mode: crate::compress::CompressionMode::default(),
130            timeout_ms: 0,
131            kill_after_ms: 0,
132            cwd: None,
133            env_vars: vec![],
134            env_files: vec![],
135            inherit_env: true,
136            mask: vec![],
137            stdin: None,
138            stdin_max_bytes: DEFAULT_STDIN_MAX_BYTES,
139            tags: vec![],
140            log: None,
141            progress_every_ms: 0,
142            notify_command: None,
143            notify_file: None,
144            output_pattern: None,
145            output_match_type: None,
146            output_stream: None,
147            output_command: None,
148            output_file: None,
149            shell_wrapper: crate::config::default_shell_wrapper(),
150        }
151    }
152}
153
/// Where the bytes for a job's stdin come from.
///
/// The selected source is copied into the job directory as `stdin.bin` when the
/// job is materialized.
#[derive(Debug, Clone)]
pub enum StdinSource {
    /// Read from the stdin of the calling process (selected with `--stdin -`).
    CallerStdin,
    /// Use the given text verbatim; its bytes are written as-is.
    Inline(String),
    /// Copy the contents of the file at this path.
    File(String),
}
163
/// Parameters for spawning a supervisor process.
///
/// Shared by `run::execute` and `start::execute`; each field is forwarded to the
/// `_supervise` sub-command as a command-line argument.
pub struct SpawnSupervisorParams {
    /// Job identifier, forwarded as `--job-id`.
    pub job_id: String,
    /// Jobs root directory, forwarded as `--supervise-root`.
    pub root: std::path::PathBuf,
    /// Path of the combined log, forwarded as `--full-log`.
    pub full_log_path: String,
    /// Timeout in milliseconds; forwarded (rounded up to seconds) only when non-zero.
    pub timeout_ms: u64,
    /// Kill grace period in milliseconds; forwarded (rounded up to seconds) only when non-zero.
    pub kill_after_ms: u64,
    /// Working directory, forwarded as `--cwd` when present.
    pub cwd: Option<String>,
    /// Real (unmasked) KEY=VALUE env var pairs.
    pub env_vars: Vec<String>,
    /// Env file paths, each forwarded as `--env-file`.
    pub env_files: Vec<String>,
    /// When false, `--no-inherit-env` is passed to the supervisor.
    pub inherit_env: bool,
    /// Materialized stdin file, forwarded as `--stdin-file` when present.
    pub stdin_file: Option<String>,
    /// Progress interval in milliseconds; forwarded (rounded up to seconds) only when non-zero.
    pub progress_every_ms: u64,
    /// Notification command, forwarded as `--notify-command` when present.
    pub notify_command: Option<String>,
    /// Notification file, forwarded as `--notify-file` when present.
    pub notify_file: Option<String>,
    /// Shell wrapper argv, forwarded JSON-encoded as `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// Command and arguments, passed after `--`.
    pub command: Vec<String>,
}
182
183pub fn resolve_stdin_source(
184    stdin: Option<String>,
185    stdin_file: Option<String>,
186) -> Option<StdinSource> {
187    if let Some(value) = stdin {
188        if value == "-" {
189            Some(StdinSource::CallerStdin)
190        } else {
191            Some(StdinSource::Inline(value))
192        }
193    } else {
194        stdin_file.map(StdinSource::File)
195    }
196}
197
198pub fn validate_stdin_source(stdin: Option<&StdinSource>) -> Result<()> {
199    if matches!(stdin, Some(StdinSource::CallerStdin)) {
200        let stdin = std::io::stdin();
201        if stdin.is_terminal() {
202            return Err(anyhow::anyhow!(StdinRequired(
203                "stdin_required: --stdin - requires non-interactive stdin (pipe/heredoc/redirect)"
204                    .to_string(),
205            )));
206        }
207    }
208    Ok(())
209}
210
211fn create_stdin_file(path: &std::path::Path) -> Result<std::fs::File> {
212    #[cfg(unix)]
213    {
214        use std::os::unix::fs::OpenOptionsExt;
215        std::fs::OpenOptions::new()
216            .write(true)
217            .create(true)
218            .truncate(true)
219            .mode(0o600)
220            .open(path)
221            .with_context(|| format!("create materialized stdin {}", path.display()))
222    }
223    #[cfg(not(unix))]
224    {
225        std::fs::File::create(path)
226            .with_context(|| format!("create materialized stdin {}", path.display()))
227    }
228}
229
/// Writer adapter that refuses to let more than `limit` bytes reach `inner`.
struct LimitedWriter<W> {
    /// Destination writer.
    inner: W,
    /// Bytes accepted by `inner` so far.
    written: u64,
    /// Maximum total number of bytes that may be written.
    limit: u64,
}

impl<W: std::io::Write> std::io::Write for LimitedWriter<W> {
    /// Forward `buf` to the inner writer unless doing so would exceed the limit.
    ///
    /// A buffer that does not fit in full is rejected as a whole: nothing is
    /// written and an error whose message is `stdin_too_large` is returned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // `usize` is at most 64 bits wide on supported targets, so this cast is lossless.
        let incoming = buf.len() as u64;
        // `checked_add` keeps an overflowing total from panicking (debug) or
        // wrapping around the limit check (release); overflow means "too large".
        let fits = self
            .written
            .checked_add(incoming)
            .is_some_and(|total| total <= self.limit);
        if !fits {
            return Err(std::io::Error::other("stdin_too_large"));
        }
        let accepted = self.inner.write(buf)?;
        // Cannot overflow: `accepted <= buf.len()` and the full sum was checked above.
        self.written += accepted as u64;
        Ok(accepted)
    }

    /// Flush the inner writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
250
251fn materialize_stdin(
252    job_dir: &JobDir,
253    stdin: Option<&StdinSource>,
254    max_bytes: u64,
255) -> Result<Option<String>> {
256    let Some(source) = stdin else {
257        return Ok(None);
258    };
259
260    let target_name = "stdin.bin".to_string();
261    let target_path = job_dir.path.join(&target_name);
262    let file = create_stdin_file(&target_path)?;
263    let mut target = LimitedWriter {
264        inner: file,
265        written: 0,
266        limit: max_bytes,
267    };
268
269    let copy_result = match source {
270        StdinSource::CallerStdin => {
271            let mut stdin = std::io::stdin();
272            std::io::copy(&mut stdin, &mut target).context("materialize caller stdin to stdin.bin")
273        }
274        StdinSource::Inline(value) => {
275            use std::io::Write;
276            target
277                .write_all(value.as_bytes())
278                .map(|_| 0u64)
279                .context("write inline stdin to stdin.bin")
280        }
281        StdinSource::File(path) => {
282            let mut input = std::fs::File::open(path)
283                .with_context(|| format!("open --stdin-file source {}", path))?;
284            std::io::copy(&mut input, &mut target)
285                .with_context(|| format!("copy --stdin-file source {} to stdin.bin", path))
286        }
287    };
288
289    if let Err(e) = copy_result {
290        let _ = std::fs::remove_file(&target_path);
291        let is_too_large = e
292            .chain()
293            .any(|cause| cause.to_string().contains("stdin_too_large"));
294        if is_too_large {
295            return Err(anyhow::anyhow!(StdinTooLarge(format!(
296                "stdin_too_large: input exceeds {} byte limit",
297                max_bytes
298            ))));
299        }
300        return Err(e);
301    }
302
303    Ok(Some(target_name))
304}
305
306fn resolve_stdin_path(job_dir: &JobDir, stdin_file: Option<&str>) -> Option<std::path::PathBuf> {
307    stdin_file.map(|p| {
308        let path = std::path::Path::new(p);
309        if path.is_absolute() {
310            path.to_path_buf()
311        } else {
312            job_dir.path.join(path)
313        }
314    })
315}
316
/// Error raised when `--stdin -` is used but the caller's stdin is interactive.
///
/// The wrapped string is the complete, user-facing message.
#[derive(Debug)]
pub struct StdinRequired(pub String);

impl std::fmt::Display for StdinRequired {
    /// Emit the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinRequired {}
327
/// Error raised when the stdin input exceeds the configured byte limit.
///
/// The wrapped string is the complete, user-facing message.
#[derive(Debug)]
pub struct StdinTooLarge(pub String);

impl std::fmt::Display for StdinTooLarge {
    /// Emit the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinTooLarge {}
338
339pub fn open_child_stdin(job_dir: &JobDir, stdin_file: Option<&str>) -> Result<std::process::Stdio> {
340    if let Some(path) = resolve_stdin_path(job_dir, stdin_file) {
341        let file = std::fs::File::open(&path)
342            .with_context(|| format!("open materialized stdin {}", path.display()))?;
343        Ok(std::process::Stdio::from(file))
344    } else {
345        Ok(std::process::Stdio::null())
346    }
347}
348
/// Default cap on the size of a materialized `stdin.bin`: 64 MiB.
pub const DEFAULT_STDIN_MAX_BYTES: u64 = 64 << 20;
350
351pub fn materialize_stdin_for_job(
352    job_dir: &JobDir,
353    stdin: Option<&StdinSource>,
354    max_bytes: u64,
355) -> Result<Option<String>> {
356    materialize_stdin(job_dir, stdin, max_bytes)
357}
358
359/// Spawn the supervisor process and write the initial running state to `state.json`.
360///
361/// Returns the supervisor PID and the actual `started_at` timestamp.
362/// Also handles the Windows Job Object handshake before returning.
363pub fn spawn_supervisor_process(
364    job_dir: &JobDir,
365    params: SpawnSupervisorParams,
366) -> Result<(u32, String)> {
367    let started_at = now_rfc3339();
368
369    let exe = std::env::current_exe().context("resolve current exe")?;
370    let mut supervisor_cmd = Command::new(&exe);
371    supervisor_cmd
372        .arg("_supervise")
373        .arg("--job-id")
374        .arg(&params.job_id)
375        .arg("--supervise-root")
376        .arg(params.root.display().to_string())
377        .arg("--full-log")
378        .arg(&params.full_log_path);
379
380    if params.timeout_ms > 0 {
381        let timeout_seconds = params.timeout_ms.saturating_add(999) / 1000;
382        supervisor_cmd
383            .arg("--timeout")
384            .arg(timeout_seconds.to_string());
385    }
386    if params.kill_after_ms > 0 {
387        let kill_after_seconds = params.kill_after_ms.saturating_add(999) / 1000;
388        supervisor_cmd
389            .arg("--kill-after")
390            .arg(kill_after_seconds.to_string());
391    }
392    if let Some(ref cwd) = params.cwd {
393        supervisor_cmd.arg("--cwd").arg(cwd);
394    }
395    for env_file in &params.env_files {
396        supervisor_cmd.arg("--env-file").arg(env_file);
397    }
398    for env_var in &params.env_vars {
399        supervisor_cmd.arg("--env").arg(env_var);
400    }
401    if !params.inherit_env {
402        supervisor_cmd.arg("--no-inherit-env");
403    }
404    if let Some(ref stdin_file) = params.stdin_file {
405        supervisor_cmd.arg("--stdin-file").arg(stdin_file);
406    }
407    if params.progress_every_ms > 0 {
408        let progress_every_seconds = params.progress_every_ms.saturating_add(999) / 1000;
409        supervisor_cmd
410            .arg("--progress-every")
411            .arg(progress_every_seconds.to_string());
412    }
413    if let Some(ref nc) = params.notify_command {
414        supervisor_cmd.arg("--notify-command").arg(nc);
415    }
416    if let Some(ref nf) = params.notify_file {
417        supervisor_cmd.arg("--notify-file").arg(nf);
418    }
419    let wrapper_json =
420        serde_json::to_string(&params.shell_wrapper).context("serialize shell wrapper")?;
421    supervisor_cmd
422        .arg("--shell-wrapper-resolved")
423        .arg(&wrapper_json);
424
425    supervisor_cmd
426        .arg("--")
427        .args(&params.command)
428        .stdin(std::process::Stdio::null())
429        .stdout(std::process::Stdio::null())
430        .stderr(std::process::Stdio::null());
431
432    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
433    let supervisor_pid = supervisor.id();
434    debug!(supervisor_pid, "supervisor spawned");
435
436    // Write initial running state.
437    job_dir.init_state(supervisor_pid, &started_at)?;
438
439    // Windows Job Object handshake.
440    #[cfg(windows)]
441    {
442        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
443        loop {
444            std::thread::sleep(std::time::Duration::from_millis(10));
445            if let Ok(current_state) = job_dir.read_state() {
446                let supervisor_updated = current_state
447                    .pid
448                    .map(|p| p != supervisor_pid)
449                    .unwrap_or(false)
450                    || *current_state.status() == crate::schema::JobStatus::Failed;
451                if supervisor_updated {
452                    if *current_state.status() == crate::schema::JobStatus::Failed {
453                        anyhow::bail!(
454                            "supervisor failed to assign child process to Job Object \
455                             (Windows MUST requirement); see stderr for details"
456                        );
457                    }
458                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
459                    break;
460                }
461            }
462            if std::time::Instant::now() >= handshake_deadline {
463                debug!("supervisor handshake timed out; proceeding with initial state");
464                break;
465            }
466        }
467    }
468
469    Ok((supervisor_pid, started_at))
470}
471
472/// Pre-create empty log files (stdout.log, stderr.log, full.log) so they exist
473/// immediately after job creation, before the supervisor starts writing.
474pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
475    for log_path in [
476        job_dir.stdout_path(),
477        job_dir.stderr_path(),
478        job_dir.full_log_path(),
479    ] {
480        std::fs::OpenOptions::new()
481            .create(true)
482            .append(true)
483            .open(&log_path)
484            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
485    }
486    Ok(())
487}
488
489/// Execute `run`: spawn job and return launch metadata immediately.
490pub fn execute(opts: RunOpts) -> Result<()> {
491    if opts.command.is_empty() {
492        anyhow::bail!("no command specified for run");
493    }
494
495    let elapsed_start = std::time::Instant::now();
496
497    let root = resolve_root(opts.root);
498    std::fs::create_dir_all(&root)
499        .with_context(|| format!("create jobs root {}", root.display()))?;
500
501    let job_id = generate_job_id(&root)?;
502    let created_at = now_rfc3339();
503
504    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
505    let env_keys: Vec<String> = opts
506        .env_vars
507        .iter()
508        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
509        .collect();
510
511    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
512    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
513
514    // Resolve the effective working directory for this job.
515    // If --cwd was specified, use that path; otherwise use the current process's working directory.
516    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
517    let effective_cwd = resolve_effective_cwd(opts.cwd);
518
519    // Build output-match config from definition-time options (same logic as `create` and `notify set`).
520    let on_output_match = crate::notify::build_output_match_config(
521        opts.output_pattern,
522        opts.output_match_type,
523        opts.output_stream,
524        opts.output_command,
525        opts.output_file,
526        None,
527    );
528
529    let notification =
530        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
531        {
532            Some(crate::schema::NotificationConfig {
533                notify_command: opts.notify_command.clone(),
534                notify_file: opts.notify_file.clone(),
535                on_output_match,
536            })
537        } else {
538            None
539        };
540
541    // Validate and deduplicate tags (preserving first-seen order).
542    let tags = dedup_tags(opts.tags)?;
543
544    let meta = JobMeta {
545        job: JobMetaJob { id: job_id.clone() },
546        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
547        command: opts.command.clone(),
548        created_at: created_at.clone(),
549        root: root.display().to_string(),
550        env_keys,
551        env_vars: masked_env_vars.clone(),
552        // For `run`, env_vars_runtime is not populated because the supervisor
553        // is spawned immediately with the real values; no deferred start needed.
554        env_vars_runtime: vec![],
555        mask: opts.mask.clone(),
556        cwd: Some(effective_cwd),
557        notification,
558        // Execution-definition fields (used by start if ever applicable).
559        inherit_env: opts.inherit_env,
560        env_files: opts.env_files.clone(),
561        timeout_ms: opts.timeout_ms,
562        kill_after_ms: opts.kill_after_ms,
563        progress_every_ms: opts.progress_every_ms,
564        shell_wrapper: Some(opts.shell_wrapper.clone()),
565        stdin_file: None,
566        tags: tags.clone(),
567    };
568
569    validate_stdin_source(opts.stdin.as_ref())?;
570
571    let job_dir = JobDir::create(&root, &job_id, &meta)?;
572    let stdin_file =
573        materialize_stdin_for_job(&job_dir, opts.stdin.as_ref(), opts.stdin_max_bytes)?;
574    if stdin_file.is_some() {
575        let mut meta_with_stdin = meta.clone();
576        meta_with_stdin.stdin_file = stdin_file.clone();
577        job_dir.write_meta_atomic(&meta_with_stdin)?;
578    }
579    info!(job_id = %job_id, "created job directory");
580
581    // Determine the full.log path (may be overridden by --log).
582    let full_log_path = if let Some(log) = opts.log {
583        log.to_string()
584    } else {
585        job_dir.full_log_path().display().to_string()
586    };
587
588    // Pre-create empty log files so they exist before the supervisor starts.
589    pre_create_log_files(&job_dir)?;
590
591    // Spawn the supervisor using the shared helper.
592    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
593    // receives the real env var values so the child process can use them as intended.
594    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
595        &job_dir,
596        SpawnSupervisorParams {
597            job_id: job_id.clone(),
598            root: root.clone(),
599            full_log_path: full_log_path.clone(),
600            timeout_ms: opts.timeout_ms,
601            kill_after_ms: opts.kill_after_ms,
602            cwd: opts.cwd.map(|s| s.to_string()),
603            env_vars: opts.env_vars.clone(),
604            env_files: opts.env_files.clone(),
605            inherit_env: opts.inherit_env,
606            stdin_file: stdin_file.clone(),
607            progress_every_ms: opts.progress_every_ms,
608            notify_command: opts.notify_command.clone(),
609            notify_file: opts.notify_file.clone(),
610            shell_wrapper: opts.shell_wrapper.clone(),
611            command: opts.command.clone(),
612        },
613    )?;
614
615    // Compute absolute paths for stdout.log and stderr.log.
616    let stdout_log_path = job_dir.stdout_path().display().to_string();
617    let stderr_log_path = job_dir.stderr_path().display().to_string();
618
619    let observation = observe_inline_output(
620        &job_dir,
621        opts.wait,
622        opts.until_seconds,
623        opts.forever,
624        opts.max_bytes,
625    )?;
626    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
627    let compression = crate::compress::compress(crate::compress::CompressionInput {
628        command: &opts.command,
629        stdout: &observation.stdout,
630        stderr: &observation.stderr,
631        stdout_original_bytes: observation.stdout_total_bytes,
632        stderr_original_bytes: observation.stderr_total_bytes,
633        mode: opts.compression_mode,
634    });
635
636    if !opts.no_auto_gc {
637        let mut auto_cfg = opts.auto_gc_config.clone();
638        if let Some(v) = opts.auto_gc_older_than {
639            auto_cfg.older_than = v;
640        }
641        if let Some(v) = opts.auto_gc_max_jobs {
642            auto_cfg.max_jobs = usize::try_from(v).ok();
643        }
644        if let Some(v) = opts.auto_gc_max_bytes {
645            auto_cfg.max_bytes = Some(v);
646        }
647        crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
648    }
649
650    let response = Response::new(
651        "run",
652        RunData {
653            job_id,
654            state: observation.state,
655            tags,
656            // Include masked env_vars in the JSON response so callers can inspect
657            // which variables were set (with secret values replaced by "***").
658            env_vars: masked_env_vars,
659            stdout_log_path,
660            stderr_log_path,
661            elapsed_ms,
662            waited_ms: observation.waited_ms,
663            stdout: observation.stdout,
664            stderr: observation.stderr,
665            stdout_range: observation.stdout_range,
666            stderr_range: observation.stderr_range,
667            stdout_total_bytes: observation.stdout_total_bytes,
668            stderr_total_bytes: observation.stderr_total_bytes,
669            encoding: observation.encoding,
670            exit_code: observation.exit_code,
671            finished_at: observation.finished_at,
672            signal: observation.signal,
673            duration_ms: observation.duration_ms,
674            compression,
675        },
676    );
677    response.print();
678    Ok(())
679}
680
/// Options for the internal `_supervise` sub-command.
///
/// The supervisor works with real environment variable values because it has to
/// launch the child process with them. Masking is done by `run`, which writes
/// masked values to meta.json and includes them in the JSON response.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job being supervised.
    pub job_id: &'a str,
    /// Jobs root directory.
    pub root: &'a Path,
    /// Command and arguments to run.
    pub command: &'a [String],
    /// full.log path override; None = use the job directory default.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Grace period in milliseconds between SIGTERM and SIGKILL; 0 = immediate SIGKILL.
    pub kill_after_ms: u64,
    /// Working directory for the child process.
    pub cwd: Option<&'a str>,
    /// Environment variables as KEY=VALUE strings (real values, not masked).
    pub env_vars: Vec<String>,
    /// Env file paths, applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the current process environment is inherited.
    pub inherit_env: bool,
    /// Materialized stdin file path (relative to job dir) fed to the child's stdin.
    pub stdin_file: Option<String>,
    /// Interval in milliseconds for refreshing state.json `updated_at`; 0 = disabled.
    pub progress_every_ms: u64,
    /// Shell command string for the command notification sink, executed via the
    /// platform shell. None = no command sink.
    pub notify_command: Option<String>,
    /// NDJSON notification sink file path; None = no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings.
    pub shell_wrapper: Vec<String>,
}
717
718/// Resolve the effective working directory for a job.
719///
720/// If `cwd_override` is `Some`, use that path as the base. Otherwise use the
721/// current process working directory. In either case, attempt to canonicalize
722/// the path for consistent comparison; on failure, fall back to the absolute
723/// path representation (avoids symlink / permission issues on some systems).
724pub fn observe_inline_output(
725    job_dir: &JobDir,
726    wait: bool,
727    until_seconds: u64,
728    forever: bool,
729    max_bytes: u64,
730) -> Result<InlineObservation> {
731    let started = std::time::Instant::now();
732    let deadline = if wait {
733        if forever {
734            None
735        } else {
736            Some(started + std::time::Duration::from_secs(until_seconds))
737        }
738    } else {
739        Some(started)
740    };
741
742    if wait {
743        loop {
744            let state = job_dir.read_state()?;
745            let has_output = std::fs::metadata(job_dir.stdout_path())
746                .map(|m| m.len() > 0)
747                .unwrap_or(false)
748                || std::fs::metadata(job_dir.stderr_path())
749                    .map(|m| m.len() > 0)
750                    .unwrap_or(false);
751
752            if !state.status().is_non_terminal() || has_output {
753                break;
754            }
755
756            if let Some(dl) = deadline
757                && std::time::Instant::now() >= dl
758            {
759                break;
760            }
761
762            std::thread::sleep(std::time::Duration::from_millis(100));
763        }
764    }
765
766    let state = job_dir.read_state()?;
767    let stdout = job_dir.read_head_metrics("stdout.log", max_bytes);
768    let stderr = job_dir.read_head_metrics("stderr.log", max_bytes);
769
770    Ok(InlineObservation {
771        waited_ms: if wait {
772            started.elapsed().as_millis() as u64
773        } else {
774            0
775        },
776        stdout: stdout.head,
777        stderr: stderr.head,
778        stdout_range: stdout.range,
779        stderr_range: stderr.range,
780        stdout_total_bytes: stdout.observed_bytes,
781        stderr_total_bytes: stderr.observed_bytes,
782        encoding: "utf-8-lossy".to_string(),
783        state: state.status().as_str().to_string(),
784        exit_code: state.exit_code(),
785        signal: state.signal().map(|s| s.to_string()),
786        duration_ms: state.duration_ms(),
787        finished_at: state.finished_at,
788    })
789}
790
/// Resolve the effective working directory for a job.
///
/// The base is `cwd_override` when given, otherwise the current process working
/// directory (`"."` if that cannot be determined). The canonicalized form of
/// the base is preferred; when canonicalization fails, an already absolute base
/// is returned as-is and a relative one is joined onto the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    /// Current working directory, or `"."` when it is unavailable.
    fn process_cwd() -> std::path::PathBuf {
        std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
    }

    let base = cwd_override.map_or_else(process_cwd, std::path::PathBuf::from);

    // Canonical form (symlinks resolved) wins whenever it is available.
    if let Ok(canonical) = base.canonicalize() {
        return canonical.display().to_string();
    }

    // Best-effort fallback: make the path absolute without touching the filesystem.
    let absolute = if base.is_absolute() {
        base
    } else {
        process_cwd().join(base)
    };
    absolute.display().to_string()
}
812
813/// Mask the values of specified keys in a list of KEY=VALUE strings.
814/// Keys listed in `mask_keys` will have their value replaced with "***".
815pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
816    if mask_keys.is_empty() {
817        return env_vars.to_vec();
818    }
819    env_vars
820        .iter()
821        .map(|s| {
822            let (key, _val) = parse_env_var(s);
823            if mask_keys.iter().any(|k| k == &key) {
824                format!("{key}=***")
825            } else {
826                s.clone()
827            }
828        })
829        .collect()
830}
831
/// Split a `KEY=VALUE` string at its first `=` into `(key, value)`.
///
/// Anything after the first `=` (including further `=` characters) belongs to
/// the value. A string without `=` is treated as a key with an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_string(), value.to_string()),
        None => (s.to_string(), String::new()),
    }
}
840
841/// Load environment variables from a .env-style file.
842/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
843fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
844    let contents =
845        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
846    let mut vars = Vec::new();
847    for line in contents.lines() {
848        let line = line.trim();
849        if line.is_empty() || line.starts_with('#') {
850            continue;
851        }
852        vars.push(parse_env_var(line));
853    }
854    Ok(vars)
855}
856
857/// Shared state for output-match checking, used by streaming threads in `supervise`.
858///
859/// Reloads `meta.json` on every observed line so that a `notify set` update is
860/// visible for the very next line, regardless of how recently the last reload
861/// occurred.  Multiple streaming threads share the same checker via `Arc`; the
862/// internal `Mutex` serialises access.
863struct OutputMatchChecker {
864    job_dir_path: std::path::PathBuf,
865    shell_wrapper: Vec<String>,
866    inner: std::sync::Mutex<OutputMatchInner>,
867}
868
869struct OutputMatchInner {
870    config: Option<crate::schema::NotificationConfig>,
871}
872
873impl OutputMatchChecker {
874    fn new(
875        job_dir_path: std::path::PathBuf,
876        shell_wrapper: Vec<String>,
877        initial_config: Option<crate::schema::NotificationConfig>,
878    ) -> Self {
879        Self {
880            job_dir_path,
881            shell_wrapper,
882            inner: std::sync::Mutex::new(OutputMatchInner {
883                config: initial_config,
884            }),
885        }
886    }
887
888    /// Check a newly observed output line for a configured match.
889    ///
890    /// Reloads `meta.json` on every call so that `notify set` updates are
891    /// visible for the next line without any delay.
892    /// Dispatches `job.output.matched` events outside the lock to avoid blocking
893    /// other streaming threads.
894    fn check_line(&self, line: &str, stream: &str) {
895        use crate::schema::{OutputMatchStream, OutputMatchType};
896
897        // Lock, reload, evaluate match, then release before dispatching.
898        let match_info: Option<crate::schema::OutputMatchConfig> = {
899            let mut inner = self.inner.lock().unwrap();
900
901            // Reload config on every line to pick up `notify set` updates immediately.
902            {
903                let meta_path = self.job_dir_path.join("meta.json");
904                if let Ok(raw) = std::fs::read(&meta_path)
905                    && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
906                {
907                    inner.config = meta.notification;
908                }
909            }
910
911            let Some(ref notification) = inner.config else {
912                return;
913            };
914            let Some(ref match_cfg) = notification.on_output_match else {
915                return;
916            };
917
918            // Check stream filter.
919            let stream_matches = match match_cfg.stream {
920                OutputMatchStream::Stdout => stream == "stdout",
921                OutputMatchStream::Stderr => stream == "stderr",
922                OutputMatchStream::Either => true,
923            };
924            if !stream_matches {
925                return;
926            }
927
928            // Check pattern.
929            let matched = match &match_cfg.match_type {
930                OutputMatchType::Contains => line.contains(&match_cfg.pattern),
931                OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
932                    .map(|re| re.is_match(line))
933                    .unwrap_or(false),
934            };
935
936            if matched {
937                Some(match_cfg.clone())
938            } else {
939                None
940            }
941        }; // Lock released.
942
943        if let Some(match_cfg) = match_info {
944            self.dispatch_match(line, stream, &match_cfg);
945        }
946    }
947
948    /// Dispatch a `job.output.matched` event and append a delivery record to
949    /// `notification_events.ndjson`.  Failures are non-fatal.
950    fn dispatch_match(
951        &self,
952        line: &str,
953        stream: &str,
954        match_cfg: &crate::schema::OutputMatchConfig,
955    ) {
956        use std::io::Write;
957
958        let job_id = self
959            .job_dir_path
960            .file_name()
961            .and_then(|n| n.to_str())
962            .unwrap_or("unknown");
963
964        let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
965        let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
966        let events_path = self.job_dir_path.join("notification_events.ndjson");
967        let events_path_str = events_path.display().to_string();
968
969        let match_type_str = match &match_cfg.match_type {
970            crate::schema::OutputMatchType::Contains => "contains",
971            crate::schema::OutputMatchType::Regex => "regex",
972        };
973
974        let event = crate::schema::OutputMatchEvent {
975            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
976            event_type: "job.output.matched".to_string(),
977            job_id: job_id.to_string(),
978            pattern: match_cfg.pattern.clone(),
979            match_type: match_type_str.to_string(),
980            stream: stream.to_string(),
981            line: line.to_string(),
982            stdout_log_path,
983            stderr_log_path,
984        };
985
986        let event_json = serde_json::to_string(&event).unwrap_or_default();
987        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
988
989        if let Some(ref cmd) = match_cfg.command {
990            delivery_results.push(dispatch_command_sink(
991                cmd,
992                &event_json,
993                job_id,
994                &events_path_str,
995                &self.shell_wrapper,
996                "job.output.matched",
997            ));
998        }
999        if let Some(ref file_path) = match_cfg.file {
1000            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1001        }
1002
1003        // Append delivery record to notification_events.ndjson.
1004        let record = crate::schema::OutputMatchEventRecord {
1005            event,
1006            delivery_results,
1007        };
1008        if let Ok(record_json) = serde_json::to_string(&record)
1009            && let Ok(mut f) = std::fs::OpenOptions::new()
1010                .create(true)
1011                .append(true)
1012                .open(&events_path)
1013        {
1014            let _ = writeln!(f, "{record_json}");
1015        }
1016    }
1017}
1018
1019/// Stream bytes from a child process output pipe to an individual log file and
1020/// to the shared `full.log`.
1021///
1022/// Reads byte chunks (not lines) so that output without a trailing newline is
1023/// still captured in the individual log immediately.  The `full.log` format
1024/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
1025/// bytes are appended to the buffer until a newline is found, at which point a
1026/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
1027/// flushed as a final line.
1028///
1029/// The optional `on_line` callback is invoked for each complete line (without
1030/// the trailing newline) and is used to drive output-match checking.
1031///
1032/// This helper is used by both the stdout and stderr monitoring threads inside
1033/// [`supervise`], replacing the previously duplicated per-stream implementations.
1034/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
1035fn stream_to_logs<R, F>(
1036    stream: R,
1037    log_path: &std::path::Path,
1038    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
1039    label: &str,
1040    on_line: Option<F>,
1041) where
1042    R: std::io::Read,
1043    F: Fn(&str),
1044{
1045    use std::io::Write;
1046    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
1047    let mut stream = stream;
1048    let mut buf = [0u8; 8192];
1049    // Incomplete-line buffer for full.log formatting.
1050    let mut line_buf: Vec<u8> = Vec::new();
1051    loop {
1052        match stream.read(&mut buf) {
1053            Ok(0) => break, // EOF
1054            Ok(n) => {
1055                let chunk = &buf[..n];
1056                // Write raw bytes to the individual log (captures partial lines too).
1057                let _ = log_file.write_all(chunk);
1058                // Accumulate bytes for full.log line formatting.
1059                for &b in chunk {
1060                    if b == b'\n' {
1061                        let line = String::from_utf8_lossy(&line_buf);
1062                        if let Ok(mut fl) = full_log.lock() {
1063                            let ts = now_rfc3339();
1064                            let _ = writeln!(fl, "{ts} [{label}] {line}");
1065                        }
1066                        if let Some(ref f) = on_line {
1067                            f(&line);
1068                        }
1069                        line_buf.clear();
1070                    } else {
1071                        line_buf.push(b);
1072                    }
1073                }
1074            }
1075            Err(_) => break,
1076        }
1077    }
1078    // Flush any remaining incomplete line to full.log and trigger callback.
1079    if !line_buf.is_empty() {
1080        let line = String::from_utf8_lossy(&line_buf);
1081        if let Ok(mut fl) = full_log.lock() {
1082            let ts = now_rfc3339();
1083            let _ = writeln!(fl, "{ts} [{label}] {line}");
1084        }
1085        if let Some(ref f) = on_line {
1086            f(&line);
1087        }
1088    }
1089}
1090
1091/// Internal supervisor sub-command.
1092///
1093/// Runs the target command, streams stdout/stderr to individual log files
1094/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
1095/// updates `state.json` when the process finishes.
1096///
1097/// On Windows, the child process is assigned to a named Job Object so that
1098/// the entire process tree can be terminated with a single `kill` call.
1099/// The Job Object name is recorded in `state.json` as `windows_job_name`.
1100pub fn supervise(opts: SuperviseOpts) -> Result<()> {
1101    use std::sync::{Arc, Mutex};
1102
1103    let job_id = opts.job_id;
1104    let root = opts.root;
1105    let command = opts.command;
1106
1107    if command.is_empty() {
1108        anyhow::bail!("supervisor: no command");
1109    }
1110
1111    let job_dir = JobDir::open(root, job_id)?;
1112
1113    // Read meta.json for notification config and cwd (used in completion event).
1114    let meta = job_dir.read_meta()?;
1115    // Use the actual execution start time, not meta.created_at.
1116    // For `run`, these are nearly identical. For `start`, the job may have
1117    // been created long before it was started.
1118    let started_at = now_rfc3339();
1119
1120    // Determine full.log path.
1121    let full_log_path = if let Some(p) = opts.full_log {
1122        std::path::PathBuf::from(p)
1123    } else {
1124        job_dir.full_log_path()
1125    };
1126
1127    // Create the full.log file (shared between stdout/stderr threads).
1128    // Ensure parent directories exist for custom paths.
1129    if let Some(parent) = full_log_path.parent() {
1130        std::fs::create_dir_all(parent)
1131            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
1132    }
1133    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
1134    let full_log = Arc::new(Mutex::new(full_log_file));
1135
1136    // Execute command through the shell wrapper.
1137    //
1138    // Two launch modes:
1139    //
1140    //   String mode (command.len() == 1):  The single element is a shell command
1141    //   string passed as-is to the wrapper (e.g. `"echo hello && ls"` preserves
1142    //   shell operators).  The wrapper process is the workload boundary.
1143    //
1144    //   Argv mode (command.len() > 1):  The wrapper is used for login-shell
1145    //   environment initialisation but immediately hands off to the target via
1146    //   `exec "$@"`.  The shell replaces itself so the observed child PID and
1147    //   lifecycle align with the intended workload, not the wrapper.
1148    //
1149    // --notify-command delivery always uses the wrapper in string mode
1150    // (see dispatch_command_sink); this change only affects job argv launches.
1151    if opts.shell_wrapper.is_empty() {
1152        anyhow::bail!("supervisor: shell wrapper must not be empty");
1153    }
1154    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
1155    if command.len() == 1 {
1156        // Shell-string mode: pass the command string to the wrapper as-is.
1157        child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
1158    } else {
1159        // Argv mode: launch the workload via the shell wrapper.
1160        //
1161        // On Unix the wrapper hands off to the workload via `exec "$@"` so the
1162        // shell replaces itself and the observed PID / lifecycle align with the
1163        // intended workload, not the wrapper.
1164        //
1165        // On non-Unix platforms (Windows) there is no POSIX `exec`; the wrapper
1166        // is invoked in shell-string mode with the argv joined into a single
1167        // quoted command string, preserving the existing cmd/C semantics.
1168        #[cfg(unix)]
1169        {
1170            // `--` serves as $0; argv elements become $1..$n so `$@` expands
1171            // to the full workload argv.
1172            child_cmd
1173                .args(&opts.shell_wrapper[1..])
1174                .arg("exec \"$@\"")
1175                .arg("--")
1176                .args(command);
1177        }
1178        #[cfg(not(unix))]
1179        {
1180            // Windows fallback: join argv into a shell-compatible string and
1181            // pass it to the wrapper as a single command string (same as
1182            // shell-string mode), so cmd /C semantics are preserved.
1183            let joined = command
1184                .iter()
1185                .map(|a| {
1186                    if a.contains(' ') {
1187                        format!("\"{}\"", a)
1188                    } else {
1189                        a.clone()
1190                    }
1191                })
1192                .collect::<Vec<_>>()
1193                .join(" ");
1194            child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
1195        }
1196    }
1197
1198    if opts.inherit_env {
1199        // Start with the current environment (default).
1200    } else {
1201        child_cmd.env_clear();
1202    }
1203
1204    // Apply env files in order.
1205    for env_file in &opts.env_files {
1206        let vars = load_env_file(env_file)?;
1207        for (k, v) in vars {
1208            child_cmd.env(&k, &v);
1209        }
1210    }
1211
1212    // Apply --env KEY=VALUE overrides (applied after env-files).
1213    for env_var in &opts.env_vars {
1214        let (k, v) = parse_env_var(env_var);
1215        child_cmd.env(&k, &v);
1216    }
1217
1218    // Set working directory if specified.
1219    if let Some(cwd) = opts.cwd {
1220        child_cmd.current_dir(cwd);
1221    }
1222
1223    // Put the child in its own process group so that timeout signals
1224    // (SIGTERM / SIGKILL) reach the entire process tree, not just the
1225    // shell wrapper.  Without this, `sh -lc "sleep 60"` would absorb
1226    // the signal while the grandchild (`sleep`) keeps running.
1227    #[cfg(unix)]
1228    {
1229        use std::os::unix::process::CommandExt;
1230        // SAFETY: setsid is async-signal-safe and called before exec.
1231        unsafe {
1232            child_cmd.pre_exec(|| {
1233                libc::setsid();
1234                Ok(())
1235            });
1236        }
1237    }
1238
1239    // Spawn the child with piped stdout/stderr so we can tee to logs.
1240    let child_stdin = open_child_stdin(&job_dir, opts.stdin_file.as_deref())?;
1241    let mut child = child_cmd
1242        .stdin(child_stdin)
1243        .stdout(std::process::Stdio::piped())
1244        .stderr(std::process::Stdio::piped())
1245        .spawn()
1246        .context("supervisor: spawn child")?;
1247
1248    let pid = child.id();
1249    info!(job_id, pid, "child process started");
1250
1251    // On Windows, assign child to a named Job Object for process-tree management.
1252    // The job name is derived from the job_id so that `kill` can look it up.
1253    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
1254    // kills the child process and updates state.json to "failed" before returning
1255    // an error, so that the run front-end (which may have already returned) can
1256    // detect the failure via state.json on next poll.
1257    #[cfg(windows)]
1258    let windows_job_name = {
1259        match assign_to_job_object(job_id, pid) {
1260            Ok(name) => Some(name),
1261            Err(e) => {
1262                // Job Object assignment failed. Per design.md this is a MUST
1263                // requirement on Windows. Kill the child process and update
1264                // state.json to "failed" so the run front-end can detect it.
1265                let kill_err = child.kill();
1266                let _ = child.wait(); // reap to avoid zombies
1267
1268                let failed_state = JobState {
1269                    job: JobStateJob {
1270                        id: job_id.to_string(),
1271                        status: JobStatus::Failed,
1272                        started_at: Some(started_at.clone()),
1273                    },
1274                    result: JobStateResult {
1275                        exit_code: None,
1276                        signal: None,
1277                        duration_ms: None,
1278                    },
1279                    pid: Some(pid),
1280                    finished_at: Some(now_rfc3339()),
1281                    updated_at: now_rfc3339(),
1282                    windows_job_name: None,
1283                };
1284                // Best-effort: if writing state fails, we still propagate the
1285                // original assignment error.
1286                let _ = job_dir.write_state(&failed_state);
1287
1288                // Dispatch completion event for the failed state if notifications are configured.
1289                // This mirrors the dispatch logic in the normal exit path so that callers
1290                // receive a job.finished event even when the supervisor fails early (Windows only).
1291                if opts.notify_command.is_some() || opts.notify_file.is_some() {
1292                    let finished_at_ts =
1293                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1294                    let stdout_log = job_dir.stdout_path().display().to_string();
1295                    let stderr_log = job_dir.stderr_path().display().to_string();
1296                    let fail_event = crate::schema::CompletionEvent {
1297                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1298                        event_type: "job.finished".to_string(),
1299                        job_id: job_id.to_string(),
1300                        state: JobStatus::Failed.as_str().to_string(),
1301                        command: meta.command.clone(),
1302                        cwd: meta.cwd.clone(),
1303                        started_at: started_at.clone(),
1304                        finished_at: finished_at_ts,
1305                        duration_ms: None,
1306                        exit_code: None,
1307                        signal: None,
1308                        stdout_log_path: stdout_log,
1309                        stderr_log_path: stderr_log,
1310                    };
1311                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1312                    let fail_event_path = job_dir.completion_event_path().display().to_string();
1313                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1314                        Vec::new();
1315                    if let Err(we) = job_dir.write_completion_event_atomic(
1316                        &crate::schema::CompletionEventRecord {
1317                            event: fail_event.clone(),
1318                            delivery_results: vec![],
1319                        },
1320                    ) {
1321                        warn!(
1322                            job_id,
1323                            error = %we,
1324                            "failed to write initial completion_event.json for failed job"
1325                        );
1326                    }
1327                    if let Some(ref shell_cmd) = opts.notify_command {
1328                        fail_delivery_results.push(dispatch_command_sink(
1329                            shell_cmd,
1330                            &fail_event_json,
1331                            job_id,
1332                            &fail_event_path,
1333                            &opts.shell_wrapper,
1334                            "job.finished",
1335                        ));
1336                    }
1337                    if let Some(ref file_path) = opts.notify_file {
1338                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1339                    }
1340                    if let Err(we) = job_dir.write_completion_event_atomic(
1341                        &crate::schema::CompletionEventRecord {
1342                            event: fail_event,
1343                            delivery_results: fail_delivery_results,
1344                        },
1345                    ) {
1346                        warn!(
1347                            job_id,
1348                            error = %we,
1349                            "failed to update completion_event.json with delivery results for failed job"
1350                        );
1351                    }
1352                }
1353
1354                if let Err(ke) = kill_err {
1355                    return Err(anyhow::anyhow!(
1356                        "supervisor: failed to assign pid {pid} to Job Object \
1357                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1358                    ));
1359                }
1360                return Err(anyhow::anyhow!(
1361                    "supervisor: failed to assign pid {pid} to Job Object \
1362                     (Windows MUST requirement); child process was killed; \
1363                     consider running outside a nested Job Object environment: {e}"
1364                ));
1365            }
1366        }
1367    };
1368    #[cfg(not(windows))]
1369    let windows_job_name: Option<String> = None;
1370
1371    // Update state.json with real child PID and Windows Job Object name.
1372    // On Windows, windows_job_name is always Some at this point (guaranteed
1373    // by the MUST requirement above), so state.json will always contain the
1374    // Job Object identifier while the job is running.
1375    let state = JobState {
1376        job: JobStateJob {
1377            id: job_id.to_string(),
1378            status: JobStatus::Running,
1379            started_at: Some(started_at.clone()),
1380        },
1381        result: JobStateResult {
1382            exit_code: None,
1383            signal: None,
1384            duration_ms: None,
1385        },
1386        pid: Some(pid),
1387        finished_at: None,
1388        updated_at: now_rfc3339(),
1389        windows_job_name,
1390    };
1391    job_dir.write_state(&state)?;
1392
1393    let child_start_time = std::time::Instant::now();
1394
1395    // Take stdout/stderr handles before moving child.
1396    let child_stdout = child.stdout.take().expect("child stdout piped");
1397    let child_stderr = child.stderr.take().expect("child stderr piped");
1398
1399    // Create shared output-match checker from the initial meta notification config.
1400    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1401        job_dir.path.clone(),
1402        opts.shell_wrapper.clone(),
1403        meta.notification.clone(),
1404    ));
1405
1406    // Completion channels for log threads: each thread sends `()` after stream_to_logs returns.
1407    // Used for bounded joins below (allows supervisor to exit promptly when descendants
1408    // hold inherited pipe ends open indefinitely).
1409    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1410    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1411
1412    // Thread: read stdout, write to stdout.log and full.log.
1413    let stdout_log_path = job_dir.stdout_path();
1414    let full_log_stdout = Arc::clone(&full_log);
1415    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1416    let t_stdout = std::thread::spawn(move || {
1417        stream_to_logs(
1418            child_stdout,
1419            &stdout_log_path,
1420            full_log_stdout,
1421            "STDOUT",
1422            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1423        );
1424        let _ = tx_stdout_done.send(());
1425    });
1426
1427    // Thread: read stderr, write to stderr.log and full.log.
1428    let stderr_log_path = job_dir.stderr_path();
1429    let full_log_stderr = Arc::clone(&full_log);
1430    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1431    let t_stderr = std::thread::spawn(move || {
1432        stream_to_logs(
1433            child_stderr,
1434            &stderr_log_path,
1435            full_log_stderr,
1436            "STDERR",
1437            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1438        );
1439        let _ = tx_stderr_done.send(());
1440    });
1441
1442    // Timeout / kill-after / progress-every handling.
1443    // We spawn a watcher thread to handle timeout and periodic state.json updates.
1444    let timeout_ms = opts.timeout_ms;
1445    let kill_after_ms = opts.kill_after_ms;
1446    let progress_every_ms = opts.progress_every_ms;
1447    let watcher_job_dir = JobDir {
1448        path: job_dir.path.clone(),
1449        job_id: job_id.to_string(),
1450    };
1451    let job_id_str = job_id.to_string();
1452
1453    // Use an atomic flag to signal the watcher thread when the child has exited.
1454    use std::sync::atomic::{AtomicBool, Ordering};
1455    let child_done = Arc::new(AtomicBool::new(false));
1456
1457    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1458        let child_done_clone = Arc::clone(&child_done);
1459        Some(std::thread::spawn(move || {
1460            let start = std::time::Instant::now();
1461            let timeout_dur = if timeout_ms > 0 {
1462                Some(std::time::Duration::from_millis(timeout_ms))
1463            } else {
1464                None
1465            };
1466            let progress_dur = if progress_every_ms > 0 {
1467                Some(std::time::Duration::from_millis(progress_every_ms))
1468            } else {
1469                None
1470            };
1471
1472            let poll_interval = std::time::Duration::from_millis(100);
1473
1474            loop {
1475                std::thread::sleep(poll_interval);
1476
1477                // Exit the watcher loop if the child process has finished.
1478                if child_done_clone.load(Ordering::Relaxed) {
1479                    break;
1480                }
1481
1482                let elapsed = start.elapsed();
1483
1484                // Check for timeout.
1485                if let Some(td) = timeout_dur
1486                    && elapsed >= td
1487                {
1488                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1489                    // Send SIGTERM to the entire process group (negative PID).
1490                    // The child was placed in its own session/group via setsid.
1491                    #[cfg(unix)]
1492                    {
1493                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1494                    }
1495                    // If kill_after > 0, wait kill_after ms then SIGKILL.
1496                    if kill_after_ms > 0 {
1497                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1498                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1499                        #[cfg(unix)]
1500                        {
1501                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1502                        }
1503                    } else {
1504                        // Immediate SIGKILL to the process group.
1505                        #[cfg(unix)]
1506                        {
1507                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1508                        }
1509                    }
1510                    break;
1511                }
1512
1513                // Progress-every: update updated_at periodically.
1514                if let Some(pd) = progress_dur {
1515                    let elapsed_ms = elapsed.as_millis() as u64;
1516                    let pd_ms = pd.as_millis() as u64;
1517                    let poll_ms = poll_interval.as_millis() as u64;
1518                    if elapsed_ms % pd_ms < poll_ms {
1519                        // Read, update updated_at, write back atomically so concurrent
1520                        // status/wait/run observers never parse a truncated state file.
1521                        if let Ok(mut st) = watcher_job_dir.read_state() {
1522                            st.updated_at = now_rfc3339();
1523                            let _ = watcher_job_dir.write_state(&st);
1524                        }
1525                    }
1526                }
1527            }
1528        }))
1529    } else {
1530        None
1531    };
1532
1533    // Wait for child to finish.
1534    let exit_status = child.wait().context("wait for child")?;
1535
1536    // Signal the watcher that the child has finished so it can exit its loop.
1537    child_done.store(true, Ordering::Relaxed);
1538
1539    // Persist terminal state immediately after the wrapped root process exits.
1540    // This must happen BEFORE joining log threads, because log threads block on
1541    // EOF of stdout/stderr pipes and descendant processes that inherited those
1542    // pipes may keep them open indefinitely. Persisting state here ensures that
1543    // `status` and `wait` can observe the terminal state without waiting for
1544    // all descendants to close their inherited handles.
1545    let duration_ms = child_start_time.elapsed().as_millis() as u64;
1546    let exit_code = exit_status.code();
1547    let finished_at = now_rfc3339();
1548
1549    // Detect signal-killed processes on Unix for accurate state and completion event.
1550    #[cfg(unix)]
1551    let (terminal_status, signal_name) = {
1552        use std::os::unix::process::ExitStatusExt;
1553        if let Some(sig) = exit_status.signal() {
1554            (JobStatus::Killed, Some(sig.to_string()))
1555        } else {
1556            (JobStatus::Exited, None)
1557        }
1558    };
1559    #[cfg(not(unix))]
1560    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1561
1562    let state = JobState {
1563        job: JobStateJob {
1564            id: job_id.to_string(),
1565            status: terminal_status.clone(),
1566            started_at: Some(started_at.clone()),
1567        },
1568        result: JobStateResult {
1569            exit_code,
1570            signal: signal_name.clone(),
1571            duration_ms: Some(duration_ms),
1572        },
1573        pid: Some(pid),
1574        finished_at: Some(finished_at.clone()),
1575        updated_at: now_rfc3339(),
1576        windows_job_name: None, // not needed after process exits
1577    };
1578    job_dir.write_state(&state)?;
1579    info!(job_id, ?exit_code, "child process finished");
1580
1581    // Bounded join for log-reader threads.
1582    //
1583    // After the wrapped root process exits, we give log threads a short window
1584    // to drain any remaining output and fire output-match callbacks (which is
1585    // the common case: no descendants, pipe write-end closes promptly).
1586    //
1587    // If a thread does not complete within the window, it is detached (dropped).
1588    // Detaching means the thread will be killed when the supervisor process exits,
1589    // which is the correct trade-off: supervisor must not linger indefinitely
1590    // because a descendant holds an inherited pipe write-end open.
1591    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1592    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1593
1594    let remaining = drain_deadline
1595        .checked_duration_since(std::time::Instant::now())
1596        .unwrap_or(std::time::Duration::ZERO);
1597    if rx_stdout_done.recv_timeout(remaining).is_ok() {
1598        let _ = t_stdout.join();
1599    } else {
1600        drop(t_stdout); // detach: descendant holds the pipe open
1601    }
1602
1603    let remaining = drain_deadline
1604        .checked_duration_since(std::time::Instant::now())
1605        .unwrap_or(std::time::Duration::ZERO);
1606    if rx_stderr_done.recv_timeout(remaining).is_ok() {
1607        let _ = t_stderr.join();
1608    } else {
1609        drop(t_stderr); // detach: descendant holds the pipe open
1610    }
1611
1612    // Join watcher if present; it exits promptly once child_done is set.
1613    if let Some(w) = watcher {
1614        let _ = w.join();
1615    }
1616
1617    // Reload the latest notification config from meta.json to pick up any post-creation
1618    // updates (e.g. from `notify set` invoked after the job was launched).
1619    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1620    let (current_notify_command, current_notify_file) = match &latest_notification {
1621        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1622        None => (None, None),
1623    };
1624
1625    // Dispatch completion event to configured notification sinks.
1626    // Failure here must not alter job state (delivery result is recorded separately).
1627    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1628    if has_notification {
1629        let stdout_log = job_dir.stdout_path().display().to_string();
1630        let stderr_log = job_dir.stderr_path().display().to_string();
1631        let event = crate::schema::CompletionEvent {
1632            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1633            event_type: "job.finished".to_string(),
1634            job_id: job_id.to_string(),
1635            state: terminal_status.as_str().to_string(),
1636            command: meta.command.clone(),
1637            cwd: meta.cwd.clone(),
1638            started_at,
1639            finished_at,
1640            duration_ms: Some(duration_ms),
1641            exit_code,
1642            signal: signal_name,
1643            stdout_log_path: stdout_log,
1644            stderr_log_path: stderr_log,
1645        };
1646
1647        let event_json = serde_json::to_string(&event).unwrap_or_default();
1648        let event_path = job_dir.completion_event_path().display().to_string();
1649        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1650
1651        // Write initial completion_event.json before dispatching sinks.
1652        if let Err(e) =
1653            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1654                event: event.clone(),
1655                delivery_results: vec![],
1656            })
1657        {
1658            warn!(job_id, error = %e, "failed to write initial completion_event.json");
1659        }
1660
1661        if let Some(ref shell_cmd) = current_notify_command {
1662            delivery_results.push(dispatch_command_sink(
1663                shell_cmd,
1664                &event_json,
1665                job_id,
1666                &event_path,
1667                &opts.shell_wrapper,
1668                "job.finished",
1669            ));
1670        }
1671        if let Some(ref file_path) = current_notify_file {
1672            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1673        }
1674
1675        // Update completion_event.json with delivery results.
1676        if let Err(e) =
1677            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1678                event,
1679                delivery_results,
1680            })
1681        {
1682            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1683        }
1684    }
1685
1686    Ok(())
1687}
1688
1689/// Dispatch the command sink: execute the shell command string via the configured shell wrapper,
1690/// pass event JSON via stdin, and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID /
1691/// AGENT_EXEC_EVENT_TYPE env vars.
1692///
1693/// The shell wrapper argv (e.g. `["sh", "-lc"]`) is provided by the caller.
1694/// The command string is appended as the final argument to the wrapper.
1695fn dispatch_command_sink(
1696    shell_cmd: &str,
1697    event_json: &str,
1698    job_id: &str,
1699    event_path: &str,
1700    shell_wrapper: &[String],
1701    event_type: &str,
1702) -> crate::schema::SinkDeliveryResult {
1703    use std::io::Write;
1704    let attempted_at = now_rfc3339();
1705    let target = shell_cmd.to_string();
1706
1707    if shell_cmd.trim().is_empty() {
1708        return crate::schema::SinkDeliveryResult {
1709            sink_type: "command".to_string(),
1710            target,
1711            success: false,
1712            error: Some("empty shell command".to_string()),
1713            attempted_at,
1714        };
1715    }
1716
1717    if shell_wrapper.is_empty() {
1718        return crate::schema::SinkDeliveryResult {
1719            sink_type: "command".to_string(),
1720            target,
1721            success: false,
1722            error: Some("shell wrapper must not be empty".to_string()),
1723            attempted_at,
1724        };
1725    }
1726
1727    let mut cmd = Command::new(&shell_wrapper[0]);
1728    cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1729
1730    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1731    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1732    cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1733    cmd.stdin(std::process::Stdio::piped());
1734    cmd.stdout(std::process::Stdio::null());
1735    cmd.stderr(std::process::Stdio::null());
1736
1737    match cmd.spawn() {
1738        Ok(mut child) => {
1739            if let Some(mut stdin) = child.stdin.take() {
1740                let _ = stdin.write_all(event_json.as_bytes());
1741            }
1742            match child.wait() {
1743                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1744                    sink_type: "command".to_string(),
1745                    target,
1746                    success: true,
1747                    error: None,
1748                    attempted_at,
1749                },
1750                Ok(status) => crate::schema::SinkDeliveryResult {
1751                    sink_type: "command".to_string(),
1752                    target,
1753                    success: false,
1754                    error: Some(format!("exited with status {status}")),
1755                    attempted_at,
1756                },
1757                Err(e) => crate::schema::SinkDeliveryResult {
1758                    sink_type: "command".to_string(),
1759                    target,
1760                    success: false,
1761                    error: Some(format!("wait error: {e}")),
1762                    attempted_at,
1763                },
1764            }
1765        }
1766        Err(e) => crate::schema::SinkDeliveryResult {
1767            sink_type: "command".to_string(),
1768            target,
1769            success: false,
1770            error: Some(format!("spawn error: {e}")),
1771            attempted_at,
1772        },
1773    }
1774}
1775
1776/// Dispatch the file sink: append event JSON as a single NDJSON line.
1777/// Creates parent directories automatically.
1778fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1779    use std::io::Write;
1780    let attempted_at = now_rfc3339();
1781    let path = std::path::Path::new(file_path);
1782
1783    if let Some(parent) = path.parent()
1784        && let Err(e) = std::fs::create_dir_all(parent)
1785    {
1786        return crate::schema::SinkDeliveryResult {
1787            sink_type: "file".to_string(),
1788            target: file_path.to_string(),
1789            success: false,
1790            error: Some(format!("create parent dir: {e}")),
1791            attempted_at,
1792        };
1793    }
1794
1795    match std::fs::OpenOptions::new()
1796        .create(true)
1797        .append(true)
1798        .open(path)
1799    {
1800        Ok(mut f) => match writeln!(f, "{event_json}") {
1801            Ok(_) => crate::schema::SinkDeliveryResult {
1802                sink_type: "file".to_string(),
1803                target: file_path.to_string(),
1804                success: true,
1805                error: None,
1806                attempted_at,
1807            },
1808            Err(e) => crate::schema::SinkDeliveryResult {
1809                sink_type: "file".to_string(),
1810                target: file_path.to_string(),
1811                success: false,
1812                error: Some(format!("write error: {e}")),
1813                attempted_at,
1814            },
1815        },
1816        Err(e) => crate::schema::SinkDeliveryResult {
1817            sink_type: "file".to_string(),
1818            target: file_path.to_string(),
1819            success: false,
1820            error: Some(format!("open error: {e}")),
1821            attempted_at,
1822        },
1823    }
1824}
1825
1826/// Public alias so other modules can call the timestamp helper.
1827pub fn now_rfc3339_pub() -> String {
1828    now_rfc3339()
1829}
1830
1831fn now_rfc3339() -> String {
1832    // Use a simple approach that works without chrono.
1833    let d = std::time::SystemTime::now()
1834        .duration_since(std::time::UNIX_EPOCH)
1835        .unwrap_or_default();
1836    format_rfc3339(d.as_secs())
1837}
1838
1839fn format_rfc3339(secs: u64) -> String {
1840    // Manual conversion of Unix timestamp to UTC date-time string.
1841    let mut s = secs;
1842    let seconds = s % 60;
1843    s /= 60;
1844    let minutes = s % 60;
1845    s /= 60;
1846    let hours = s % 24;
1847    s /= 24;
1848
1849    // Days since 1970-01-01
1850    let mut days = s;
1851    let mut year = 1970u64;
1852    loop {
1853        let days_in_year = if is_leap(year) { 366 } else { 365 };
1854        if days < days_in_year {
1855            break;
1856        }
1857        days -= days_in_year;
1858        year += 1;
1859    }
1860
1861    let leap = is_leap(year);
1862    let month_days: [u64; 12] = [
1863        31,
1864        if leap { 29 } else { 28 },
1865        31,
1866        30,
1867        31,
1868        30,
1869        31,
1870        31,
1871        30,
1872        31,
1873        30,
1874        31,
1875    ];
1876    let mut month = 0usize;
1877    for (i, &d) in month_days.iter().enumerate() {
1878        if days < d {
1879            month = i;
1880            break;
1881        }
1882        days -= d;
1883    }
1884    let day = days + 1;
1885
1886    format!(
1887        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1888        year,
1889        month + 1,
1890        day,
1891        hours,
1892        minutes,
1893        seconds
1894    )
1895}
1896
/// Gregorian leap-year rule: every fourth year is a leap year, except century
/// years, which are leap years only when divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    if year.is_multiple_of(100) {
        return false;
    }
    year.is_multiple_of(4)
}
1900
/// Windows-only: put the child process `pid` into a freshly created, named Job
/// Object so the whole process tree can later be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. The name is returned so it can
/// be stored in `state.json`; a later `kill` invocation opens the Job Object by
/// that name and calls `TerminateJobObject` to stop the entire tree.
///
/// # Errors
///
/// Fails if the process cannot be opened, the Job Object cannot be created, or
/// the assignment is rejected (for example because the process already belongs
/// to another Job Object, as can happen in CI or nested setups). The caller
/// (`supervise`) treats any failure as fatal because reliable process-tree
/// management is a Windows MUST requirement (design.md).
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let wide_name = HSTRING::from(job_name.as_str());

    // SAFETY: every handle passed to a Win32 call below was returned by a
    // successful call earlier in this block, and each handle is closed at most once.
    unsafe {
        // A process handle is required by AssignProcessToJobObject.
        let process = match OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid) {
            Ok(handle) => handle,
            Err(e) => anyhow::bail!(
                "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
            ),
        };

        // Create the named Job Object; on failure release the process handle first.
        let job = CreateJobObjectW(None, &wide_name).map_err(|e| {
            let _ = CloseHandle(process);
            anyhow::anyhow!("supervisor: CreateJobObjectW({job_name}) failed: {e}")
        })?;

        // Assignment can be rejected when the process is already in another job
        // (e.g. CI/nested). Per design.md this is a MUST on Windows, so it is fatal.
        if let Err(e) = AssignProcessToJobObject(job, process) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(process);
            anyhow::bail!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            );
        }

        // The process handle was only needed for the assignment.
        let _ = CloseHandle(process);
        // The job handle is intentionally never closed here: it must stay open for
        // the lifetime of the supervisor so the Job Object remains valid. The OS
        // closes it when the supervisor process exits.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1966
// Unit tests for the hand-rolled timestamp formatting. Besides the two
// start-of-year anchors, they cover the time-of-day split, month walking, the
// year boundary and the leap-year rules (including century years).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        // 2024-01-01T00:00:00Z = 1704067200
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_time_of_day_on_leap_day() {
        // 2024-02-29T00:00:00Z = 1709164800, plus 12h 34m 56s = 45296 s
        assert_eq!(format_rfc3339(1709210096), "2024-02-29T12:34:56Z");
    }

    #[test]
    fn rfc3339_last_second_of_year() {
        // One second before 2024-01-01T00:00:00Z.
        assert_eq!(format_rfc3339(1704067199), "2023-12-31T23:59:59Z");
    }

    #[test]
    fn rfc3339_century_years() {
        // 2000 is divisible by 400, so it has a 29 February.
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
        // 2100 is divisible by 100 but not 400, so day 59 of the year is 1 March.
        assert_eq!(format_rfc3339(4107542400), "2100-03-01T00:00:00Z");
    }

    #[test]
    fn leap_year_rules() {
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1900));
        assert!(!is_leap(2100));
        assert!(!is_leap(2023));
    }
}