Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
//! - `run` returns launch metadata promptly, optionally together with a bounded
//!   inline snapshot of early output (`wait`); further observation is delegated to
//!   `status` / `wait` / `tail`.
8
9use anyhow::{Context, Result};
10use std::io::IsTerminal;
11use std::path::Path;
12use std::process::Command;
13use tracing::{debug, info, warn};
14
15use crate::jobstore::{JobDir, generate_job_id, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18};
19
/// Output/state snapshot captured by `observe_inline_output` and copied into the
/// `run` response (`RunData`).
#[derive(Debug, Clone)]
pub struct InlineObservation {
    /// Milliseconds spent inside `observe_inline_output`, including any polling.
    pub waited_ms: u64,
    /// Head of `stdout.log`, limited by `max_bytes` (as read by `read_head_metrics`).
    pub stdout: String,
    /// Head of `stderr.log`, limited by `max_bytes` (as read by `read_head_metrics`).
    pub stderr: String,
    /// Byte range of `stdout.log` covered by `stdout`, as reported by
    /// `read_head_metrics` (presumably `[start, end)` — TODO confirm).
    pub stdout_range: [u64; 2],
    /// Byte range of `stderr.log` covered by `stderr` (same convention as `stdout_range`).
    pub stderr_range: [u64; 2],
    /// `observed_bytes` reported by `read_head_metrics` for `stdout.log`.
    pub stdout_total_bytes: u64,
    /// `observed_bytes` reported by `read_head_metrics` for `stderr.log`.
    pub stderr_total_bytes: u64,
    /// Encoding label for `stdout`/`stderr`; currently always `"utf-8-lossy"`.
    pub encoding: String,
    /// Job status string (`JobStatus::as_str`) at observation time.
    pub state: String,
    /// Exit code from `state.json`, if recorded.
    pub exit_code: Option<i32>,
    /// Finish timestamp from `state.json`, if recorded.
    pub finished_at: Option<String>,
    /// Terminating signal (stringified) from `state.json`, if recorded.
    pub signal: Option<String>,
    /// Run duration in milliseconds from `state.json`, if recorded.
    pub duration_ms: Option<u64>,
}
36use crate::tag::dedup_tags;
37
/// Options for the `run` sub-command.
///
/// # Definition-time option alignment rule
///
/// Every definition-time option accepted here MUST also be accepted by `create` (and vice versa),
/// since both commands write the same persisted job definition to `meta.json`. When adding a
/// new persisted metadata field, wire it through both `run` and `create` unless the spec
/// explicitly documents it as launch-only (e.g. snapshot timing, tail sizing, --wait).
///
/// # Secrets
///
/// `env_vars` holds the real (unmasked) values and this type derives `Debug`, so a `{:?}`
/// dump of `RunOpts` exposes them. Only the masked copy is written to `meta.json` and
/// printed in the `run` response.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command and arguments to execute.
    pub command: Vec<String>,
    /// Override for jobs root directory.
    pub root: Option<&'a str>,
    /// Wait for inline output observation before returning.
    /// When false, `run` reports an immediate snapshot without polling.
    pub wait: bool,
    /// Maximum wait duration in seconds for inline observation (ignored when `forever`).
    pub until_seconds: u64,
    /// Wait indefinitely for terminal state / observation budget.
    pub forever: bool,
    /// Maximum bytes to include from the head of each stream.
    pub max_bytes: u64,
    /// Timeout in milliseconds; 0 = no timeout.
    /// Forwarded to the supervisor rounded up to whole seconds.
    pub timeout_ms: u64,
    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
    /// Forwarded to the supervisor rounded up to whole seconds.
    pub kill_after_ms: u64,
    /// Working directory for the command.
    pub cwd: Option<&'a str>,
    /// Environment variables as KEY=VALUE strings (real values; see "Secrets" above).
    pub env_vars: Vec<String>,
    /// Paths to env files, applied in order.
    pub env_files: Vec<String>,
    /// Whether to inherit the current process environment (default: true).
    pub inherit_env: bool,
    /// Keys to mask in JSON output (values replaced with "***").
    pub mask: Vec<String>,
    /// Optional stdin source definition persisted in meta and materialized into stdin.bin.
    pub stdin: Option<StdinSource>,
    /// Maximum bytes allowed for materialized stdin.bin (default: 64 MiB).
    pub stdin_max_bytes: u64,
    /// User-defined tags for this job (deduplicated preserving first-seen order).
    pub tags: Vec<String>,
    /// Override full.log path; None = use job dir.
    pub log: Option<&'a str>,
    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
    /// Forwarded to the supervisor rounded up to whole seconds.
    pub progress_every_ms: u64,
    /// Shell command string for command notification sink; executed via platform shell.
    /// None = no command sink.
    pub notify_command: Option<String>,
    /// File path for NDJSON notification sink; None = no file sink.
    pub notify_file: Option<String>,
    /// Pattern to match against output lines (output-match notification).
    pub output_pattern: Option<String>,
    /// Match type for output-match: "contains" or "regex".
    pub output_match_type: Option<String>,
    /// Stream selector: "stdout", "stderr", or "either".
    pub output_stream: Option<String>,
    /// Shell command string for output-match command sink.
    pub output_command: Option<String>,
    /// File path for output-match NDJSON file sink.
    pub output_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings.
    /// e.g. `["sh", "-lc"]` or `["bash", "-lc"]`.
    pub shell_wrapper: Vec<String>,
}
103
104impl<'a> Default for RunOpts<'a> {
105    fn default() -> Self {
106        RunOpts {
107            command: vec![],
108            root: None,
109            wait: true,
110            until_seconds: 10,
111            forever: false,
112            max_bytes: 65536,
113            timeout_ms: 0,
114            kill_after_ms: 0,
115            cwd: None,
116            env_vars: vec![],
117            env_files: vec![],
118            inherit_env: true,
119            mask: vec![],
120            stdin: None,
121            stdin_max_bytes: DEFAULT_STDIN_MAX_BYTES,
122            tags: vec![],
123            log: None,
124            progress_every_ms: 0,
125            notify_command: None,
126            notify_file: None,
127            output_pattern: None,
128            output_match_type: None,
129            output_stream: None,
130            output_command: None,
131            output_file: None,
132            shell_wrapper: crate::config::default_shell_wrapper(),
133        }
134    }
135}
136
/// Where a job's stdin comes from.
///
/// The source is persisted as part of the job definition and copied into the job's
/// `stdin.bin` by `materialize_stdin`.
#[derive(Debug, Clone)]
pub enum StdinSource {
    /// The invoking process's own stdin (`--stdin -`); rejected if it is a terminal.
    CallerStdin,
    /// Literal text given via `--stdin <text>`.
    Inline(String),
    /// Path of a file whose contents are used (`--stdin-file`).
    File(String),
}
146
/// Parameters for spawning a supervisor process.
///
/// Shared by `run::execute` and `start::execute`. Consumed by
/// `spawn_supervisor_process`, which forwards each field to `_supervise` as a CLI flag.
pub struct SpawnSupervisorParams {
    /// Job identifier (`--job-id`).
    pub job_id: String,
    /// Jobs root directory (`--supervise-root`).
    pub root: std::path::PathBuf,
    /// Destination of the combined log (`--full-log`).
    pub full_log_path: String,
    /// Timeout in milliseconds; 0 = flag omitted. Forwarded rounded up to whole seconds.
    pub timeout_ms: u64,
    /// Delay after SIGTERM before SIGKILL in ms; 0 = flag omitted. Rounded up to seconds.
    pub kill_after_ms: u64,
    /// Working directory for the child (`--cwd`), if any.
    pub cwd: Option<String>,
    /// Real (unmasked) KEY=VALUE env var pairs.
    pub env_vars: Vec<String>,
    /// Env files, forwarded in order as repeated `--env-file`.
    pub env_files: Vec<String>,
    /// When false, `--no-inherit-env` is passed.
    pub inherit_env: bool,
    /// Materialized stdin file (e.g. `stdin.bin`), forwarded as `--stdin-file`.
    pub stdin_file: Option<String>,
    /// `updated_at` refresh interval in ms; 0 = flag omitted. Rounded up to seconds.
    pub progress_every_ms: u64,
    /// Command notification sink (`--notify-command`).
    pub notify_command: Option<String>,
    /// NDJSON file notification sink (`--notify-file`).
    pub notify_file: Option<String>,
    /// Shell wrapper argv, forwarded JSON-encoded as `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// Command and arguments, passed after `--`.
    pub command: Vec<String>,
}
165
166pub fn resolve_stdin_source(
167    stdin: Option<String>,
168    stdin_file: Option<String>,
169) -> Option<StdinSource> {
170    if let Some(value) = stdin {
171        if value == "-" {
172            Some(StdinSource::CallerStdin)
173        } else {
174            Some(StdinSource::Inline(value))
175        }
176    } else {
177        stdin_file.map(StdinSource::File)
178    }
179}
180
181pub fn validate_stdin_source(stdin: Option<&StdinSource>) -> Result<()> {
182    if matches!(stdin, Some(StdinSource::CallerStdin)) {
183        let stdin = std::io::stdin();
184        if stdin.is_terminal() {
185            return Err(anyhow::anyhow!(StdinRequired(
186                "stdin_required: --stdin - requires non-interactive stdin (pipe/heredoc/redirect)"
187                    .to_string(),
188            )));
189        }
190    }
191    Ok(())
192}
193
194fn create_stdin_file(path: &std::path::Path) -> Result<std::fs::File> {
195    #[cfg(unix)]
196    {
197        use std::os::unix::fs::OpenOptionsExt;
198        std::fs::OpenOptions::new()
199            .write(true)
200            .create(true)
201            .truncate(true)
202            .mode(0o600)
203            .open(path)
204            .with_context(|| format!("create materialized stdin {}", path.display()))
205    }
206    #[cfg(not(unix))]
207    {
208        std::fs::File::create(path)
209            .with_context(|| format!("create materialized stdin {}", path.display()))
210    }
211}
212
/// A `Write` adapter that refuses to let the total bytes written exceed `limit`.
///
/// A `write` call whose whole buffer would push the running total past `limit` is
/// rejected up front with an `io::Error` whose message is `stdin_too_large`. Nothing
/// from that buffer is forwarded.
struct LimitedWriter<W> {
    /// Wrapped sink.
    inner: W,
    /// Bytes successfully forwarded to `inner` so far.
    written: u64,
    /// Maximum total bytes accepted.
    limit: u64,
}

impl<W: std::io::Write> std::io::Write for LimitedWriter<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // The buffer fits iff written + len <= limit, checked without risk of overflow.
        let fits = self
            .limit
            .checked_sub(self.written)
            .is_some_and(|remaining| buf.len() as u64 <= remaining);
        if !fits {
            return Err(std::io::Error::other("stdin_too_large"));
        }
        let accepted = self.inner.write(buf)?;
        self.written += accepted as u64;
        Ok(accepted)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
233
234fn materialize_stdin(
235    job_dir: &JobDir,
236    stdin: Option<&StdinSource>,
237    max_bytes: u64,
238) -> Result<Option<String>> {
239    let Some(source) = stdin else {
240        return Ok(None);
241    };
242
243    let target_name = "stdin.bin".to_string();
244    let target_path = job_dir.path.join(&target_name);
245    let file = create_stdin_file(&target_path)?;
246    let mut target = LimitedWriter {
247        inner: file,
248        written: 0,
249        limit: max_bytes,
250    };
251
252    let copy_result = match source {
253        StdinSource::CallerStdin => {
254            let mut stdin = std::io::stdin();
255            std::io::copy(&mut stdin, &mut target).context("materialize caller stdin to stdin.bin")
256        }
257        StdinSource::Inline(value) => {
258            use std::io::Write;
259            target
260                .write_all(value.as_bytes())
261                .map(|_| 0u64)
262                .context("write inline stdin to stdin.bin")
263        }
264        StdinSource::File(path) => {
265            let mut input = std::fs::File::open(path)
266                .with_context(|| format!("open --stdin-file source {}", path))?;
267            std::io::copy(&mut input, &mut target)
268                .with_context(|| format!("copy --stdin-file source {} to stdin.bin", path))
269        }
270    };
271
272    if let Err(e) = copy_result {
273        let _ = std::fs::remove_file(&target_path);
274        let is_too_large = e
275            .chain()
276            .any(|cause| cause.to_string().contains("stdin_too_large"));
277        if is_too_large {
278            return Err(anyhow::anyhow!(StdinTooLarge(format!(
279                "stdin_too_large: input exceeds {} byte limit",
280                max_bytes
281            ))));
282        }
283        return Err(e);
284    }
285
286    Ok(Some(target_name))
287}
288
289fn resolve_stdin_path(job_dir: &JobDir, stdin_file: Option<&str>) -> Option<std::path::PathBuf> {
290    stdin_file.map(|p| {
291        let path = std::path::Path::new(p);
292        if path.is_absolute() {
293            path.to_path_buf()
294        } else {
295            job_dir.path.join(path)
296        }
297    })
298}
299
/// Error raised when `--stdin -` is used while the caller's stdin is a terminal.
///
/// The wrapped string is the complete user-facing message and is displayed verbatim.
#[derive(Debug)]
pub struct StdinRequired(pub String);

impl std::fmt::Display for StdinRequired {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinRequired {}
310
/// Error raised when materialized stdin would exceed its byte limit.
///
/// The wrapped string is the complete user-facing message and is displayed verbatim.
#[derive(Debug)]
pub struct StdinTooLarge(pub String);

impl std::fmt::Display for StdinTooLarge {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinTooLarge {}
321
322pub fn open_child_stdin(job_dir: &JobDir, stdin_file: Option<&str>) -> Result<std::process::Stdio> {
323    if let Some(path) = resolve_stdin_path(job_dir, stdin_file) {
324        let file = std::fs::File::open(&path)
325            .with_context(|| format!("open materialized stdin {}", path.display()))?;
326        Ok(std::process::Stdio::from(file))
327    } else {
328        Ok(std::process::Stdio::null())
329    }
330}
331
/// Default cap on the size of a materialized `stdin.bin`: 64 MiB.
pub const DEFAULT_STDIN_MAX_BYTES: u64 = 64 << 20;
333
/// Public entry point for materializing a job's stdin into `stdin.bin`.
///
/// Delegates to the private `materialize_stdin`. Returns `Ok(None)` when `stdin` is
/// `None`, otherwise `Ok(Some("stdin.bin"))`, a file name relative to the job
/// directory. Fails with `StdinTooLarge` when the input exceeds `max_bytes`.
pub fn materialize_stdin_for_job(
    job_dir: &JobDir,
    stdin: Option<&StdinSource>,
    max_bytes: u64,
) -> Result<Option<String>> {
    materialize_stdin(job_dir, stdin, max_bytes)
}
341
342/// Spawn the supervisor process and write the initial running state to `state.json`.
343///
344/// Returns the supervisor PID and the actual `started_at` timestamp.
345/// Also handles the Windows Job Object handshake before returning.
346pub fn spawn_supervisor_process(
347    job_dir: &JobDir,
348    params: SpawnSupervisorParams,
349) -> Result<(u32, String)> {
350    let started_at = now_rfc3339();
351
352    let exe = std::env::current_exe().context("resolve current exe")?;
353    let mut supervisor_cmd = Command::new(&exe);
354    supervisor_cmd
355        .arg("_supervise")
356        .arg("--job-id")
357        .arg(&params.job_id)
358        .arg("--supervise-root")
359        .arg(params.root.display().to_string())
360        .arg("--full-log")
361        .arg(&params.full_log_path);
362
363    if params.timeout_ms > 0 {
364        let timeout_seconds = params.timeout_ms.saturating_add(999) / 1000;
365        supervisor_cmd
366            .arg("--timeout")
367            .arg(timeout_seconds.to_string());
368    }
369    if params.kill_after_ms > 0 {
370        let kill_after_seconds = params.kill_after_ms.saturating_add(999) / 1000;
371        supervisor_cmd
372            .arg("--kill-after")
373            .arg(kill_after_seconds.to_string());
374    }
375    if let Some(ref cwd) = params.cwd {
376        supervisor_cmd.arg("--cwd").arg(cwd);
377    }
378    for env_file in &params.env_files {
379        supervisor_cmd.arg("--env-file").arg(env_file);
380    }
381    for env_var in &params.env_vars {
382        supervisor_cmd.arg("--env").arg(env_var);
383    }
384    if !params.inherit_env {
385        supervisor_cmd.arg("--no-inherit-env");
386    }
387    if let Some(ref stdin_file) = params.stdin_file {
388        supervisor_cmd.arg("--stdin-file").arg(stdin_file);
389    }
390    if params.progress_every_ms > 0 {
391        let progress_every_seconds = params.progress_every_ms.saturating_add(999) / 1000;
392        supervisor_cmd
393            .arg("--progress-every")
394            .arg(progress_every_seconds.to_string());
395    }
396    if let Some(ref nc) = params.notify_command {
397        supervisor_cmd.arg("--notify-command").arg(nc);
398    }
399    if let Some(ref nf) = params.notify_file {
400        supervisor_cmd.arg("--notify-file").arg(nf);
401    }
402    let wrapper_json =
403        serde_json::to_string(&params.shell_wrapper).context("serialize shell wrapper")?;
404    supervisor_cmd
405        .arg("--shell-wrapper-resolved")
406        .arg(&wrapper_json);
407
408    supervisor_cmd
409        .arg("--")
410        .args(&params.command)
411        .stdin(std::process::Stdio::null())
412        .stdout(std::process::Stdio::null())
413        .stderr(std::process::Stdio::null());
414
415    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
416    let supervisor_pid = supervisor.id();
417    debug!(supervisor_pid, "supervisor spawned");
418
419    // Write initial running state.
420    job_dir.init_state(supervisor_pid, &started_at)?;
421
422    // Windows Job Object handshake.
423    #[cfg(windows)]
424    {
425        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
426        loop {
427            std::thread::sleep(std::time::Duration::from_millis(10));
428            if let Ok(current_state) = job_dir.read_state() {
429                let supervisor_updated = current_state
430                    .pid
431                    .map(|p| p != supervisor_pid)
432                    .unwrap_or(false)
433                    || *current_state.status() == crate::schema::JobStatus::Failed;
434                if supervisor_updated {
435                    if *current_state.status() == crate::schema::JobStatus::Failed {
436                        anyhow::bail!(
437                            "supervisor failed to assign child process to Job Object \
438                             (Windows MUST requirement); see stderr for details"
439                        );
440                    }
441                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
442                    break;
443                }
444            }
445            if std::time::Instant::now() >= handshake_deadline {
446                debug!("supervisor handshake timed out; proceeding with initial state");
447                break;
448            }
449        }
450    }
451
452    Ok((supervisor_pid, started_at))
453}
454
455/// Pre-create empty log files (stdout.log, stderr.log, full.log) so they exist
456/// immediately after job creation, before the supervisor starts writing.
457pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
458    for log_path in [
459        job_dir.stdout_path(),
460        job_dir.stderr_path(),
461        job_dir.full_log_path(),
462    ] {
463        std::fs::OpenOptions::new()
464            .create(true)
465            .append(true)
466            .open(&log_path)
467            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
468    }
469    Ok(())
470}
471
472/// Execute `run`: spawn job and return launch metadata immediately.
473pub fn execute(opts: RunOpts) -> Result<()> {
474    if opts.command.is_empty() {
475        anyhow::bail!("no command specified for run");
476    }
477
478    let elapsed_start = std::time::Instant::now();
479
480    let root = resolve_root(opts.root);
481    std::fs::create_dir_all(&root)
482        .with_context(|| format!("create jobs root {}", root.display()))?;
483
484    let job_id = generate_job_id(&root)?;
485    let created_at = now_rfc3339();
486
487    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
488    let env_keys: Vec<String> = opts
489        .env_vars
490        .iter()
491        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
492        .collect();
493
494    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
495    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
496
497    // Resolve the effective working directory for this job.
498    // If --cwd was specified, use that path; otherwise use the current process's working directory.
499    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
500    let effective_cwd = resolve_effective_cwd(opts.cwd);
501
502    // Build output-match config from definition-time options (same logic as `create` and `notify set`).
503    let on_output_match = crate::notify::build_output_match_config(
504        opts.output_pattern,
505        opts.output_match_type,
506        opts.output_stream,
507        opts.output_command,
508        opts.output_file,
509        None,
510    );
511
512    let notification =
513        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
514        {
515            Some(crate::schema::NotificationConfig {
516                notify_command: opts.notify_command.clone(),
517                notify_file: opts.notify_file.clone(),
518                on_output_match,
519            })
520        } else {
521            None
522        };
523
524    // Validate and deduplicate tags (preserving first-seen order).
525    let tags = dedup_tags(opts.tags)?;
526
527    let meta = JobMeta {
528        job: JobMetaJob { id: job_id.clone() },
529        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
530        command: opts.command.clone(),
531        created_at: created_at.clone(),
532        root: root.display().to_string(),
533        env_keys,
534        env_vars: masked_env_vars.clone(),
535        // For `run`, env_vars_runtime is not populated because the supervisor
536        // is spawned immediately with the real values; no deferred start needed.
537        env_vars_runtime: vec![],
538        mask: opts.mask.clone(),
539        cwd: Some(effective_cwd),
540        notification,
541        // Execution-definition fields (used by start if ever applicable).
542        inherit_env: opts.inherit_env,
543        env_files: opts.env_files.clone(),
544        timeout_ms: opts.timeout_ms,
545        kill_after_ms: opts.kill_after_ms,
546        progress_every_ms: opts.progress_every_ms,
547        shell_wrapper: Some(opts.shell_wrapper.clone()),
548        stdin_file: None,
549        tags: tags.clone(),
550    };
551
552    validate_stdin_source(opts.stdin.as_ref())?;
553
554    let job_dir = JobDir::create(&root, &job_id, &meta)?;
555    let stdin_file =
556        materialize_stdin_for_job(&job_dir, opts.stdin.as_ref(), opts.stdin_max_bytes)?;
557    if stdin_file.is_some() {
558        let mut meta_with_stdin = meta.clone();
559        meta_with_stdin.stdin_file = stdin_file.clone();
560        job_dir.write_meta_atomic(&meta_with_stdin)?;
561    }
562    info!(job_id = %job_id, "created job directory");
563
564    // Determine the full.log path (may be overridden by --log).
565    let full_log_path = if let Some(log) = opts.log {
566        log.to_string()
567    } else {
568        job_dir.full_log_path().display().to_string()
569    };
570
571    // Pre-create empty log files so they exist before the supervisor starts.
572    pre_create_log_files(&job_dir)?;
573
574    // Spawn the supervisor using the shared helper.
575    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
576    // receives the real env var values so the child process can use them as intended.
577    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
578        &job_dir,
579        SpawnSupervisorParams {
580            job_id: job_id.clone(),
581            root: root.clone(),
582            full_log_path: full_log_path.clone(),
583            timeout_ms: opts.timeout_ms,
584            kill_after_ms: opts.kill_after_ms,
585            cwd: opts.cwd.map(|s| s.to_string()),
586            env_vars: opts.env_vars.clone(),
587            env_files: opts.env_files.clone(),
588            inherit_env: opts.inherit_env,
589            stdin_file: stdin_file.clone(),
590            progress_every_ms: opts.progress_every_ms,
591            notify_command: opts.notify_command.clone(),
592            notify_file: opts.notify_file.clone(),
593            shell_wrapper: opts.shell_wrapper.clone(),
594            command: opts.command.clone(),
595        },
596    )?;
597
598    // Compute absolute paths for stdout.log and stderr.log.
599    let stdout_log_path = job_dir.stdout_path().display().to_string();
600    let stderr_log_path = job_dir.stderr_path().display().to_string();
601
602    let observation = observe_inline_output(
603        &job_dir,
604        opts.wait,
605        opts.until_seconds,
606        opts.forever,
607        opts.max_bytes,
608    )?;
609    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
610
611    let response = Response::new(
612        "run",
613        RunData {
614            job_id,
615            state: observation.state,
616            tags,
617            // Include masked env_vars in the JSON response so callers can inspect
618            // which variables were set (with secret values replaced by "***").
619            env_vars: masked_env_vars,
620            stdout_log_path,
621            stderr_log_path,
622            elapsed_ms,
623            waited_ms: observation.waited_ms,
624            stdout: observation.stdout,
625            stderr: observation.stderr,
626            stdout_range: observation.stdout_range,
627            stderr_range: observation.stderr_range,
628            stdout_total_bytes: observation.stdout_total_bytes,
629            stderr_total_bytes: observation.stderr_total_bytes,
630            encoding: observation.encoding,
631            exit_code: observation.exit_code,
632            finished_at: observation.finished_at,
633            signal: observation.signal,
634            duration_ms: observation.duration_ms,
635        },
636    );
637    response.print();
638    Ok(())
639}
640
/// Options for the `_supervise` internal sub-command.
///
/// Masking is the responsibility of `run` (which writes masked values to meta.json
/// and includes them in the JSON response). The supervisor only needs the real
/// environment variable values to launch the child process correctly.
///
/// NOTE(review): this type derives `Debug` while `env_vars` holds real values, so a
/// `{:?}` dump exposes secrets. It also carries no output-match fields; presumably the
/// supervisor reads those from `meta.json` (see `OutputMatchChecker`). Confirm both.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job being supervised.
    pub job_id: &'a str,
    /// Jobs root directory containing the job's directory.
    pub root: &'a Path,
    /// Command and arguments for the child process.
    pub command: &'a [String],
    /// Override full.log path; None = use job dir default.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
    pub kill_after_ms: u64,
    /// Working directory for the child process.
    pub cwd: Option<&'a str>,
    /// Environment variables as KEY=VALUE strings (real values, not masked).
    pub env_vars: Vec<String>,
    /// Paths to env files, applied in order.
    pub env_files: Vec<String>,
    /// Whether to inherit the current process environment.
    pub inherit_env: bool,
    /// Materialized stdin file path (relative to job dir) to feed child stdin.
    pub stdin_file: Option<String>,
    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
    pub progress_every_ms: u64,
    /// Shell command string for command notification sink; executed via platform shell.
    /// None = no command sink.
    pub notify_command: Option<String>,
    /// File path for NDJSON notification sink; None = no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings.
    pub shell_wrapper: Vec<String>,
}
677
678/// Resolve the effective working directory for a job.
679///
680/// If `cwd_override` is `Some`, use that path as the base. Otherwise use the
681/// current process working directory. In either case, attempt to canonicalize
682/// the path for consistent comparison; on failure, fall back to the absolute
683/// path representation (avoids symlink / permission issues on some systems).
684pub fn observe_inline_output(
685    job_dir: &JobDir,
686    wait: bool,
687    until_seconds: u64,
688    forever: bool,
689    max_bytes: u64,
690) -> Result<InlineObservation> {
691    let started = std::time::Instant::now();
692    let deadline = if wait {
693        if forever {
694            None
695        } else {
696            Some(started + std::time::Duration::from_secs(until_seconds))
697        }
698    } else {
699        Some(started)
700    };
701
702    if wait {
703        loop {
704            let state = job_dir.read_state()?;
705            let has_output = std::fs::metadata(job_dir.stdout_path())
706                .map(|m| m.len() > 0)
707                .unwrap_or(false)
708                || std::fs::metadata(job_dir.stderr_path())
709                    .map(|m| m.len() > 0)
710                    .unwrap_or(false);
711
712            if !state.status().is_non_terminal() || has_output {
713                break;
714            }
715
716            if let Some(dl) = deadline
717                && std::time::Instant::now() >= dl
718            {
719                break;
720            }
721
722            std::thread::sleep(std::time::Duration::from_millis(100));
723        }
724    }
725
726    let state = job_dir.read_state()?;
727    let stdout = job_dir.read_head_metrics("stdout.log", max_bytes);
728    let stderr = job_dir.read_head_metrics("stderr.log", max_bytes);
729
730    Ok(InlineObservation {
731        waited_ms: started.elapsed().as_millis() as u64,
732        stdout: stdout.head,
733        stderr: stderr.head,
734        stdout_range: stdout.range,
735        stderr_range: stderr.range,
736        stdout_total_bytes: stdout.observed_bytes,
737        stderr_total_bytes: stderr.observed_bytes,
738        encoding: "utf-8-lossy".to_string(),
739        state: state.status().as_str().to_string(),
740        exit_code: state.exit_code(),
741        signal: state.signal().map(|s| s.to_string()),
742        duration_ms: state.duration_ms(),
743        finished_at: state.finished_at,
744    })
745}
746
/// Resolve the effective working directory recorded for a job.
///
/// Starts from `cwd_override` when given, otherwise from the process's current
/// directory (or `.` if that cannot be determined). The result is canonicalized
/// (symlinks resolved) when possible. If canonicalization fails, e.g. because the path
/// does not exist, an absolute form is returned instead: absolute paths are kept as-is
/// and relative ones are joined onto the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    let current_dir = || std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
    let base = cwd_override.map_or_else(current_dir, std::path::PathBuf::from);

    let resolved = match base.canonicalize() {
        Ok(canonical) => canonical,
        Err(_) if base.is_absolute() => base,
        Err(_) => current_dir().join(base),
    };
    resolved.display().to_string()
}
768
/// Mask the values of selected keys in a list of `KEY=VALUE` strings.
///
/// An entry's key is the text before its first `=`, or the whole entry when there is
/// no `=`. Entries whose key appears in `mask_keys` become `KEY=***`; all others are
/// copied unchanged, and the original order is kept.
pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    if mask_keys.is_empty() {
        // Nothing to mask: plain copy.
        return env_vars.to_vec();
    }

    let mut masked = Vec::with_capacity(env_vars.len());
    for entry in env_vars {
        let key = entry
            .split_once('=')
            .map_or(entry.as_str(), |(name, _)| name);
        let hidden = mask_keys.iter().any(|candidate| candidate.as_str() == key);
        masked.push(if hidden {
            format!("{key}=***")
        } else {
            entry.to_owned()
        });
    }
    masked
}
787
/// Split a `KEY=VALUE` (or bare `KEY`) string at its first `=` into owned `(key, value)`.
///
/// A string without `=` yields the whole string as the key and an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    let (key, value) = s.split_once('=').unwrap_or((s, ""));
    (key.to_owned(), value.to_owned())
}
796
797/// Load environment variables from a .env-style file.
798/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
799fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
800    let contents =
801        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
802    let mut vars = Vec::new();
803    for line in contents.lines() {
804        let line = line.trim();
805        if line.is_empty() || line.starts_with('#') {
806            continue;
807        }
808        vars.push(parse_env_var(line));
809    }
810    Ok(vars)
811}
812
/// Shared state for output-match checking, used by streaming threads in `supervise`.
///
/// Reloads `meta.json` on every observed line so that a `notify set` update is
/// visible for the very next line, regardless of how recently the last reload
/// occurred. Multiple streaming threads share the same checker via `Arc`; the
/// internal `Mutex` serialises access.
///
/// NOTE(review): every output line costs one `meta.json` read and JSON parse while the
/// mutex is held. Regex matches also cost a `Regex::new` per line. Consider caching
/// keyed on meta.json's mtime if output volume is high.
struct OutputMatchChecker {
    /// Job directory; `meta.json` is re-read from here on every checked line.
    job_dir_path: std::path::PathBuf,
    /// Resolved shell wrapper argv; presumably used to run the output-match command
    /// sink (dispatch code not visible here — confirm).
    shell_wrapper: Vec<String>,
    /// Most recently loaded notification config, guarded for cross-thread access.
    inner: std::sync::Mutex<OutputMatchInner>,
}

/// Mutable part of `OutputMatchChecker`, protected by its mutex.
struct OutputMatchInner {
    /// Starts as the config passed to `OutputMatchChecker::new`. It is replaced by
    /// `meta.notification` whenever `meta.json` reads and parses successfully, and left
    /// unchanged otherwise.
    config: Option<crate::schema::NotificationConfig>,
}
828
829impl OutputMatchChecker {
830    fn new(
831        job_dir_path: std::path::PathBuf,
832        shell_wrapper: Vec<String>,
833        initial_config: Option<crate::schema::NotificationConfig>,
834    ) -> Self {
835        Self {
836            job_dir_path,
837            shell_wrapper,
838            inner: std::sync::Mutex::new(OutputMatchInner {
839                config: initial_config,
840            }),
841        }
842    }
843
844    /// Check a newly observed output line for a configured match.
845    ///
846    /// Reloads `meta.json` on every call so that `notify set` updates are
847    /// visible for the next line without any delay.
848    /// Dispatches `job.output.matched` events outside the lock to avoid blocking
849    /// other streaming threads.
850    fn check_line(&self, line: &str, stream: &str) {
851        use crate::schema::{OutputMatchStream, OutputMatchType};
852
853        // Lock, reload, evaluate match, then release before dispatching.
854        let match_info: Option<crate::schema::OutputMatchConfig> = {
855            let mut inner = self.inner.lock().unwrap();
856
857            // Reload config on every line to pick up `notify set` updates immediately.
858            {
859                let meta_path = self.job_dir_path.join("meta.json");
860                if let Ok(raw) = std::fs::read(&meta_path)
861                    && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
862                {
863                    inner.config = meta.notification;
864                }
865            }
866
867            let Some(ref notification) = inner.config else {
868                return;
869            };
870            let Some(ref match_cfg) = notification.on_output_match else {
871                return;
872            };
873
874            // Check stream filter.
875            let stream_matches = match match_cfg.stream {
876                OutputMatchStream::Stdout => stream == "stdout",
877                OutputMatchStream::Stderr => stream == "stderr",
878                OutputMatchStream::Either => true,
879            };
880            if !stream_matches {
881                return;
882            }
883
884            // Check pattern.
885            let matched = match &match_cfg.match_type {
886                OutputMatchType::Contains => line.contains(&match_cfg.pattern),
887                OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
888                    .map(|re| re.is_match(line))
889                    .unwrap_or(false),
890            };
891
892            if matched {
893                Some(match_cfg.clone())
894            } else {
895                None
896            }
897        }; // Lock released.
898
899        if let Some(match_cfg) = match_info {
900            self.dispatch_match(line, stream, &match_cfg);
901        }
902    }
903
904    /// Dispatch a `job.output.matched` event and append a delivery record to
905    /// `notification_events.ndjson`.  Failures are non-fatal.
906    fn dispatch_match(
907        &self,
908        line: &str,
909        stream: &str,
910        match_cfg: &crate::schema::OutputMatchConfig,
911    ) {
912        use std::io::Write;
913
914        let job_id = self
915            .job_dir_path
916            .file_name()
917            .and_then(|n| n.to_str())
918            .unwrap_or("unknown");
919
920        let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
921        let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
922        let events_path = self.job_dir_path.join("notification_events.ndjson");
923        let events_path_str = events_path.display().to_string();
924
925        let match_type_str = match &match_cfg.match_type {
926            crate::schema::OutputMatchType::Contains => "contains",
927            crate::schema::OutputMatchType::Regex => "regex",
928        };
929
930        let event = crate::schema::OutputMatchEvent {
931            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
932            event_type: "job.output.matched".to_string(),
933            job_id: job_id.to_string(),
934            pattern: match_cfg.pattern.clone(),
935            match_type: match_type_str.to_string(),
936            stream: stream.to_string(),
937            line: line.to_string(),
938            stdout_log_path,
939            stderr_log_path,
940        };
941
942        let event_json = serde_json::to_string(&event).unwrap_or_default();
943        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
944
945        if let Some(ref cmd) = match_cfg.command {
946            delivery_results.push(dispatch_command_sink(
947                cmd,
948                &event_json,
949                job_id,
950                &events_path_str,
951                &self.shell_wrapper,
952                "job.output.matched",
953            ));
954        }
955        if let Some(ref file_path) = match_cfg.file {
956            delivery_results.push(dispatch_file_sink(file_path, &event_json));
957        }
958
959        // Append delivery record to notification_events.ndjson.
960        let record = crate::schema::OutputMatchEventRecord {
961            event,
962            delivery_results,
963        };
964        if let Ok(record_json) = serde_json::to_string(&record)
965            && let Ok(mut f) = std::fs::OpenOptions::new()
966                .create(true)
967                .append(true)
968                .open(&events_path)
969        {
970            let _ = writeln!(f, "{record_json}");
971        }
972    }
973}
974
975/// Stream bytes from a child process output pipe to an individual log file and
976/// to the shared `full.log`.
977///
978/// Reads byte chunks (not lines) so that output without a trailing newline is
979/// still captured in the individual log immediately.  The `full.log` format
980/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
981/// bytes are appended to the buffer until a newline is found, at which point a
982/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
983/// flushed as a final line.
984///
985/// The optional `on_line` callback is invoked for each complete line (without
986/// the trailing newline) and is used to drive output-match checking.
987///
988/// This helper is used by both the stdout and stderr monitoring threads inside
989/// [`supervise`], replacing the previously duplicated per-stream implementations.
990/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
991fn stream_to_logs<R, F>(
992    stream: R,
993    log_path: &std::path::Path,
994    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
995    label: &str,
996    on_line: Option<F>,
997) where
998    R: std::io::Read,
999    F: Fn(&str),
1000{
1001    use std::io::Write;
1002    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
1003    let mut stream = stream;
1004    let mut buf = [0u8; 8192];
1005    // Incomplete-line buffer for full.log formatting.
1006    let mut line_buf: Vec<u8> = Vec::new();
1007    loop {
1008        match stream.read(&mut buf) {
1009            Ok(0) => break, // EOF
1010            Ok(n) => {
1011                let chunk = &buf[..n];
1012                // Write raw bytes to the individual log (captures partial lines too).
1013                let _ = log_file.write_all(chunk);
1014                // Accumulate bytes for full.log line formatting.
1015                for &b in chunk {
1016                    if b == b'\n' {
1017                        let line = String::from_utf8_lossy(&line_buf);
1018                        if let Ok(mut fl) = full_log.lock() {
1019                            let ts = now_rfc3339();
1020                            let _ = writeln!(fl, "{ts} [{label}] {line}");
1021                        }
1022                        if let Some(ref f) = on_line {
1023                            f(&line);
1024                        }
1025                        line_buf.clear();
1026                    } else {
1027                        line_buf.push(b);
1028                    }
1029                }
1030            }
1031            Err(_) => break,
1032        }
1033    }
1034    // Flush any remaining incomplete line to full.log and trigger callback.
1035    if !line_buf.is_empty() {
1036        let line = String::from_utf8_lossy(&line_buf);
1037        if let Ok(mut fl) = full_log.lock() {
1038            let ts = now_rfc3339();
1039            let _ = writeln!(fl, "{ts} [{label}] {line}");
1040        }
1041        if let Some(ref f) = on_line {
1042            f(&line);
1043        }
1044    }
1045}
1046
1047/// Internal supervisor sub-command.
1048///
1049/// Runs the target command, streams stdout/stderr to individual log files
1050/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
1051/// updates `state.json` when the process finishes.
1052///
1053/// On Windows, the child process is assigned to a named Job Object so that
1054/// the entire process tree can be terminated with a single `kill` call.
1055/// The Job Object name is recorded in `state.json` as `windows_job_name`.
1056pub fn supervise(opts: SuperviseOpts) -> Result<()> {
1057    use std::sync::{Arc, Mutex};
1058
1059    let job_id = opts.job_id;
1060    let root = opts.root;
1061    let command = opts.command;
1062
1063    if command.is_empty() {
1064        anyhow::bail!("supervisor: no command");
1065    }
1066
1067    let job_dir = JobDir::open(root, job_id)?;
1068
1069    // Read meta.json for notification config and cwd (used in completion event).
1070    let meta = job_dir.read_meta()?;
1071    // Use the actual execution start time, not meta.created_at.
1072    // For `run`, these are nearly identical. For `start`, the job may have
1073    // been created long before it was started.
1074    let started_at = now_rfc3339();
1075
1076    // Determine full.log path.
1077    let full_log_path = if let Some(p) = opts.full_log {
1078        std::path::PathBuf::from(p)
1079    } else {
1080        job_dir.full_log_path()
1081    };
1082
1083    // Create the full.log file (shared between stdout/stderr threads).
1084    // Ensure parent directories exist for custom paths.
1085    if let Some(parent) = full_log_path.parent() {
1086        std::fs::create_dir_all(parent)
1087            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
1088    }
1089    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
1090    let full_log = Arc::new(Mutex::new(full_log_file));
1091
1092    // Execute command through the shell wrapper.
1093    //
1094    // Two launch modes:
1095    //
1096    //   String mode (command.len() == 1):  The single element is a shell command
1097    //   string passed as-is to the wrapper (e.g. `"echo hello && ls"` preserves
1098    //   shell operators).  The wrapper process is the workload boundary.
1099    //
1100    //   Argv mode (command.len() > 1):  The wrapper is used for login-shell
1101    //   environment initialisation but immediately hands off to the target via
1102    //   `exec "$@"`.  The shell replaces itself so the observed child PID and
1103    //   lifecycle align with the intended workload, not the wrapper.
1104    //
1105    // --notify-command delivery always uses the wrapper in string mode
1106    // (see dispatch_command_sink); this change only affects job argv launches.
1107    if opts.shell_wrapper.is_empty() {
1108        anyhow::bail!("supervisor: shell wrapper must not be empty");
1109    }
1110    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
1111    if command.len() == 1 {
1112        // Shell-string mode: pass the command string to the wrapper as-is.
1113        child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
1114    } else {
1115        // Argv mode: launch the workload via the shell wrapper.
1116        //
1117        // On Unix the wrapper hands off to the workload via `exec "$@"` so the
1118        // shell replaces itself and the observed PID / lifecycle align with the
1119        // intended workload, not the wrapper.
1120        //
1121        // On non-Unix platforms (Windows) there is no POSIX `exec`; the wrapper
1122        // is invoked in shell-string mode with the argv joined into a single
1123        // quoted command string, preserving the existing cmd/C semantics.
1124        #[cfg(unix)]
1125        {
1126            // `--` serves as $0; argv elements become $1..$n so `$@` expands
1127            // to the full workload argv.
1128            child_cmd
1129                .args(&opts.shell_wrapper[1..])
1130                .arg("exec \"$@\"")
1131                .arg("--")
1132                .args(command);
1133        }
1134        #[cfg(not(unix))]
1135        {
1136            // Windows fallback: join argv into a shell-compatible string and
1137            // pass it to the wrapper as a single command string (same as
1138            // shell-string mode), so cmd /C semantics are preserved.
1139            let joined = command
1140                .iter()
1141                .map(|a| {
1142                    if a.contains(' ') {
1143                        format!("\"{}\"", a)
1144                    } else {
1145                        a.clone()
1146                    }
1147                })
1148                .collect::<Vec<_>>()
1149                .join(" ");
1150            child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
1151        }
1152    }
1153
1154    if opts.inherit_env {
1155        // Start with the current environment (default).
1156    } else {
1157        child_cmd.env_clear();
1158    }
1159
1160    // Apply env files in order.
1161    for env_file in &opts.env_files {
1162        let vars = load_env_file(env_file)?;
1163        for (k, v) in vars {
1164            child_cmd.env(&k, &v);
1165        }
1166    }
1167
1168    // Apply --env KEY=VALUE overrides (applied after env-files).
1169    for env_var in &opts.env_vars {
1170        let (k, v) = parse_env_var(env_var);
1171        child_cmd.env(&k, &v);
1172    }
1173
1174    // Set working directory if specified.
1175    if let Some(cwd) = opts.cwd {
1176        child_cmd.current_dir(cwd);
1177    }
1178
1179    // Put the child in its own process group so that timeout signals
1180    // (SIGTERM / SIGKILL) reach the entire process tree, not just the
1181    // shell wrapper.  Without this, `sh -lc "sleep 60"` would absorb
1182    // the signal while the grandchild (`sleep`) keeps running.
1183    #[cfg(unix)]
1184    {
1185        use std::os::unix::process::CommandExt;
1186        // SAFETY: setsid is async-signal-safe and called before exec.
1187        unsafe {
1188            child_cmd.pre_exec(|| {
1189                libc::setsid();
1190                Ok(())
1191            });
1192        }
1193    }
1194
1195    // Spawn the child with piped stdout/stderr so we can tee to logs.
1196    let child_stdin = open_child_stdin(&job_dir, opts.stdin_file.as_deref())?;
1197    let mut child = child_cmd
1198        .stdin(child_stdin)
1199        .stdout(std::process::Stdio::piped())
1200        .stderr(std::process::Stdio::piped())
1201        .spawn()
1202        .context("supervisor: spawn child")?;
1203
1204    let pid = child.id();
1205    info!(job_id, pid, "child process started");
1206
1207    // On Windows, assign child to a named Job Object for process-tree management.
1208    // The job name is derived from the job_id so that `kill` can look it up.
1209    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
1210    // kills the child process and updates state.json to "failed" before returning
1211    // an error, so that the run front-end (which may have already returned) can
1212    // detect the failure via state.json on next poll.
1213    #[cfg(windows)]
1214    let windows_job_name = {
1215        match assign_to_job_object(job_id, pid) {
1216            Ok(name) => Some(name),
1217            Err(e) => {
1218                // Job Object assignment failed. Per design.md this is a MUST
1219                // requirement on Windows. Kill the child process and update
1220                // state.json to "failed" so the run front-end can detect it.
1221                let kill_err = child.kill();
1222                let _ = child.wait(); // reap to avoid zombies
1223
1224                let failed_state = JobState {
1225                    job: JobStateJob {
1226                        id: job_id.to_string(),
1227                        status: JobStatus::Failed,
1228                        started_at: Some(started_at.clone()),
1229                    },
1230                    result: JobStateResult {
1231                        exit_code: None,
1232                        signal: None,
1233                        duration_ms: None,
1234                    },
1235                    pid: Some(pid),
1236                    finished_at: Some(now_rfc3339()),
1237                    updated_at: now_rfc3339(),
1238                    windows_job_name: None,
1239                };
1240                // Best-effort: if writing state fails, we still propagate the
1241                // original assignment error.
1242                let _ = job_dir.write_state(&failed_state);
1243
1244                // Dispatch completion event for the failed state if notifications are configured.
1245                // This mirrors the dispatch logic in the normal exit path so that callers
1246                // receive a job.finished event even when the supervisor fails early (Windows only).
1247                if opts.notify_command.is_some() || opts.notify_file.is_some() {
1248                    let finished_at_ts =
1249                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1250                    let stdout_log = job_dir.stdout_path().display().to_string();
1251                    let stderr_log = job_dir.stderr_path().display().to_string();
1252                    let fail_event = crate::schema::CompletionEvent {
1253                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1254                        event_type: "job.finished".to_string(),
1255                        job_id: job_id.to_string(),
1256                        state: JobStatus::Failed.as_str().to_string(),
1257                        command: meta.command.clone(),
1258                        cwd: meta.cwd.clone(),
1259                        started_at: started_at.clone(),
1260                        finished_at: finished_at_ts,
1261                        duration_ms: None,
1262                        exit_code: None,
1263                        signal: None,
1264                        stdout_log_path: stdout_log,
1265                        stderr_log_path: stderr_log,
1266                    };
1267                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1268                    let fail_event_path = job_dir.completion_event_path().display().to_string();
1269                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1270                        Vec::new();
1271                    if let Err(we) = job_dir.write_completion_event_atomic(
1272                        &crate::schema::CompletionEventRecord {
1273                            event: fail_event.clone(),
1274                            delivery_results: vec![],
1275                        },
1276                    ) {
1277                        warn!(
1278                            job_id,
1279                            error = %we,
1280                            "failed to write initial completion_event.json for failed job"
1281                        );
1282                    }
1283                    if let Some(ref shell_cmd) = opts.notify_command {
1284                        fail_delivery_results.push(dispatch_command_sink(
1285                            shell_cmd,
1286                            &fail_event_json,
1287                            job_id,
1288                            &fail_event_path,
1289                            &opts.shell_wrapper,
1290                            "job.finished",
1291                        ));
1292                    }
1293                    if let Some(ref file_path) = opts.notify_file {
1294                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1295                    }
1296                    if let Err(we) = job_dir.write_completion_event_atomic(
1297                        &crate::schema::CompletionEventRecord {
1298                            event: fail_event,
1299                            delivery_results: fail_delivery_results,
1300                        },
1301                    ) {
1302                        warn!(
1303                            job_id,
1304                            error = %we,
1305                            "failed to update completion_event.json with delivery results for failed job"
1306                        );
1307                    }
1308                }
1309
1310                if let Err(ke) = kill_err {
1311                    return Err(anyhow::anyhow!(
1312                        "supervisor: failed to assign pid {pid} to Job Object \
1313                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1314                    ));
1315                }
1316                return Err(anyhow::anyhow!(
1317                    "supervisor: failed to assign pid {pid} to Job Object \
1318                     (Windows MUST requirement); child process was killed; \
1319                     consider running outside a nested Job Object environment: {e}"
1320                ));
1321            }
1322        }
1323    };
1324    #[cfg(not(windows))]
1325    let windows_job_name: Option<String> = None;
1326
1327    // Update state.json with real child PID and Windows Job Object name.
1328    // On Windows, windows_job_name is always Some at this point (guaranteed
1329    // by the MUST requirement above), so state.json will always contain the
1330    // Job Object identifier while the job is running.
1331    let state = JobState {
1332        job: JobStateJob {
1333            id: job_id.to_string(),
1334            status: JobStatus::Running,
1335            started_at: Some(started_at.clone()),
1336        },
1337        result: JobStateResult {
1338            exit_code: None,
1339            signal: None,
1340            duration_ms: None,
1341        },
1342        pid: Some(pid),
1343        finished_at: None,
1344        updated_at: now_rfc3339(),
1345        windows_job_name,
1346    };
1347    job_dir.write_state(&state)?;
1348
1349    let child_start_time = std::time::Instant::now();
1350
1351    // Take stdout/stderr handles before moving child.
1352    let child_stdout = child.stdout.take().expect("child stdout piped");
1353    let child_stderr = child.stderr.take().expect("child stderr piped");
1354
1355    // Create shared output-match checker from the initial meta notification config.
1356    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1357        job_dir.path.clone(),
1358        opts.shell_wrapper.clone(),
1359        meta.notification.clone(),
1360    ));
1361
1362    // Completion channels for log threads: each thread sends `()` after stream_to_logs returns.
1363    // Used for bounded joins below (allows supervisor to exit promptly when descendants
1364    // hold inherited pipe ends open indefinitely).
1365    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1366    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1367
1368    // Thread: read stdout, write to stdout.log and full.log.
1369    let stdout_log_path = job_dir.stdout_path();
1370    let full_log_stdout = Arc::clone(&full_log);
1371    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1372    let t_stdout = std::thread::spawn(move || {
1373        stream_to_logs(
1374            child_stdout,
1375            &stdout_log_path,
1376            full_log_stdout,
1377            "STDOUT",
1378            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1379        );
1380        let _ = tx_stdout_done.send(());
1381    });
1382
1383    // Thread: read stderr, write to stderr.log and full.log.
1384    let stderr_log_path = job_dir.stderr_path();
1385    let full_log_stderr = Arc::clone(&full_log);
1386    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1387    let t_stderr = std::thread::spawn(move || {
1388        stream_to_logs(
1389            child_stderr,
1390            &stderr_log_path,
1391            full_log_stderr,
1392            "STDERR",
1393            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1394        );
1395        let _ = tx_stderr_done.send(());
1396    });
1397
1398    // Timeout / kill-after / progress-every handling.
1399    // We spawn a watcher thread to handle timeout and periodic state.json updates.
1400    let timeout_ms = opts.timeout_ms;
1401    let kill_after_ms = opts.kill_after_ms;
1402    let progress_every_ms = opts.progress_every_ms;
1403    let state_path = job_dir.state_path();
1404    let job_id_str = job_id.to_string();
1405
1406    // Use an atomic flag to signal the watcher thread when the child has exited.
1407    use std::sync::atomic::{AtomicBool, Ordering};
1408    let child_done = Arc::new(AtomicBool::new(false));
1409
1410    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1411        let state_path_clone = state_path.clone();
1412        let child_done_clone = Arc::clone(&child_done);
1413        Some(std::thread::spawn(move || {
1414            let start = std::time::Instant::now();
1415            let timeout_dur = if timeout_ms > 0 {
1416                Some(std::time::Duration::from_millis(timeout_ms))
1417            } else {
1418                None
1419            };
1420            let progress_dur = if progress_every_ms > 0 {
1421                Some(std::time::Duration::from_millis(progress_every_ms))
1422            } else {
1423                None
1424            };
1425
1426            let poll_interval = std::time::Duration::from_millis(100);
1427
1428            loop {
1429                std::thread::sleep(poll_interval);
1430
1431                // Exit the watcher loop if the child process has finished.
1432                if child_done_clone.load(Ordering::Relaxed) {
1433                    break;
1434                }
1435
1436                let elapsed = start.elapsed();
1437
1438                // Check for timeout.
1439                if let Some(td) = timeout_dur
1440                    && elapsed >= td
1441                {
1442                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1443                    // Send SIGTERM to the entire process group (negative PID).
1444                    // The child was placed in its own session/group via setsid.
1445                    #[cfg(unix)]
1446                    {
1447                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1448                    }
1449                    // If kill_after > 0, wait kill_after ms then SIGKILL.
1450                    if kill_after_ms > 0 {
1451                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1452                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1453                        #[cfg(unix)]
1454                        {
1455                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1456                        }
1457                    } else {
1458                        // Immediate SIGKILL to the process group.
1459                        #[cfg(unix)]
1460                        {
1461                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1462                        }
1463                    }
1464                    break;
1465                }
1466
1467                // Progress-every: update updated_at periodically.
1468                if let Some(pd) = progress_dur {
1469                    let elapsed_ms = elapsed.as_millis() as u64;
1470                    let pd_ms = pd.as_millis() as u64;
1471                    let poll_ms = poll_interval.as_millis() as u64;
1472                    if elapsed_ms % pd_ms < poll_ms {
1473                        // Read, update updated_at, write back.
1474                        if let Ok(raw) = std::fs::read(&state_path_clone)
1475                            && let Ok(mut st) =
1476                                serde_json::from_slice::<crate::schema::JobState>(&raw)
1477                        {
1478                            st.updated_at = now_rfc3339();
1479                            if let Ok(s) = serde_json::to_string_pretty(&st) {
1480                                let _ = std::fs::write(&state_path_clone, s);
1481                            }
1482                        }
1483                    }
1484                }
1485            }
1486        }))
1487    } else {
1488        None
1489    };
1490
1491    // Wait for child to finish.
1492    let exit_status = child.wait().context("wait for child")?;
1493
1494    // Signal the watcher that the child has finished so it can exit its loop.
1495    child_done.store(true, Ordering::Relaxed);
1496
1497    // Persist terminal state immediately after the wrapped root process exits.
1498    // This must happen BEFORE joining log threads, because log threads block on
1499    // EOF of stdout/stderr pipes and descendant processes that inherited those
1500    // pipes may keep them open indefinitely. Persisting state here ensures that
1501    // `status` and `wait` can observe the terminal state without waiting for
1502    // all descendants to close their inherited handles.
1503    let duration_ms = child_start_time.elapsed().as_millis() as u64;
1504    let exit_code = exit_status.code();
1505    let finished_at = now_rfc3339();
1506
1507    // Detect signal-killed processes on Unix for accurate state and completion event.
1508    #[cfg(unix)]
1509    let (terminal_status, signal_name) = {
1510        use std::os::unix::process::ExitStatusExt;
1511        if let Some(sig) = exit_status.signal() {
1512            (JobStatus::Killed, Some(sig.to_string()))
1513        } else {
1514            (JobStatus::Exited, None)
1515        }
1516    };
1517    #[cfg(not(unix))]
1518    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1519
1520    let state = JobState {
1521        job: JobStateJob {
1522            id: job_id.to_string(),
1523            status: terminal_status.clone(),
1524            started_at: Some(started_at.clone()),
1525        },
1526        result: JobStateResult {
1527            exit_code,
1528            signal: signal_name.clone(),
1529            duration_ms: Some(duration_ms),
1530        },
1531        pid: Some(pid),
1532        finished_at: Some(finished_at.clone()),
1533        updated_at: now_rfc3339(),
1534        windows_job_name: None, // not needed after process exits
1535    };
1536    job_dir.write_state(&state)?;
1537    info!(job_id, ?exit_code, "child process finished");
1538
1539    // Bounded join for log-reader threads.
1540    //
1541    // After the wrapped root process exits, we give log threads a short window
1542    // to drain any remaining output and fire output-match callbacks (which is
1543    // the common case: no descendants, pipe write-end closes promptly).
1544    //
1545    // If a thread does not complete within the window, it is detached (dropped).
1546    // Detaching means the thread will be killed when the supervisor process exits,
1547    // which is the correct trade-off: supervisor must not linger indefinitely
1548    // because a descendant holds an inherited pipe write-end open.
1549    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1550    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1551
1552    let remaining = drain_deadline
1553        .checked_duration_since(std::time::Instant::now())
1554        .unwrap_or(std::time::Duration::ZERO);
1555    if rx_stdout_done.recv_timeout(remaining).is_ok() {
1556        let _ = t_stdout.join();
1557    } else {
1558        drop(t_stdout); // detach: descendant holds the pipe open
1559    }
1560
1561    let remaining = drain_deadline
1562        .checked_duration_since(std::time::Instant::now())
1563        .unwrap_or(std::time::Duration::ZERO);
1564    if rx_stderr_done.recv_timeout(remaining).is_ok() {
1565        let _ = t_stderr.join();
1566    } else {
1567        drop(t_stderr); // detach: descendant holds the pipe open
1568    }
1569
1570    // Join watcher if present; it exits promptly once child_done is set.
1571    if let Some(w) = watcher {
1572        let _ = w.join();
1573    }
1574
1575    // Reload the latest notification config from meta.json to pick up any post-creation
1576    // updates (e.g. from `notify set` invoked after the job was launched).
1577    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1578    let (current_notify_command, current_notify_file) = match &latest_notification {
1579        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1580        None => (None, None),
1581    };
1582
1583    // Dispatch completion event to configured notification sinks.
1584    // Failure here must not alter job state (delivery result is recorded separately).
1585    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1586    if has_notification {
1587        let stdout_log = job_dir.stdout_path().display().to_string();
1588        let stderr_log = job_dir.stderr_path().display().to_string();
1589        let event = crate::schema::CompletionEvent {
1590            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1591            event_type: "job.finished".to_string(),
1592            job_id: job_id.to_string(),
1593            state: terminal_status.as_str().to_string(),
1594            command: meta.command.clone(),
1595            cwd: meta.cwd.clone(),
1596            started_at,
1597            finished_at,
1598            duration_ms: Some(duration_ms),
1599            exit_code,
1600            signal: signal_name,
1601            stdout_log_path: stdout_log,
1602            stderr_log_path: stderr_log,
1603        };
1604
1605        let event_json = serde_json::to_string(&event).unwrap_or_default();
1606        let event_path = job_dir.completion_event_path().display().to_string();
1607        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1608
1609        // Write initial completion_event.json before dispatching sinks.
1610        if let Err(e) =
1611            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1612                event: event.clone(),
1613                delivery_results: vec![],
1614            })
1615        {
1616            warn!(job_id, error = %e, "failed to write initial completion_event.json");
1617        }
1618
1619        if let Some(ref shell_cmd) = current_notify_command {
1620            delivery_results.push(dispatch_command_sink(
1621                shell_cmd,
1622                &event_json,
1623                job_id,
1624                &event_path,
1625                &opts.shell_wrapper,
1626                "job.finished",
1627            ));
1628        }
1629        if let Some(ref file_path) = current_notify_file {
1630            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1631        }
1632
1633        // Update completion_event.json with delivery results.
1634        if let Err(e) =
1635            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1636                event,
1637                delivery_results,
1638            })
1639        {
1640            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1641        }
1642    }
1643
1644    Ok(())
1645}
1646
1647/// Dispatch the command sink: execute the shell command string via the configured shell wrapper,
1648/// pass event JSON via stdin, and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID /
1649/// AGENT_EXEC_EVENT_TYPE env vars.
1650///
1651/// The shell wrapper argv (e.g. `["sh", "-lc"]`) is provided by the caller.
1652/// The command string is appended as the final argument to the wrapper.
1653fn dispatch_command_sink(
1654    shell_cmd: &str,
1655    event_json: &str,
1656    job_id: &str,
1657    event_path: &str,
1658    shell_wrapper: &[String],
1659    event_type: &str,
1660) -> crate::schema::SinkDeliveryResult {
1661    use std::io::Write;
1662    let attempted_at = now_rfc3339();
1663    let target = shell_cmd.to_string();
1664
1665    if shell_cmd.trim().is_empty() {
1666        return crate::schema::SinkDeliveryResult {
1667            sink_type: "command".to_string(),
1668            target,
1669            success: false,
1670            error: Some("empty shell command".to_string()),
1671            attempted_at,
1672        };
1673    }
1674
1675    if shell_wrapper.is_empty() {
1676        return crate::schema::SinkDeliveryResult {
1677            sink_type: "command".to_string(),
1678            target,
1679            success: false,
1680            error: Some("shell wrapper must not be empty".to_string()),
1681            attempted_at,
1682        };
1683    }
1684
1685    let mut cmd = Command::new(&shell_wrapper[0]);
1686    cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1687
1688    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1689    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1690    cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1691    cmd.stdin(std::process::Stdio::piped());
1692    cmd.stdout(std::process::Stdio::null());
1693    cmd.stderr(std::process::Stdio::null());
1694
1695    match cmd.spawn() {
1696        Ok(mut child) => {
1697            if let Some(mut stdin) = child.stdin.take() {
1698                let _ = stdin.write_all(event_json.as_bytes());
1699            }
1700            match child.wait() {
1701                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1702                    sink_type: "command".to_string(),
1703                    target,
1704                    success: true,
1705                    error: None,
1706                    attempted_at,
1707                },
1708                Ok(status) => crate::schema::SinkDeliveryResult {
1709                    sink_type: "command".to_string(),
1710                    target,
1711                    success: false,
1712                    error: Some(format!("exited with status {status}")),
1713                    attempted_at,
1714                },
1715                Err(e) => crate::schema::SinkDeliveryResult {
1716                    sink_type: "command".to_string(),
1717                    target,
1718                    success: false,
1719                    error: Some(format!("wait error: {e}")),
1720                    attempted_at,
1721                },
1722            }
1723        }
1724        Err(e) => crate::schema::SinkDeliveryResult {
1725            sink_type: "command".to_string(),
1726            target,
1727            success: false,
1728            error: Some(format!("spawn error: {e}")),
1729            attempted_at,
1730        },
1731    }
1732}
1733
1734/// Dispatch the file sink: append event JSON as a single NDJSON line.
1735/// Creates parent directories automatically.
1736fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1737    use std::io::Write;
1738    let attempted_at = now_rfc3339();
1739    let path = std::path::Path::new(file_path);
1740
1741    if let Some(parent) = path.parent()
1742        && let Err(e) = std::fs::create_dir_all(parent)
1743    {
1744        return crate::schema::SinkDeliveryResult {
1745            sink_type: "file".to_string(),
1746            target: file_path.to_string(),
1747            success: false,
1748            error: Some(format!("create parent dir: {e}")),
1749            attempted_at,
1750        };
1751    }
1752
1753    match std::fs::OpenOptions::new()
1754        .create(true)
1755        .append(true)
1756        .open(path)
1757    {
1758        Ok(mut f) => match writeln!(f, "{event_json}") {
1759            Ok(_) => crate::schema::SinkDeliveryResult {
1760                sink_type: "file".to_string(),
1761                target: file_path.to_string(),
1762                success: true,
1763                error: None,
1764                attempted_at,
1765            },
1766            Err(e) => crate::schema::SinkDeliveryResult {
1767                sink_type: "file".to_string(),
1768                target: file_path.to_string(),
1769                success: false,
1770                error: Some(format!("write error: {e}")),
1771                attempted_at,
1772            },
1773        },
1774        Err(e) => crate::schema::SinkDeliveryResult {
1775            sink_type: "file".to_string(),
1776            target: file_path.to_string(),
1777            success: false,
1778            error: Some(format!("open error: {e}")),
1779            attempted_at,
1780        },
1781    }
1782}
1783
1784/// Public alias so other modules can call the timestamp helper.
1785pub fn now_rfc3339_pub() -> String {
1786    now_rfc3339()
1787}
1788
1789fn now_rfc3339() -> String {
1790    // Use a simple approach that works without chrono.
1791    let d = std::time::SystemTime::now()
1792        .duration_since(std::time::UNIX_EPOCH)
1793        .unwrap_or_default();
1794    format_rfc3339(d.as_secs())
1795}
1796
/// Format a Unix timestamp (whole seconds since 1970-01-01T00:00:00Z) as an
/// RFC 3339 UTC string of the form `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The conversion is done by hand using the proleptic Gregorian calendar, so
/// the crate does not need a date/time dependency.
fn format_rfc3339(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    // Split into whole days since the epoch and the second within that day.
    let mut day_index = secs / SECS_PER_DAY;
    let second_of_day = secs % SECS_PER_DAY;
    let hh = second_of_day / 3_600;
    let mm = second_of_day / 60 % 60;
    let ss = second_of_day % 60;

    // Step forward one calendar year at a time until the remaining day count
    // falls inside the current year.
    let year_len = |y: u64| -> u64 { if is_leap(y) { 366 } else { 365 } };
    let mut year = 1970_u64;
    while day_index >= year_len(year) {
        day_index -= year_len(year);
        year += 1;
    }

    // Do the same with the months of that year; `month` is 1-based.
    let february: u64 = if is_leap(year) { 29 } else { 28 };
    let month_lengths: [u64; 12] = [31, february, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let mut month = 1_u64;
    for len in month_lengths {
        if day_index < len {
            break;
        }
        day_index -= len;
        month += 1;
    }

    format!(
        "{year:04}-{month:02}-{day:02}T{hh:02}:{mm:02}:{ss:02}Z",
        day = day_index + 1
    )
}

/// Gregorian leap-year rule: every 400th year is a leap year; other century
/// years are not; any other year divisible by 4 is.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        true
    } else if year.is_multiple_of(100) {
        false
    } else {
        year.is_multiple_of(4)
    }
}
1858
/// Windows-only: create a named Job Object and assign the child process `pid`
/// to it, so that the entire process tree can be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. This name is stored in
/// `state.json` so that future `kill` invocations can open the same Job Object
/// by name and call `TerminateJobObject` to stop the whole tree.
///
/// Returns `Ok(name)` on success.
///
/// # Errors
///
/// Returns an error if any of these steps fails:
/// - opening the process;
/// - creating the Job Object;
/// - assigning the process to it. This can happen when the process already
///   belongs to another Job Object, e.g. in a CI environment.
///
/// The caller (`supervise`) treats failure as fatal, because reliable
/// process-tree management is a Windows MUST requirement (design.md).
///
/// # Handle ownership
///
/// The process handle is closed before returning on every path. On success the
/// Job Object handle is intentionally left open so the Job Object stays valid
/// for the lifetime of the supervisor. The OS closes it when the supervisor
/// process exits.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: plain Win32 call with by-value arguments. The returned handle is
    // owned by this function and closed below on every path.
    let proc_handle = unsafe { OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid) }
        .map_err(|e| {
            anyhow::anyhow!(
                "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
            )
        })?;

    // SAFETY: `hname` is a valid, NUL-terminated wide string that outlives the
    // call, and no security attributes are passed.
    let job = match unsafe { CreateJobObjectW(None, &hname) } {
        Ok(h) => h,
        Err(e) => {
            // SAFETY: `proc_handle` came from OpenProcess above, is still open,
            // and is not used again.
            let _ = unsafe { CloseHandle(proc_handle) };
            return Err(anyhow::anyhow!(
                "supervisor: CreateJobObjectW({job_name}) failed: {e}"
            ));
        }
    };

    // Assign the child process to the Job Object. This can fail if the process
    // is already in another job (e.g. CI/nested). Per design.md, assignment is
    // a MUST on Windows, so failure is a fatal error.
    // SAFETY: `job` and `proc_handle` are both valid, open handles obtained above.
    let assigned = unsafe { AssignProcessToJobObject(job, proc_handle) };

    // The process handle was only needed for the assignment; release it on
    // both the success and the failure path.
    // SAFETY: `proc_handle` is still open and is not used after this point.
    let _ = unsafe { CloseHandle(proc_handle) };

    if let Err(e) = assigned {
        // SAFETY: `job` is still open and is not used after this point.
        let _ = unsafe { CloseHandle(job) };
        return Err(anyhow::anyhow!(
            "supervisor: AssignProcessToJobObject(pid={pid}) failed \
             (process may already belong to another Job Object, e.g. in a CI environment): {e}"
        ));
    }

    // `job` is deliberately NOT closed. `HANDLE` is a plain `Copy` value with no
    // destructor, so letting it go out of scope keeps the kernel handle (and the
    // Job Object) alive until the supervisor process exits. `mem::forget` is
    // unnecessary here.
    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1924
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        // 2024-01-01T00:00:00Z = 1704067200
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_time_of_day() {
        // Non-midnight instant exercises the hour/minute/second split.
        // 2009-02-13T23:31:30Z = 1234567890
        assert_eq!(format_rfc3339(1_234_567_890), "2009-02-13T23:31:30Z");
    }

    #[test]
    fn rfc3339_last_second_of_year() {
        // One second before 2024-01-01T00:00:00Z.
        assert_eq!(format_rfc3339(1_704_067_199), "2023-12-31T23:59:59Z");
    }

    #[test]
    fn rfc3339_leap_days() {
        // 2000 is a leap year (divisible by 400).
        assert_eq!(format_rfc3339(951_782_400), "2000-02-29T00:00:00Z");
        // 2024 is an ordinary leap year (divisible by 4).
        assert_eq!(format_rfc3339(1_709_164_800), "2024-02-29T00:00:00Z");
        // 2100 is NOT a leap year: 59 days after Jan 1 is March 1, not Feb 29.
        assert_eq!(format_rfc3339(4_107_542_400), "2100-03-01T00:00:00Z");
    }

    #[test]
    fn leap_year_rule() {
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1900));
        assert!(!is_leap(2023));
        assert!(!is_leap(2100));
    }
}