Skip to main content

agent_exec/
run.rs

1//! Implementation of the `run` sub-command.
2//!
3//! Design decisions (from design.md):
4//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
5//! - The supervisor continues logging stdout/stderr after `run` returns.
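//! - `run` returns launch metadata promptly; when inline observation is enabled
//!   (`RunOpts::wait`) it first polls for initial output for a bounded time
//!   (`until_seconds`, or unbounded with `forever`). Further observation is
//!   delegated to `status` / `wait` / `tail`.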
8
9use anyhow::{Context, Result};
10use std::io::IsTerminal;
11use std::path::Path;
12use std::process::Command;
13use tracing::{debug, info, warn};
14
15use crate::jobstore::{JobDir, generate_job_id, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18};
19
/// Snapshot of a job's output and state captured by `observe_inline_output`
/// for inclusion in the `run` JSON response.
#[derive(Debug, Clone)]
pub struct InlineObservation {
    /// Milliseconds spent polling before the snapshot; 0 when waiting was disabled.
    pub waited_ms: u64,
    /// Head of `stdout.log`, limited to the caller's `max_bytes`.
    pub stdout: String,
    /// Head of `stderr.log`, limited to the caller's `max_bytes`.
    pub stderr: String,
    /// Byte range of `stdout.log` covered by `stdout`, as reported by
    /// `read_head_metrics` (presumably `[start, end)` — TODO confirm).
    pub stdout_range: [u64; 2],
    /// Byte range of `stderr.log` covered by `stderr` (same convention as `stdout_range`).
    pub stderr_range: [u64; 2],
    /// Bytes observed in `stdout.log` at snapshot time (`observed_bytes` metric).
    pub stdout_total_bytes: u64,
    /// Bytes observed in `stderr.log` at snapshot time (`observed_bytes` metric).
    pub stderr_total_bytes: u64,
    /// Encoding label for `stdout`/`stderr`; currently always `"utf-8-lossy"`.
    pub encoding: String,
    /// Job status string at snapshot time.
    pub state: String,
    /// Exit code, if the job state records one.
    pub exit_code: Option<i32>,
    /// Completion timestamp, if the job state records one.
    pub finished_at: Option<String>,
    /// Terminating signal, if the job state records one.
    pub signal: Option<String>,
    /// Run duration in milliseconds, if the job state records one.
    pub duration_ms: Option<u64>,
}
36use crate::tag::dedup_tags;
37
/// Options for the `run` sub-command.
///
/// # Definition-time option alignment rule
///
/// Every definition-time option accepted here MUST also be accepted by `create` (and vice versa),
/// since both commands write the same persisted job definition to `meta.json`. When adding a
/// new persisted metadata field, wire it through both `run` and `create` unless the spec
/// explicitly documents it as launch-only (e.g. snapshot timing, tail sizing, --wait).
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command and arguments to execute.
    pub command: Vec<String>,
    /// Override for jobs root directory.
    pub root: Option<&'a str>,
    /// Disable best-effort auto-GC for this invocation.
    pub no_auto_gc: bool,
    /// Optional auto-GC retention override (replaces `auto_gc_config.older_than`).
    pub auto_gc_older_than: Option<String>,
    /// Optional auto-GC max-jobs override.
    pub auto_gc_max_jobs: Option<u64>,
    /// Optional auto-GC max-bytes override.
    pub auto_gc_max_bytes: Option<u64>,
    /// Base auto-GC settings resolved from config/defaults.
    pub auto_gc_config: crate::gc::AutoGcConfig,
    /// Wait for inline output observation before returning (`Default`: true).
    pub wait: bool,
    /// Maximum wait duration in seconds for inline observation (`Default`: 10).
    pub until_seconds: u64,
    /// Wait indefinitely for terminal state / observation budget.
    pub forever: bool,
    /// Maximum bytes to include from the head of each stream (`Default`: 65536).
    pub max_bytes: u64,
    /// Timeout in milliseconds; 0 = no timeout.
    /// Forwarded to the supervisor rounded up to whole seconds.
    pub timeout_ms: u64,
    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
    /// Forwarded to the supervisor rounded up to whole seconds.
    pub kill_after_ms: u64,
    /// Working directory for the command.
    pub cwd: Option<&'a str>,
    /// Environment variables as KEY=VALUE strings.
    pub env_vars: Vec<String>,
    /// Paths to env files, applied in order.
    pub env_files: Vec<String>,
    /// Whether to inherit the current process environment (default: true).
    pub inherit_env: bool,
    /// Keys to mask in JSON output (values replaced with "***").
    pub mask: Vec<String>,
    /// Optional stdin source; materialized into `stdin.bin`, whose name is then
    /// recorded as `stdin_file` in meta.
    pub stdin: Option<StdinSource>,
    /// Maximum bytes allowed for materialized stdin.bin (default: 64 MiB).
    pub stdin_max_bytes: u64,
    /// User-defined tags for this job (deduplicated preserving first-seen order).
    pub tags: Vec<String>,
    /// Override full.log path; None = use job dir.
    pub log: Option<&'a str>,
    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
    /// Forwarded to the supervisor rounded up to whole seconds.
    pub progress_every_ms: u64,
    /// Shell command string for command notification sink; executed via platform shell.
    /// None = no command sink.
    pub notify_command: Option<String>,
    /// File path for NDJSON notification sink; None = no file sink.
    pub notify_file: Option<String>,
    /// Pattern to match against output lines (output-match notification).
    pub output_pattern: Option<String>,
    /// Match type for output-match: "contains" or "regex".
    pub output_match_type: Option<String>,
    /// Stream selector: "stdout", "stderr", or "either".
    pub output_stream: Option<String>,
    /// Shell command string for output-match command sink.
    pub output_command: Option<String>,
    /// File path for output-match NDJSON file sink.
    pub output_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings.
    /// e.g. `["sh", "-lc"]` or `["bash", "-lc"]`.
    pub shell_wrapper: Vec<String>,
}
113
114impl<'a> Default for RunOpts<'a> {
115    fn default() -> Self {
116        RunOpts {
117            command: vec![],
118            root: None,
119            no_auto_gc: false,
120            auto_gc_older_than: None,
121            auto_gc_max_jobs: None,
122            auto_gc_max_bytes: None,
123            auto_gc_config: crate::gc::AutoGcConfig::default(),
124            wait: true,
125            until_seconds: 10,
126            forever: false,
127            max_bytes: 65536,
128            timeout_ms: 0,
129            kill_after_ms: 0,
130            cwd: None,
131            env_vars: vec![],
132            env_files: vec![],
133            inherit_env: true,
134            mask: vec![],
135            stdin: None,
136            stdin_max_bytes: DEFAULT_STDIN_MAX_BYTES,
137            tags: vec![],
138            log: None,
139            progress_every_ms: 0,
140            notify_command: None,
141            notify_file: None,
142            output_pattern: None,
143            output_match_type: None,
144            output_stream: None,
145            output_command: None,
146            output_file: None,
147            shell_wrapper: crate::config::default_shell_wrapper(),
148        }
149    }
150}
151
/// Source of a job's stdin, materialized into `stdin.bin` in the job directory.
#[derive(Debug, Clone)]
pub enum StdinSource {
    /// Copy the caller's own stdin (`--stdin -`); it must not be a terminal.
    CallerStdin,
    /// Use this string verbatim as the job's stdin.
    Inline(String),
    /// Copy the contents of the file at this path (`--stdin-file`).
    File(String),
}

/// Parameters for spawning a supervisor process.
///
/// Shared by `run::execute` and `start::execute`. Each field is forwarded to the
/// `_supervise` sub-command as a CLI flag by `spawn_supervisor_process`.
#[derive(Debug, Clone)]
pub struct SpawnSupervisorParams {
    /// Job identifier (`--job-id`).
    pub job_id: String,
    /// Jobs root directory (`--supervise-root`).
    pub root: std::path::PathBuf,
    /// Path of the combined log (`--full-log`).
    pub full_log_path: String,
    /// Timeout in milliseconds; 0 = none. Forwarded rounded up to whole seconds.
    pub timeout_ms: u64,
    /// Grace period after SIGTERM in milliseconds; 0 = not forwarded.
    /// Forwarded rounded up to whole seconds.
    pub kill_after_ms: u64,
    /// Working directory for the child (`--cwd`).
    pub cwd: Option<String>,
    /// Real (unmasked) KEY=VALUE env var pairs.
    pub env_vars: Vec<String>,
    /// Env files applied in order (repeated `--env-file`).
    pub env_files: Vec<String>,
    /// When false, `--no-inherit-env` is passed.
    pub inherit_env: bool,
    /// Materialized stdin file name, e.g. `stdin.bin` (`--stdin-file`).
    pub stdin_file: Option<String>,
    /// state.json refresh interval in ms; 0 = disabled. Forwarded rounded up to seconds.
    pub progress_every_ms: u64,
    /// Command notification sink (`--notify-command`).
    pub notify_command: Option<String>,
    /// NDJSON file notification sink (`--notify-file`).
    pub notify_file: Option<String>,
    /// Shell wrapper argv, forwarded JSON-encoded (`--shell-wrapper-resolved`).
    pub shell_wrapper: Vec<String>,
    /// Command and arguments, forwarded after `--`.
    pub command: Vec<String>,
}
180
181pub fn resolve_stdin_source(
182    stdin: Option<String>,
183    stdin_file: Option<String>,
184) -> Option<StdinSource> {
185    if let Some(value) = stdin {
186        if value == "-" {
187            Some(StdinSource::CallerStdin)
188        } else {
189            Some(StdinSource::Inline(value))
190        }
191    } else {
192        stdin_file.map(StdinSource::File)
193    }
194}
195
196pub fn validate_stdin_source(stdin: Option<&StdinSource>) -> Result<()> {
197    if matches!(stdin, Some(StdinSource::CallerStdin)) {
198        let stdin = std::io::stdin();
199        if stdin.is_terminal() {
200            return Err(anyhow::anyhow!(StdinRequired(
201                "stdin_required: --stdin - requires non-interactive stdin (pipe/heredoc/redirect)"
202                    .to_string(),
203            )));
204        }
205    }
206    Ok(())
207}
208
209fn create_stdin_file(path: &std::path::Path) -> Result<std::fs::File> {
210    #[cfg(unix)]
211    {
212        use std::os::unix::fs::OpenOptionsExt;
213        std::fs::OpenOptions::new()
214            .write(true)
215            .create(true)
216            .truncate(true)
217            .mode(0o600)
218            .open(path)
219            .with_context(|| format!("create materialized stdin {}", path.display()))
220    }
221    #[cfg(not(unix))]
222    {
223        std::fs::File::create(path)
224            .with_context(|| format!("create materialized stdin {}", path.display()))
225    }
226}
227
/// `Write` adapter that forwards to `inner` but refuses to exceed `limit` total bytes.
///
/// A write that would push the running total past `limit` is rejected as a
/// whole (nothing from that buffer is forwarded) with an `io::Error` whose
/// message is `"stdin_too_large"`; `materialize_stdin` looks for that marker.
struct LimitedWriter<W> {
    /// Destination writer.
    inner: W,
    /// Bytes successfully forwarded to `inner` so far.
    written: u64,
    /// Maximum total bytes allowed.
    limit: u64,
}

impl<W: std::io::Write> std::io::Write for LimitedWriter<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Compare against the remaining budget rather than computing
        // `written + buf.len()`, which can overflow (panicking in debug builds
        // and silently wrapping past the limit in release builds).
        let remaining = self.limit.saturating_sub(self.written);
        let requested = u64::try_from(buf.len()).unwrap_or(u64::MAX);
        if requested > remaining {
            return Err(std::io::Error::other("stdin_too_large"));
        }
        let n = self.inner.write(buf)?;
        self.written = self.written.saturating_add(n as u64);
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
248
249fn materialize_stdin(
250    job_dir: &JobDir,
251    stdin: Option<&StdinSource>,
252    max_bytes: u64,
253) -> Result<Option<String>> {
254    let Some(source) = stdin else {
255        return Ok(None);
256    };
257
258    let target_name = "stdin.bin".to_string();
259    let target_path = job_dir.path.join(&target_name);
260    let file = create_stdin_file(&target_path)?;
261    let mut target = LimitedWriter {
262        inner: file,
263        written: 0,
264        limit: max_bytes,
265    };
266
267    let copy_result = match source {
268        StdinSource::CallerStdin => {
269            let mut stdin = std::io::stdin();
270            std::io::copy(&mut stdin, &mut target).context("materialize caller stdin to stdin.bin")
271        }
272        StdinSource::Inline(value) => {
273            use std::io::Write;
274            target
275                .write_all(value.as_bytes())
276                .map(|_| 0u64)
277                .context("write inline stdin to stdin.bin")
278        }
279        StdinSource::File(path) => {
280            let mut input = std::fs::File::open(path)
281                .with_context(|| format!("open --stdin-file source {}", path))?;
282            std::io::copy(&mut input, &mut target)
283                .with_context(|| format!("copy --stdin-file source {} to stdin.bin", path))
284        }
285    };
286
287    if let Err(e) = copy_result {
288        let _ = std::fs::remove_file(&target_path);
289        let is_too_large = e
290            .chain()
291            .any(|cause| cause.to_string().contains("stdin_too_large"));
292        if is_too_large {
293            return Err(anyhow::anyhow!(StdinTooLarge(format!(
294                "stdin_too_large: input exceeds {} byte limit",
295                max_bytes
296            ))));
297        }
298        return Err(e);
299    }
300
301    Ok(Some(target_name))
302}
303
304fn resolve_stdin_path(job_dir: &JobDir, stdin_file: Option<&str>) -> Option<std::path::PathBuf> {
305    stdin_file.map(|p| {
306        let path = std::path::Path::new(p);
307        if path.is_absolute() {
308            path.to_path_buf()
309        } else {
310            job_dir.path.join(path)
311        }
312    })
313}
314
/// Error raised by `validate_stdin_source` when `--stdin -` is requested but
/// stdin is an interactive terminal. Displays its message verbatim.
#[derive(Debug)]
pub struct StdinRequired(pub String);

impl std::error::Error for StdinRequired {}

impl std::fmt::Display for StdinRequired {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
325
/// Error raised when materialized stdin exceeds the configured byte limit.
/// Displays its message verbatim.
#[derive(Debug)]
pub struct StdinTooLarge(pub String);

impl std::error::Error for StdinTooLarge {}

impl std::fmt::Display for StdinTooLarge {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
336
337pub fn open_child_stdin(job_dir: &JobDir, stdin_file: Option<&str>) -> Result<std::process::Stdio> {
338    if let Some(path) = resolve_stdin_path(job_dir, stdin_file) {
339        let file = std::fs::File::open(&path)
340            .with_context(|| format!("open materialized stdin {}", path.display()))?;
341        Ok(std::process::Stdio::from(file))
342    } else {
343        Ok(std::process::Stdio::null())
344    }
345}
346
/// Default cap on the size of a materialized `stdin.bin`: 64 MiB.
pub const DEFAULT_STDIN_MAX_BYTES: u64 = 64 << 20;
348
349pub fn materialize_stdin_for_job(
350    job_dir: &JobDir,
351    stdin: Option<&StdinSource>,
352    max_bytes: u64,
353) -> Result<Option<String>> {
354    materialize_stdin(job_dir, stdin, max_bytes)
355}
356
/// Spawn the supervisor process and write the initial running state to `state.json`.
///
/// Launches the current executable with the `_supervise` sub-command. Every
/// launch parameter in `params` is forwarded as a CLI flag, and the user
/// command follows `--`.
///
/// Returns the supervisor PID and the actual `started_at` timestamp.
/// Also handles the Windows Job Object handshake before returning.
///
/// # Errors
/// Fails if any of the following happens:
/// - the current executable cannot be resolved;
/// - the shell wrapper cannot be serialized;
/// - the spawn fails;
/// - `state.json` cannot be initialized;
/// - (Windows only) the supervisor reports a failed Job Object assignment.
pub fn spawn_supervisor_process(
    job_dir: &JobDir,
    params: SpawnSupervisorParams,
) -> Result<(u32, String)> {
    // Captured before the spawn and recorded as the job's start time.
    let started_at = now_rfc3339();

    let exe = std::env::current_exe().context("resolve current exe")?;
    let mut supervisor_cmd = Command::new(&exe);
    supervisor_cmd
        .arg("_supervise")
        .arg("--job-id")
        .arg(&params.job_id)
        .arg("--supervise-root")
        .arg(params.root.display().to_string())
        .arg("--full-log")
        .arg(&params.full_log_path);

    // The supervisor flags below take whole seconds. Millisecond values are
    // rounded up so that a sub-second, non-zero setting is never truncated to 0.
    if params.timeout_ms > 0 {
        let timeout_seconds = params.timeout_ms.saturating_add(999) / 1000;
        supervisor_cmd
            .arg("--timeout")
            .arg(timeout_seconds.to_string());
    }
    if params.kill_after_ms > 0 {
        let kill_after_seconds = params.kill_after_ms.saturating_add(999) / 1000;
        supervisor_cmd
            .arg("--kill-after")
            .arg(kill_after_seconds.to_string());
    }
    if let Some(ref cwd) = params.cwd {
        supervisor_cmd.arg("--cwd").arg(cwd);
    }
    for env_file in &params.env_files {
        supervisor_cmd.arg("--env-file").arg(env_file);
    }
    // NOTE(review): real (unmasked) values are passed as command-line
    // arguments, which are typically visible to other local users via process
    // listings (e.g. `ps`). Consider a file or pipe hand-off for secrets.
    for env_var in &params.env_vars {
        supervisor_cmd.arg("--env").arg(env_var);
    }
    if !params.inherit_env {
        supervisor_cmd.arg("--no-inherit-env");
    }
    if let Some(ref stdin_file) = params.stdin_file {
        supervisor_cmd.arg("--stdin-file").arg(stdin_file);
    }
    if params.progress_every_ms > 0 {
        let progress_every_seconds = params.progress_every_ms.saturating_add(999) / 1000;
        supervisor_cmd
            .arg("--progress-every")
            .arg(progress_every_seconds.to_string());
    }
    if let Some(ref nc) = params.notify_command {
        supervisor_cmd.arg("--notify-command").arg(nc);
    }
    if let Some(ref nf) = params.notify_file {
        supervisor_cmd.arg("--notify-file").arg(nf);
    }
    // The wrapper argv is passed as a single JSON-encoded argument.
    let wrapper_json =
        serde_json::to_string(&params.shell_wrapper).context("serialize shell wrapper")?;
    supervisor_cmd
        .arg("--shell-wrapper-resolved")
        .arg(&wrapper_json);

    // The supervisor is detached from this process's stdio; output is logged
    // by the supervisor itself.
    supervisor_cmd
        .arg("--")
        .args(&params.command)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    // The `Child` handle is dropped at the end of this function without
    // waiting. Dropping it does not terminate the supervisor.
    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
    let supervisor_pid = supervisor.id();
    debug!(supervisor_pid, "supervisor spawned");

    // Write initial running state.
    // NOTE(review): this runs after the spawn. If the supervisor can update
    // state.json before this point (e.g. a very fast-exiting command), that
    // update would be overwritten here. Confirm the supervisor's write ordering.
    job_dir.init_state(supervisor_pid, &started_at)?;

    // Windows Job Object handshake.
    // Poll state.json every 10 ms for up to 5 s. The supervisor signals success
    // by recording a pid other than its own (presumably the child's — confirm)
    // and failure by setting status Failed. On timeout, proceed with the
    // initial state.
    #[cfg(windows)]
    {
        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            if let Ok(current_state) = job_dir.read_state() {
                let supervisor_updated = current_state
                    .pid
                    .map(|p| p != supervisor_pid)
                    .unwrap_or(false)
                    || *current_state.status() == crate::schema::JobStatus::Failed;
                if supervisor_updated {
                    if *current_state.status() == crate::schema::JobStatus::Failed {
                        anyhow::bail!(
                            "supervisor failed to assign child process to Job Object \
                             (Windows MUST requirement); see stderr for details"
                        );
                    }
                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
                    break;
                }
            }
            if std::time::Instant::now() >= handshake_deadline {
                debug!("supervisor handshake timed out; proceeding with initial state");
                break;
            }
        }
    }

    Ok((supervisor_pid, started_at))
}
469
470/// Pre-create empty log files (stdout.log, stderr.log, full.log) so they exist
471/// immediately after job creation, before the supervisor starts writing.
472pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
473    for log_path in [
474        job_dir.stdout_path(),
475        job_dir.stderr_path(),
476        job_dir.full_log_path(),
477    ] {
478        std::fs::OpenOptions::new()
479            .create(true)
480            .append(true)
481            .open(&log_path)
482            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
483    }
484    Ok(())
485}
486
/// Execute `run`: create a job, spawn its supervisor, and print a `run` response.
///
/// Order of operations:
/// 1. Resolve and create the jobs root, then allocate a job id.
/// 2. Build the persisted `JobMeta`:
///    - env values are masked per `opts.mask`;
///    - the working directory is resolved to an absolute path;
///    - tags are validated and deduplicated.
/// 3. Create the job directory, then materialize stdin into `stdin.bin`
///    (rewriting `meta.json` to record it), then pre-create the log files.
/// 4. Spawn the `_supervise` process with the real (unmasked) env values.
/// 5. Observe inline output (bounded by `opts.wait` / `until_seconds` /
///    `forever`), then run best-effort auto-GC unless `opts.no_auto_gc`.
/// 6. Print the JSON response.
///
/// # Errors
/// Fails when any of the following happens:
/// - `opts.command` is empty;
/// - tag validation fails;
/// - `--stdin -` is used with an interactive terminal;
/// - stdin exceeds `opts.stdin_max_bytes`;
/// - any job-directory, spawn, or state I/O fails.
pub fn execute(opts: RunOpts) -> Result<()> {
    if opts.command.is_empty() {
        anyhow::bail!("no command specified for run");
    }

    // Measures the invocation up to (but not including) auto-GC.
    let elapsed_start = std::time::Instant::now();

    let root = resolve_root(opts.root);
    std::fs::create_dir_all(&root)
        .with_context(|| format!("create jobs root {}", root.display()))?;

    let job_id = generate_job_id(&root)?;
    let created_at = now_rfc3339();

    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
    let env_keys: Vec<String> = opts
        .env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
        .collect();

    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);

    // Resolve the effective working directory for this job.
    // If --cwd was specified, use that path; otherwise use the current process's working directory.
    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
    let effective_cwd = resolve_effective_cwd(opts.cwd);

    // Build output-match config from definition-time options (same logic as `create` and `notify set`).
    let on_output_match = crate::notify::build_output_match_config(
        opts.output_pattern,
        opts.output_match_type,
        opts.output_stream,
        opts.output_command,
        opts.output_file,
        None,
    );

    // Only persist a notification section when at least one sink or matcher is configured.
    let notification =
        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
        {
            Some(crate::schema::NotificationConfig {
                notify_command: opts.notify_command.clone(),
                notify_file: opts.notify_file.clone(),
                on_output_match,
            })
        } else {
            None
        };

    // Validate and deduplicate tags (preserving first-seen order).
    let tags = dedup_tags(opts.tags)?;

    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
        command: opts.command.clone(),
        created_at: created_at.clone(),
        root: root.display().to_string(),
        env_keys,
        env_vars: masked_env_vars.clone(),
        // For `run`, env_vars_runtime is not populated because the supervisor
        // is spawned immediately with the real values; no deferred start needed.
        env_vars_runtime: vec![],
        mask: opts.mask.clone(),
        cwd: Some(effective_cwd),
        notification,
        // Execution-definition fields (used by start if ever applicable).
        inherit_env: opts.inherit_env,
        env_files: opts.env_files.clone(),
        timeout_ms: opts.timeout_ms,
        kill_after_ms: opts.kill_after_ms,
        progress_every_ms: opts.progress_every_ms,
        shell_wrapper: Some(opts.shell_wrapper.clone()),
        // Filled in below once stdin has been materialized.
        stdin_file: None,
        tags: tags.clone(),
    };

    // Checked before the job directory exists, so an interactive `--stdin -`
    // fails without creating anything on disk.
    validate_stdin_source(opts.stdin.as_ref())?;

    let job_dir = JobDir::create(&root, &job_id, &meta)?;
    // NOTE(review): if materialization fails, the job directory created above
    // is left in place and no supervisor is spawned. Confirm whether cleanup is
    // expected here.
    let stdin_file =
        materialize_stdin_for_job(&job_dir, opts.stdin.as_ref(), opts.stdin_max_bytes)?;
    if stdin_file.is_some() {
        let mut meta_with_stdin = meta.clone();
        meta_with_stdin.stdin_file = stdin_file.clone();
        job_dir.write_meta_atomic(&meta_with_stdin)?;
    }
    info!(job_id = %job_id, "created job directory");

    // Determine the full.log path (may be overridden by --log).
    let full_log_path = if let Some(log) = opts.log {
        log.to_string()
    } else {
        job_dir.full_log_path().display().to_string()
    };

    // Pre-create empty log files so they exist before the supervisor starts.
    pre_create_log_files(&job_dir)?;

    // Spawn the supervisor using the shared helper.
    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
    // receives the real env var values so the child process can use them as intended.
    // The returned pid/started_at are already recorded in state.json by the helper.
    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
        &job_dir,
        SpawnSupervisorParams {
            job_id: job_id.clone(),
            root: root.clone(),
            full_log_path: full_log_path.clone(),
            timeout_ms: opts.timeout_ms,
            kill_after_ms: opts.kill_after_ms,
            cwd: opts.cwd.map(|s| s.to_string()),
            env_vars: opts.env_vars.clone(),
            env_files: opts.env_files.clone(),
            inherit_env: opts.inherit_env,
            stdin_file: stdin_file.clone(),
            progress_every_ms: opts.progress_every_ms,
            notify_command: opts.notify_command.clone(),
            notify_file: opts.notify_file.clone(),
            shell_wrapper: opts.shell_wrapper.clone(),
            command: opts.command.clone(),
        },
    )?;

    // Compute absolute paths for stdout.log and stderr.log.
    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    let observation = observe_inline_output(
        &job_dir,
        opts.wait,
        opts.until_seconds,
        opts.forever,
        opts.max_bytes,
    )?;
    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    // Best-effort auto-GC: per-invocation overrides are layered on the base config.
    // A max-jobs override that does not fit in `usize` becomes `None`
    // (presumably "no limit" — confirm against gc::AutoGcConfig).
    if !opts.no_auto_gc {
        let mut auto_cfg = opts.auto_gc_config.clone();
        if let Some(v) = opts.auto_gc_older_than {
            auto_cfg.older_than = v;
        }
        if let Some(v) = opts.auto_gc_max_jobs {
            auto_cfg.max_jobs = usize::try_from(v).ok();
        }
        if let Some(v) = opts.auto_gc_max_bytes {
            auto_cfg.max_bytes = Some(v);
        }
        crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
    }

    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: observation.state,
            tags,
            // Include masked env_vars in the JSON response so callers can inspect
            // which variables were set (with secret values replaced by "***").
            env_vars: masked_env_vars,
            stdout_log_path,
            stderr_log_path,
            elapsed_ms,
            waited_ms: observation.waited_ms,
            stdout: observation.stdout,
            stderr: observation.stderr,
            stdout_range: observation.stdout_range,
            stderr_range: observation.stderr_range,
            stdout_total_bytes: observation.stdout_total_bytes,
            stderr_total_bytes: observation.stderr_total_bytes,
            encoding: observation.encoding,
            exit_code: observation.exit_code,
            finished_at: observation.finished_at,
            signal: observation.signal,
            duration_ms: observation.duration_ms,
        },
    );
    response.print();
    Ok(())
}
669
/// Options for the `_supervise` internal sub-command.
///
/// Masking is the responsibility of `run` (which writes masked values to meta.json
/// and includes them in the JSON response). The supervisor only needs the real
/// environment variable values to launch the child process correctly.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job being supervised.
    pub job_id: &'a str,
    /// Jobs root directory (presumably parsed from `--supervise-root` — confirm).
    pub root: &'a Path,
    /// Command and arguments to run as the child process.
    pub command: &'a [String],
    /// Override full.log path; None = use job dir default.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
    pub kill_after_ms: u64,
    /// Working directory for the child process.
    pub cwd: Option<&'a str>,
    /// Environment variables as KEY=VALUE strings (real values, not masked).
    pub env_vars: Vec<String>,
    /// Paths to env files, applied in order.
    pub env_files: Vec<String>,
    /// Whether to inherit the current process environment.
    pub inherit_env: bool,
    /// Materialized stdin file path (relative to job dir) to feed child stdin.
    pub stdin_file: Option<String>,
    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
    pub progress_every_ms: u64,
    /// Shell command string for command notification sink; executed via platform shell.
    /// None = no command sink.
    pub notify_command: Option<String>,
    /// File path for NDJSON notification sink; None = no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings.
    pub shell_wrapper: Vec<String>,
}
706
707/// Resolve the effective working directory for a job.
708///
709/// If `cwd_override` is `Some`, use that path as the base. Otherwise use the
710/// current process working directory. In either case, attempt to canonicalize
711/// the path for consistent comparison; on failure, fall back to the absolute
712/// path representation (avoids symlink / permission issues on some systems).
713pub fn observe_inline_output(
714    job_dir: &JobDir,
715    wait: bool,
716    until_seconds: u64,
717    forever: bool,
718    max_bytes: u64,
719) -> Result<InlineObservation> {
720    let started = std::time::Instant::now();
721    let deadline = if wait {
722        if forever {
723            None
724        } else {
725            Some(started + std::time::Duration::from_secs(until_seconds))
726        }
727    } else {
728        Some(started)
729    };
730
731    if wait {
732        loop {
733            let state = job_dir.read_state()?;
734            let has_output = std::fs::metadata(job_dir.stdout_path())
735                .map(|m| m.len() > 0)
736                .unwrap_or(false)
737                || std::fs::metadata(job_dir.stderr_path())
738                    .map(|m| m.len() > 0)
739                    .unwrap_or(false);
740
741            if !state.status().is_non_terminal() || has_output {
742                break;
743            }
744
745            if let Some(dl) = deadline
746                && std::time::Instant::now() >= dl
747            {
748                break;
749            }
750
751            std::thread::sleep(std::time::Duration::from_millis(100));
752        }
753    }
754
755    let state = job_dir.read_state()?;
756    let stdout = job_dir.read_head_metrics("stdout.log", max_bytes);
757    let stderr = job_dir.read_head_metrics("stderr.log", max_bytes);
758
759    Ok(InlineObservation {
760        waited_ms: if wait {
761            started.elapsed().as_millis() as u64
762        } else {
763            0
764        },
765        stdout: stdout.head,
766        stderr: stderr.head,
767        stdout_range: stdout.range,
768        stderr_range: stderr.range,
769        stdout_total_bytes: stdout.observed_bytes,
770        stderr_total_bytes: stderr.observed_bytes,
771        encoding: "utf-8-lossy".to_string(),
772        state: state.status().as_str().to_string(),
773        exit_code: state.exit_code(),
774        signal: state.signal().map(|s| s.to_string()),
775        duration_ms: state.duration_ms(),
776        finished_at: state.finished_at,
777    })
778}
779
/// Resolve the effective working directory for a job.
///
/// Uses `cwd_override` when given, otherwise the current process working
/// directory (or `"."` if that cannot be determined). The result is
/// canonicalized when possible, which resolves symlinks. If canonicalization
/// fails (e.g. the path does not exist), relative paths are joined onto the
/// current directory as a best-effort absolute path, without normalization.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    let process_cwd =
        || std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));

    let base = match cwd_override {
        None => process_cwd(),
        Some(dir) => std::path::PathBuf::from(dir),
    };

    if let Ok(canonical) = base.canonicalize() {
        return canonical.display().to_string();
    }

    let absolute = if base.is_absolute() {
        base
    } else {
        process_cwd().join(base)
    };
    absolute.display().to_string()
}
801
/// Mask the values of selected keys in a list of `KEY=VALUE` strings.
///
/// Entries whose key appears in `mask_keys` become `KEY=***`; all other entries
/// are copied unchanged. An entry without `=` has its whole text treated as
/// the key.
pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    let mut masked = Vec::with_capacity(env_vars.len());
    for entry in env_vars {
        let (name, _value) = parse_env_var(entry);
        let hide = mask_keys.contains(&name);
        masked.push(if hide {
            format!("{name}=***")
        } else {
            entry.clone()
        });
    }
    masked
}

/// Split a `KEY=VALUE` (or bare `KEY`) string at the first `=` into `(key, value)`.
///
/// A string without `=` yields the whole string as the key and an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_string(), value.to_string()),
        None => (s.to_string(), String::new()),
    }
}
829
830/// Load environment variables from a .env-style file.
831/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
832fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
833    let contents =
834        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
835    let mut vars = Vec::new();
836    for line in contents.lines() {
837        let line = line.trim();
838        if line.is_empty() || line.starts_with('#') {
839            continue;
840        }
841        vars.push(parse_env_var(line));
842    }
843    Ok(vars)
844}
845
/// Shared state for output-match checking, used by streaming threads in `supervise`.
///
/// Reloads `meta.json` on every observed line so that a `notify set` update is
/// visible for the very next line, regardless of how recently the last reload
/// occurred.  Multiple streaming threads share the same checker via `Arc`; the
/// internal `Mutex` serialises access.
struct OutputMatchChecker {
    /// Job directory whose `meta.json` is re-read for each line.
    job_dir_path: std::path::PathBuf,
    /// Resolved shell wrapper argv (presumably used for output-match command sinks — confirm).
    shell_wrapper: Vec<String>,
    /// Mutable match state, locked for each line check.
    inner: std::sync::Mutex<OutputMatchInner>,
}

/// Mutable portion of `OutputMatchChecker`, protected by its mutex.
struct OutputMatchInner {
    /// Most recently loaded notification config; `None` when none is set.
    config: Option<crate::schema::NotificationConfig>,
}
861
862impl OutputMatchChecker {
863    fn new(
864        job_dir_path: std::path::PathBuf,
865        shell_wrapper: Vec<String>,
866        initial_config: Option<crate::schema::NotificationConfig>,
867    ) -> Self {
868        Self {
869            job_dir_path,
870            shell_wrapper,
871            inner: std::sync::Mutex::new(OutputMatchInner {
872                config: initial_config,
873            }),
874        }
875    }
876
877    /// Check a newly observed output line for a configured match.
878    ///
879    /// Reloads `meta.json` on every call so that `notify set` updates are
880    /// visible for the next line without any delay.
881    /// Dispatches `job.output.matched` events outside the lock to avoid blocking
882    /// other streaming threads.
883    fn check_line(&self, line: &str, stream: &str) {
884        use crate::schema::{OutputMatchStream, OutputMatchType};
885
886        // Lock, reload, evaluate match, then release before dispatching.
887        let match_info: Option<crate::schema::OutputMatchConfig> = {
888            let mut inner = self.inner.lock().unwrap();
889
890            // Reload config on every line to pick up `notify set` updates immediately.
891            {
892                let meta_path = self.job_dir_path.join("meta.json");
893                if let Ok(raw) = std::fs::read(&meta_path)
894                    && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
895                {
896                    inner.config = meta.notification;
897                }
898            }
899
900            let Some(ref notification) = inner.config else {
901                return;
902            };
903            let Some(ref match_cfg) = notification.on_output_match else {
904                return;
905            };
906
907            // Check stream filter.
908            let stream_matches = match match_cfg.stream {
909                OutputMatchStream::Stdout => stream == "stdout",
910                OutputMatchStream::Stderr => stream == "stderr",
911                OutputMatchStream::Either => true,
912            };
913            if !stream_matches {
914                return;
915            }
916
917            // Check pattern.
918            let matched = match &match_cfg.match_type {
919                OutputMatchType::Contains => line.contains(&match_cfg.pattern),
920                OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
921                    .map(|re| re.is_match(line))
922                    .unwrap_or(false),
923            };
924
925            if matched {
926                Some(match_cfg.clone())
927            } else {
928                None
929            }
930        }; // Lock released.
931
932        if let Some(match_cfg) = match_info {
933            self.dispatch_match(line, stream, &match_cfg);
934        }
935    }
936
937    /// Dispatch a `job.output.matched` event and append a delivery record to
938    /// `notification_events.ndjson`.  Failures are non-fatal.
939    fn dispatch_match(
940        &self,
941        line: &str,
942        stream: &str,
943        match_cfg: &crate::schema::OutputMatchConfig,
944    ) {
945        use std::io::Write;
946
947        let job_id = self
948            .job_dir_path
949            .file_name()
950            .and_then(|n| n.to_str())
951            .unwrap_or("unknown");
952
953        let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
954        let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
955        let events_path = self.job_dir_path.join("notification_events.ndjson");
956        let events_path_str = events_path.display().to_string();
957
958        let match_type_str = match &match_cfg.match_type {
959            crate::schema::OutputMatchType::Contains => "contains",
960            crate::schema::OutputMatchType::Regex => "regex",
961        };
962
963        let event = crate::schema::OutputMatchEvent {
964            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
965            event_type: "job.output.matched".to_string(),
966            job_id: job_id.to_string(),
967            pattern: match_cfg.pattern.clone(),
968            match_type: match_type_str.to_string(),
969            stream: stream.to_string(),
970            line: line.to_string(),
971            stdout_log_path,
972            stderr_log_path,
973        };
974
975        let event_json = serde_json::to_string(&event).unwrap_or_default();
976        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
977
978        if let Some(ref cmd) = match_cfg.command {
979            delivery_results.push(dispatch_command_sink(
980                cmd,
981                &event_json,
982                job_id,
983                &events_path_str,
984                &self.shell_wrapper,
985                "job.output.matched",
986            ));
987        }
988        if let Some(ref file_path) = match_cfg.file {
989            delivery_results.push(dispatch_file_sink(file_path, &event_json));
990        }
991
992        // Append delivery record to notification_events.ndjson.
993        let record = crate::schema::OutputMatchEventRecord {
994            event,
995            delivery_results,
996        };
997        if let Ok(record_json) = serde_json::to_string(&record)
998            && let Ok(mut f) = std::fs::OpenOptions::new()
999                .create(true)
1000                .append(true)
1001                .open(&events_path)
1002        {
1003            let _ = writeln!(f, "{record_json}");
1004        }
1005    }
1006}
1007
1008/// Stream bytes from a child process output pipe to an individual log file and
1009/// to the shared `full.log`.
1010///
1011/// Reads byte chunks (not lines) so that output without a trailing newline is
1012/// still captured in the individual log immediately.  The `full.log` format
1013/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
1014/// bytes are appended to the buffer until a newline is found, at which point a
1015/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
1016/// flushed as a final line.
1017///
1018/// The optional `on_line` callback is invoked for each complete line (without
1019/// the trailing newline) and is used to drive output-match checking.
1020///
1021/// This helper is used by both the stdout and stderr monitoring threads inside
1022/// [`supervise`], replacing the previously duplicated per-stream implementations.
1023/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
1024fn stream_to_logs<R, F>(
1025    stream: R,
1026    log_path: &std::path::Path,
1027    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
1028    label: &str,
1029    on_line: Option<F>,
1030) where
1031    R: std::io::Read,
1032    F: Fn(&str),
1033{
1034    use std::io::Write;
1035    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
1036    let mut stream = stream;
1037    let mut buf = [0u8; 8192];
1038    // Incomplete-line buffer for full.log formatting.
1039    let mut line_buf: Vec<u8> = Vec::new();
1040    loop {
1041        match stream.read(&mut buf) {
1042            Ok(0) => break, // EOF
1043            Ok(n) => {
1044                let chunk = &buf[..n];
1045                // Write raw bytes to the individual log (captures partial lines too).
1046                let _ = log_file.write_all(chunk);
1047                // Accumulate bytes for full.log line formatting.
1048                for &b in chunk {
1049                    if b == b'\n' {
1050                        let line = String::from_utf8_lossy(&line_buf);
1051                        if let Ok(mut fl) = full_log.lock() {
1052                            let ts = now_rfc3339();
1053                            let _ = writeln!(fl, "{ts} [{label}] {line}");
1054                        }
1055                        if let Some(ref f) = on_line {
1056                            f(&line);
1057                        }
1058                        line_buf.clear();
1059                    } else {
1060                        line_buf.push(b);
1061                    }
1062                }
1063            }
1064            Err(_) => break,
1065        }
1066    }
1067    // Flush any remaining incomplete line to full.log and trigger callback.
1068    if !line_buf.is_empty() {
1069        let line = String::from_utf8_lossy(&line_buf);
1070        if let Ok(mut fl) = full_log.lock() {
1071            let ts = now_rfc3339();
1072            let _ = writeln!(fl, "{ts} [{label}] {line}");
1073        }
1074        if let Some(ref f) = on_line {
1075            f(&line);
1076        }
1077    }
1078}
1079
1080/// Internal supervisor sub-command.
1081///
1082/// Runs the target command, streams stdout/stderr to individual log files
1083/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
1084/// updates `state.json` when the process finishes.
1085///
1086/// On Windows, the child process is assigned to a named Job Object so that
1087/// the entire process tree can be terminated with a single `kill` call.
1088/// The Job Object name is recorded in `state.json` as `windows_job_name`.
1089pub fn supervise(opts: SuperviseOpts) -> Result<()> {
1090    use std::sync::{Arc, Mutex};
1091
1092    let job_id = opts.job_id;
1093    let root = opts.root;
1094    let command = opts.command;
1095
1096    if command.is_empty() {
1097        anyhow::bail!("supervisor: no command");
1098    }
1099
1100    let job_dir = JobDir::open(root, job_id)?;
1101
1102    // Read meta.json for notification config and cwd (used in completion event).
1103    let meta = job_dir.read_meta()?;
1104    // Use the actual execution start time, not meta.created_at.
1105    // For `run`, these are nearly identical. For `start`, the job may have
1106    // been created long before it was started.
1107    let started_at = now_rfc3339();
1108
1109    // Determine full.log path.
1110    let full_log_path = if let Some(p) = opts.full_log {
1111        std::path::PathBuf::from(p)
1112    } else {
1113        job_dir.full_log_path()
1114    };
1115
1116    // Create the full.log file (shared between stdout/stderr threads).
1117    // Ensure parent directories exist for custom paths.
1118    if let Some(parent) = full_log_path.parent() {
1119        std::fs::create_dir_all(parent)
1120            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
1121    }
1122    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
1123    let full_log = Arc::new(Mutex::new(full_log_file));
1124
1125    // Execute command through the shell wrapper.
1126    //
1127    // Two launch modes:
1128    //
1129    //   String mode (command.len() == 1):  The single element is a shell command
1130    //   string passed as-is to the wrapper (e.g. `"echo hello && ls"` preserves
1131    //   shell operators).  The wrapper process is the workload boundary.
1132    //
1133    //   Argv mode (command.len() > 1):  The wrapper is used for login-shell
1134    //   environment initialisation but immediately hands off to the target via
1135    //   `exec "$@"`.  The shell replaces itself so the observed child PID and
1136    //   lifecycle align with the intended workload, not the wrapper.
1137    //
1138    // --notify-command delivery always uses the wrapper in string mode
1139    // (see dispatch_command_sink); this change only affects job argv launches.
1140    if opts.shell_wrapper.is_empty() {
1141        anyhow::bail!("supervisor: shell wrapper must not be empty");
1142    }
1143    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
1144    if command.len() == 1 {
1145        // Shell-string mode: pass the command string to the wrapper as-is.
1146        child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
1147    } else {
1148        // Argv mode: launch the workload via the shell wrapper.
1149        //
1150        // On Unix the wrapper hands off to the workload via `exec "$@"` so the
1151        // shell replaces itself and the observed PID / lifecycle align with the
1152        // intended workload, not the wrapper.
1153        //
1154        // On non-Unix platforms (Windows) there is no POSIX `exec`; the wrapper
1155        // is invoked in shell-string mode with the argv joined into a single
1156        // quoted command string, preserving the existing cmd/C semantics.
1157        #[cfg(unix)]
1158        {
1159            // `--` serves as $0; argv elements become $1..$n so `$@` expands
1160            // to the full workload argv.
1161            child_cmd
1162                .args(&opts.shell_wrapper[1..])
1163                .arg("exec \"$@\"")
1164                .arg("--")
1165                .args(command);
1166        }
1167        #[cfg(not(unix))]
1168        {
1169            // Windows fallback: join argv into a shell-compatible string and
1170            // pass it to the wrapper as a single command string (same as
1171            // shell-string mode), so cmd /C semantics are preserved.
1172            let joined = command
1173                .iter()
1174                .map(|a| {
1175                    if a.contains(' ') {
1176                        format!("\"{}\"", a)
1177                    } else {
1178                        a.clone()
1179                    }
1180                })
1181                .collect::<Vec<_>>()
1182                .join(" ");
1183            child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
1184        }
1185    }
1186
1187    if opts.inherit_env {
1188        // Start with the current environment (default).
1189    } else {
1190        child_cmd.env_clear();
1191    }
1192
1193    // Apply env files in order.
1194    for env_file in &opts.env_files {
1195        let vars = load_env_file(env_file)?;
1196        for (k, v) in vars {
1197            child_cmd.env(&k, &v);
1198        }
1199    }
1200
1201    // Apply --env KEY=VALUE overrides (applied after env-files).
1202    for env_var in &opts.env_vars {
1203        let (k, v) = parse_env_var(env_var);
1204        child_cmd.env(&k, &v);
1205    }
1206
1207    // Set working directory if specified.
1208    if let Some(cwd) = opts.cwd {
1209        child_cmd.current_dir(cwd);
1210    }
1211
1212    // Put the child in its own process group so that timeout signals
1213    // (SIGTERM / SIGKILL) reach the entire process tree, not just the
1214    // shell wrapper.  Without this, `sh -lc "sleep 60"` would absorb
1215    // the signal while the grandchild (`sleep`) keeps running.
1216    #[cfg(unix)]
1217    {
1218        use std::os::unix::process::CommandExt;
1219        // SAFETY: setsid is async-signal-safe and called before exec.
1220        unsafe {
1221            child_cmd.pre_exec(|| {
1222                libc::setsid();
1223                Ok(())
1224            });
1225        }
1226    }
1227
1228    // Spawn the child with piped stdout/stderr so we can tee to logs.
1229    let child_stdin = open_child_stdin(&job_dir, opts.stdin_file.as_deref())?;
1230    let mut child = child_cmd
1231        .stdin(child_stdin)
1232        .stdout(std::process::Stdio::piped())
1233        .stderr(std::process::Stdio::piped())
1234        .spawn()
1235        .context("supervisor: spawn child")?;
1236
1237    let pid = child.id();
1238    info!(job_id, pid, "child process started");
1239
1240    // On Windows, assign child to a named Job Object for process-tree management.
1241    // The job name is derived from the job_id so that `kill` can look it up.
1242    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
1243    // kills the child process and updates state.json to "failed" before returning
1244    // an error, so that the run front-end (which may have already returned) can
1245    // detect the failure via state.json on next poll.
1246    #[cfg(windows)]
1247    let windows_job_name = {
1248        match assign_to_job_object(job_id, pid) {
1249            Ok(name) => Some(name),
1250            Err(e) => {
1251                // Job Object assignment failed. Per design.md this is a MUST
1252                // requirement on Windows. Kill the child process and update
1253                // state.json to "failed" so the run front-end can detect it.
1254                let kill_err = child.kill();
1255                let _ = child.wait(); // reap to avoid zombies
1256
1257                let failed_state = JobState {
1258                    job: JobStateJob {
1259                        id: job_id.to_string(),
1260                        status: JobStatus::Failed,
1261                        started_at: Some(started_at.clone()),
1262                    },
1263                    result: JobStateResult {
1264                        exit_code: None,
1265                        signal: None,
1266                        duration_ms: None,
1267                    },
1268                    pid: Some(pid),
1269                    finished_at: Some(now_rfc3339()),
1270                    updated_at: now_rfc3339(),
1271                    windows_job_name: None,
1272                };
1273                // Best-effort: if writing state fails, we still propagate the
1274                // original assignment error.
1275                let _ = job_dir.write_state(&failed_state);
1276
1277                // Dispatch completion event for the failed state if notifications are configured.
1278                // This mirrors the dispatch logic in the normal exit path so that callers
1279                // receive a job.finished event even when the supervisor fails early (Windows only).
1280                if opts.notify_command.is_some() || opts.notify_file.is_some() {
1281                    let finished_at_ts =
1282                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1283                    let stdout_log = job_dir.stdout_path().display().to_string();
1284                    let stderr_log = job_dir.stderr_path().display().to_string();
1285                    let fail_event = crate::schema::CompletionEvent {
1286                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1287                        event_type: "job.finished".to_string(),
1288                        job_id: job_id.to_string(),
1289                        state: JobStatus::Failed.as_str().to_string(),
1290                        command: meta.command.clone(),
1291                        cwd: meta.cwd.clone(),
1292                        started_at: started_at.clone(),
1293                        finished_at: finished_at_ts,
1294                        duration_ms: None,
1295                        exit_code: None,
1296                        signal: None,
1297                        stdout_log_path: stdout_log,
1298                        stderr_log_path: stderr_log,
1299                    };
1300                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1301                    let fail_event_path = job_dir.completion_event_path().display().to_string();
1302                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1303                        Vec::new();
1304                    if let Err(we) = job_dir.write_completion_event_atomic(
1305                        &crate::schema::CompletionEventRecord {
1306                            event: fail_event.clone(),
1307                            delivery_results: vec![],
1308                        },
1309                    ) {
1310                        warn!(
1311                            job_id,
1312                            error = %we,
1313                            "failed to write initial completion_event.json for failed job"
1314                        );
1315                    }
1316                    if let Some(ref shell_cmd) = opts.notify_command {
1317                        fail_delivery_results.push(dispatch_command_sink(
1318                            shell_cmd,
1319                            &fail_event_json,
1320                            job_id,
1321                            &fail_event_path,
1322                            &opts.shell_wrapper,
1323                            "job.finished",
1324                        ));
1325                    }
1326                    if let Some(ref file_path) = opts.notify_file {
1327                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1328                    }
1329                    if let Err(we) = job_dir.write_completion_event_atomic(
1330                        &crate::schema::CompletionEventRecord {
1331                            event: fail_event,
1332                            delivery_results: fail_delivery_results,
1333                        },
1334                    ) {
1335                        warn!(
1336                            job_id,
1337                            error = %we,
1338                            "failed to update completion_event.json with delivery results for failed job"
1339                        );
1340                    }
1341                }
1342
1343                if let Err(ke) = kill_err {
1344                    return Err(anyhow::anyhow!(
1345                        "supervisor: failed to assign pid {pid} to Job Object \
1346                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1347                    ));
1348                }
1349                return Err(anyhow::anyhow!(
1350                    "supervisor: failed to assign pid {pid} to Job Object \
1351                     (Windows MUST requirement); child process was killed; \
1352                     consider running outside a nested Job Object environment: {e}"
1353                ));
1354            }
1355        }
1356    };
1357    #[cfg(not(windows))]
1358    let windows_job_name: Option<String> = None;
1359
1360    // Update state.json with real child PID and Windows Job Object name.
1361    // On Windows, windows_job_name is always Some at this point (guaranteed
1362    // by the MUST requirement above), so state.json will always contain the
1363    // Job Object identifier while the job is running.
1364    let state = JobState {
1365        job: JobStateJob {
1366            id: job_id.to_string(),
1367            status: JobStatus::Running,
1368            started_at: Some(started_at.clone()),
1369        },
1370        result: JobStateResult {
1371            exit_code: None,
1372            signal: None,
1373            duration_ms: None,
1374        },
1375        pid: Some(pid),
1376        finished_at: None,
1377        updated_at: now_rfc3339(),
1378        windows_job_name,
1379    };
1380    job_dir.write_state(&state)?;
1381
1382    let child_start_time = std::time::Instant::now();
1383
1384    // Take stdout/stderr handles before moving child.
1385    let child_stdout = child.stdout.take().expect("child stdout piped");
1386    let child_stderr = child.stderr.take().expect("child stderr piped");
1387
1388    // Create shared output-match checker from the initial meta notification config.
1389    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1390        job_dir.path.clone(),
1391        opts.shell_wrapper.clone(),
1392        meta.notification.clone(),
1393    ));
1394
1395    // Completion channels for log threads: each thread sends `()` after stream_to_logs returns.
1396    // Used for bounded joins below (allows supervisor to exit promptly when descendants
1397    // hold inherited pipe ends open indefinitely).
1398    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1399    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1400
1401    // Thread: read stdout, write to stdout.log and full.log.
1402    let stdout_log_path = job_dir.stdout_path();
1403    let full_log_stdout = Arc::clone(&full_log);
1404    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1405    let t_stdout = std::thread::spawn(move || {
1406        stream_to_logs(
1407            child_stdout,
1408            &stdout_log_path,
1409            full_log_stdout,
1410            "STDOUT",
1411            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1412        );
1413        let _ = tx_stdout_done.send(());
1414    });
1415
1416    // Thread: read stderr, write to stderr.log and full.log.
1417    let stderr_log_path = job_dir.stderr_path();
1418    let full_log_stderr = Arc::clone(&full_log);
1419    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1420    let t_stderr = std::thread::spawn(move || {
1421        stream_to_logs(
1422            child_stderr,
1423            &stderr_log_path,
1424            full_log_stderr,
1425            "STDERR",
1426            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1427        );
1428        let _ = tx_stderr_done.send(());
1429    });
1430
1431    // Timeout / kill-after / progress-every handling.
1432    // We spawn a watcher thread to handle timeout and periodic state.json updates.
1433    let timeout_ms = opts.timeout_ms;
1434    let kill_after_ms = opts.kill_after_ms;
1435    let progress_every_ms = opts.progress_every_ms;
1436    let state_path = job_dir.state_path();
1437    let job_id_str = job_id.to_string();
1438
1439    // Use an atomic flag to signal the watcher thread when the child has exited.
1440    use std::sync::atomic::{AtomicBool, Ordering};
1441    let child_done = Arc::new(AtomicBool::new(false));
1442
1443    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1444        let state_path_clone = state_path.clone();
1445        let child_done_clone = Arc::clone(&child_done);
1446        Some(std::thread::spawn(move || {
1447            let start = std::time::Instant::now();
1448            let timeout_dur = if timeout_ms > 0 {
1449                Some(std::time::Duration::from_millis(timeout_ms))
1450            } else {
1451                None
1452            };
1453            let progress_dur = if progress_every_ms > 0 {
1454                Some(std::time::Duration::from_millis(progress_every_ms))
1455            } else {
1456                None
1457            };
1458
1459            let poll_interval = std::time::Duration::from_millis(100);
1460
1461            loop {
1462                std::thread::sleep(poll_interval);
1463
1464                // Exit the watcher loop if the child process has finished.
1465                if child_done_clone.load(Ordering::Relaxed) {
1466                    break;
1467                }
1468
1469                let elapsed = start.elapsed();
1470
1471                // Check for timeout.
1472                if let Some(td) = timeout_dur
1473                    && elapsed >= td
1474                {
1475                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1476                    // Send SIGTERM to the entire process group (negative PID).
1477                    // The child was placed in its own session/group via setsid.
1478                    #[cfg(unix)]
1479                    {
1480                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1481                    }
1482                    // If kill_after > 0, wait kill_after ms then SIGKILL.
1483                    if kill_after_ms > 0 {
1484                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1485                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1486                        #[cfg(unix)]
1487                        {
1488                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1489                        }
1490                    } else {
1491                        // Immediate SIGKILL to the process group.
1492                        #[cfg(unix)]
1493                        {
1494                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1495                        }
1496                    }
1497                    break;
1498                }
1499
1500                // Progress-every: update updated_at periodically.
1501                if let Some(pd) = progress_dur {
1502                    let elapsed_ms = elapsed.as_millis() as u64;
1503                    let pd_ms = pd.as_millis() as u64;
1504                    let poll_ms = poll_interval.as_millis() as u64;
1505                    if elapsed_ms % pd_ms < poll_ms {
1506                        // Read, update updated_at, write back.
1507                        if let Ok(raw) = std::fs::read(&state_path_clone)
1508                            && let Ok(mut st) =
1509                                serde_json::from_slice::<crate::schema::JobState>(&raw)
1510                        {
1511                            st.updated_at = now_rfc3339();
1512                            if let Ok(s) = serde_json::to_string_pretty(&st) {
1513                                let _ = std::fs::write(&state_path_clone, s);
1514                            }
1515                        }
1516                    }
1517                }
1518            }
1519        }))
1520    } else {
1521        None
1522    };
1523
1524    // Wait for child to finish.
1525    let exit_status = child.wait().context("wait for child")?;
1526
1527    // Signal the watcher that the child has finished so it can exit its loop.
1528    child_done.store(true, Ordering::Relaxed);
1529
1530    // Persist terminal state immediately after the wrapped root process exits.
1531    // This must happen BEFORE joining log threads, because log threads block on
1532    // EOF of stdout/stderr pipes and descendant processes that inherited those
1533    // pipes may keep them open indefinitely. Persisting state here ensures that
1534    // `status` and `wait` can observe the terminal state without waiting for
1535    // all descendants to close their inherited handles.
1536    let duration_ms = child_start_time.elapsed().as_millis() as u64;
1537    let exit_code = exit_status.code();
1538    let finished_at = now_rfc3339();
1539
1540    // Detect signal-killed processes on Unix for accurate state and completion event.
1541    #[cfg(unix)]
1542    let (terminal_status, signal_name) = {
1543        use std::os::unix::process::ExitStatusExt;
1544        if let Some(sig) = exit_status.signal() {
1545            (JobStatus::Killed, Some(sig.to_string()))
1546        } else {
1547            (JobStatus::Exited, None)
1548        }
1549    };
1550    #[cfg(not(unix))]
1551    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1552
1553    let state = JobState {
1554        job: JobStateJob {
1555            id: job_id.to_string(),
1556            status: terminal_status.clone(),
1557            started_at: Some(started_at.clone()),
1558        },
1559        result: JobStateResult {
1560            exit_code,
1561            signal: signal_name.clone(),
1562            duration_ms: Some(duration_ms),
1563        },
1564        pid: Some(pid),
1565        finished_at: Some(finished_at.clone()),
1566        updated_at: now_rfc3339(),
1567        windows_job_name: None, // not needed after process exits
1568    };
1569    job_dir.write_state(&state)?;
1570    info!(job_id, ?exit_code, "child process finished");
1571
1572    // Bounded join for log-reader threads.
1573    //
1574    // After the wrapped root process exits, we give log threads a short window
1575    // to drain any remaining output and fire output-match callbacks (which is
1576    // the common case: no descendants, pipe write-end closes promptly).
1577    //
1578    // If a thread does not complete within the window, it is detached (dropped).
1579    // Detaching means the thread will be killed when the supervisor process exits,
1580    // which is the correct trade-off: supervisor must not linger indefinitely
1581    // because a descendant holds an inherited pipe write-end open.
1582    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1583    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1584
1585    let remaining = drain_deadline
1586        .checked_duration_since(std::time::Instant::now())
1587        .unwrap_or(std::time::Duration::ZERO);
1588    if rx_stdout_done.recv_timeout(remaining).is_ok() {
1589        let _ = t_stdout.join();
1590    } else {
1591        drop(t_stdout); // detach: descendant holds the pipe open
1592    }
1593
1594    let remaining = drain_deadline
1595        .checked_duration_since(std::time::Instant::now())
1596        .unwrap_or(std::time::Duration::ZERO);
1597    if rx_stderr_done.recv_timeout(remaining).is_ok() {
1598        let _ = t_stderr.join();
1599    } else {
1600        drop(t_stderr); // detach: descendant holds the pipe open
1601    }
1602
1603    // Join watcher if present; it exits promptly once child_done is set.
1604    if let Some(w) = watcher {
1605        let _ = w.join();
1606    }
1607
1608    // Reload the latest notification config from meta.json to pick up any post-creation
1609    // updates (e.g. from `notify set` invoked after the job was launched).
1610    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1611    let (current_notify_command, current_notify_file) = match &latest_notification {
1612        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1613        None => (None, None),
1614    };
1615
1616    // Dispatch completion event to configured notification sinks.
1617    // Failure here must not alter job state (delivery result is recorded separately).
1618    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1619    if has_notification {
1620        let stdout_log = job_dir.stdout_path().display().to_string();
1621        let stderr_log = job_dir.stderr_path().display().to_string();
1622        let event = crate::schema::CompletionEvent {
1623            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1624            event_type: "job.finished".to_string(),
1625            job_id: job_id.to_string(),
1626            state: terminal_status.as_str().to_string(),
1627            command: meta.command.clone(),
1628            cwd: meta.cwd.clone(),
1629            started_at,
1630            finished_at,
1631            duration_ms: Some(duration_ms),
1632            exit_code,
1633            signal: signal_name,
1634            stdout_log_path: stdout_log,
1635            stderr_log_path: stderr_log,
1636        };
1637
1638        let event_json = serde_json::to_string(&event).unwrap_or_default();
1639        let event_path = job_dir.completion_event_path().display().to_string();
1640        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1641
1642        // Write initial completion_event.json before dispatching sinks.
1643        if let Err(e) =
1644            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1645                event: event.clone(),
1646                delivery_results: vec![],
1647            })
1648        {
1649            warn!(job_id, error = %e, "failed to write initial completion_event.json");
1650        }
1651
1652        if let Some(ref shell_cmd) = current_notify_command {
1653            delivery_results.push(dispatch_command_sink(
1654                shell_cmd,
1655                &event_json,
1656                job_id,
1657                &event_path,
1658                &opts.shell_wrapper,
1659                "job.finished",
1660            ));
1661        }
1662        if let Some(ref file_path) = current_notify_file {
1663            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1664        }
1665
1666        // Update completion_event.json with delivery results.
1667        if let Err(e) =
1668            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1669                event,
1670                delivery_results,
1671            })
1672        {
1673            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1674        }
1675    }
1676
1677    Ok(())
1678}
1679
1680/// Dispatch the command sink: execute the shell command string via the configured shell wrapper,
1681/// pass event JSON via stdin, and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID /
1682/// AGENT_EXEC_EVENT_TYPE env vars.
1683///
1684/// The shell wrapper argv (e.g. `["sh", "-lc"]`) is provided by the caller.
1685/// The command string is appended as the final argument to the wrapper.
1686fn dispatch_command_sink(
1687    shell_cmd: &str,
1688    event_json: &str,
1689    job_id: &str,
1690    event_path: &str,
1691    shell_wrapper: &[String],
1692    event_type: &str,
1693) -> crate::schema::SinkDeliveryResult {
1694    use std::io::Write;
1695    let attempted_at = now_rfc3339();
1696    let target = shell_cmd.to_string();
1697
1698    if shell_cmd.trim().is_empty() {
1699        return crate::schema::SinkDeliveryResult {
1700            sink_type: "command".to_string(),
1701            target,
1702            success: false,
1703            error: Some("empty shell command".to_string()),
1704            attempted_at,
1705        };
1706    }
1707
1708    if shell_wrapper.is_empty() {
1709        return crate::schema::SinkDeliveryResult {
1710            sink_type: "command".to_string(),
1711            target,
1712            success: false,
1713            error: Some("shell wrapper must not be empty".to_string()),
1714            attempted_at,
1715        };
1716    }
1717
1718    let mut cmd = Command::new(&shell_wrapper[0]);
1719    cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1720
1721    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1722    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1723    cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1724    cmd.stdin(std::process::Stdio::piped());
1725    cmd.stdout(std::process::Stdio::null());
1726    cmd.stderr(std::process::Stdio::null());
1727
1728    match cmd.spawn() {
1729        Ok(mut child) => {
1730            if let Some(mut stdin) = child.stdin.take() {
1731                let _ = stdin.write_all(event_json.as_bytes());
1732            }
1733            match child.wait() {
1734                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1735                    sink_type: "command".to_string(),
1736                    target,
1737                    success: true,
1738                    error: None,
1739                    attempted_at,
1740                },
1741                Ok(status) => crate::schema::SinkDeliveryResult {
1742                    sink_type: "command".to_string(),
1743                    target,
1744                    success: false,
1745                    error: Some(format!("exited with status {status}")),
1746                    attempted_at,
1747                },
1748                Err(e) => crate::schema::SinkDeliveryResult {
1749                    sink_type: "command".to_string(),
1750                    target,
1751                    success: false,
1752                    error: Some(format!("wait error: {e}")),
1753                    attempted_at,
1754                },
1755            }
1756        }
1757        Err(e) => crate::schema::SinkDeliveryResult {
1758            sink_type: "command".to_string(),
1759            target,
1760            success: false,
1761            error: Some(format!("spawn error: {e}")),
1762            attempted_at,
1763        },
1764    }
1765}
1766
1767/// Dispatch the file sink: append event JSON as a single NDJSON line.
1768/// Creates parent directories automatically.
1769fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1770    use std::io::Write;
1771    let attempted_at = now_rfc3339();
1772    let path = std::path::Path::new(file_path);
1773
1774    if let Some(parent) = path.parent()
1775        && let Err(e) = std::fs::create_dir_all(parent)
1776    {
1777        return crate::schema::SinkDeliveryResult {
1778            sink_type: "file".to_string(),
1779            target: file_path.to_string(),
1780            success: false,
1781            error: Some(format!("create parent dir: {e}")),
1782            attempted_at,
1783        };
1784    }
1785
1786    match std::fs::OpenOptions::new()
1787        .create(true)
1788        .append(true)
1789        .open(path)
1790    {
1791        Ok(mut f) => match writeln!(f, "{event_json}") {
1792            Ok(_) => crate::schema::SinkDeliveryResult {
1793                sink_type: "file".to_string(),
1794                target: file_path.to_string(),
1795                success: true,
1796                error: None,
1797                attempted_at,
1798            },
1799            Err(e) => crate::schema::SinkDeliveryResult {
1800                sink_type: "file".to_string(),
1801                target: file_path.to_string(),
1802                success: false,
1803                error: Some(format!("write error: {e}")),
1804                attempted_at,
1805            },
1806        },
1807        Err(e) => crate::schema::SinkDeliveryResult {
1808            sink_type: "file".to_string(),
1809            target: file_path.to_string(),
1810            success: false,
1811            error: Some(format!("open error: {e}")),
1812            attempted_at,
1813        },
1814    }
1815}
1816
1817/// Public alias so other modules can call the timestamp helper.
1818pub fn now_rfc3339_pub() -> String {
1819    now_rfc3339()
1820}
1821
1822fn now_rfc3339() -> String {
1823    // Use a simple approach that works without chrono.
1824    let d = std::time::SystemTime::now()
1825        .duration_since(std::time::UNIX_EPOCH)
1826        .unwrap_or_default();
1827    format_rfc3339(d.as_secs())
1828}
1829
/// Format `secs` (seconds since 1970-01-01T00:00:00Z) as an RFC 3339 UTC timestamp of the
/// form `YYYY-MM-DDTHH:MM:SSZ`, using the Gregorian calendar.
///
/// Hand-rolled so the crate needs no date/time dependency. Whole years are peeled off one
/// at a time starting from 1970, then whole months of the remaining year.
fn format_rfc3339(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let secs_of_day = secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day / 60 % 60;
    let second = secs_of_day % 60;

    let mut days_left = secs / SECS_PER_DAY;

    let year_len = |y: u64| if is_leap(y) { 366 } else { 365 };
    let mut year = 1970u64;
    while days_left >= year_len(year) {
        days_left -= year_len(year);
        year += 1;
    }

    let february = if is_leap(year) { 29 } else { 28 };
    let mut month = 1u64;
    for month_len in [31, february, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] {
        if days_left < month_len {
            break;
        }
        days_left -= month_len;
        month += 1;
    }
    let day = days_left + 1;

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}

/// Gregorian leap-year rule: century years are leap only when divisible by 400;
/// all other years are leap when divisible by 4.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(100) {
        year.is_multiple_of(400)
    } else {
        year.is_multiple_of(4)
    }
}
1891
/// Windows-only: create a named Job Object and assign the given child process
/// to it so that the entire process tree can be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. This name is stored in
/// `state.json` so that future `kill` invocations can open the same Job Object
/// by name and call `TerminateJobObject` to stop the whole tree.
///
/// Returns `Ok(name)` on success.  Returns `Err` on failure — the caller
/// (`supervise`) treats failure as a fatal error because reliable process-tree
/// management is a Windows MUST requirement (design.md).
///
/// # Errors
///
/// Returns an error when the child cannot be opened with
/// `PROCESS_TERMINATE | PROCESS_SET_QUOTA` access, when `CreateJobObjectW`
/// fails, or when `AssignProcessToJobObject` fails (for example because the
/// process already belongs to another Job Object). Any handle opened before the
/// failing call is closed before returning.
///
/// NOTE(review): `pid` is assigned after the child is already running, so
/// processes it spawns before this call are presumably not covered by the Job
/// Object — confirm whether the caller starts the child suspended.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    // Wide-string form of the name; must outlive the CreateJobObjectW call below.
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: every Win32 call receives either a handle returned by a successful
    // call earlier in this block or `hname`, which is alive for the whole block.
    // `proc_handle` is closed exactly once on every path after it is opened, and
    // `job` is closed only on the assignment-failure path; on success it is kept
    // open on purpose (see below).
    unsafe {
        // Open the child process handle (needed for AssignProcessToJobObject).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create a named Job Object.
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                // Don't leak the process handle on the error path.
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        // Assign the child process to the Job Object.
        // This can fail if the process is already in another job (e.g. CI/nested).
        // Per design.md, assignment is a MUST on Windows — failure is a fatal error.
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // Keep job handle open for the lifetime of the supervisor so the Job
        // Object remains valid. We intentionally do NOT close it here.
        // The OS will close it automatically when the supervisor exits.
        // (We close proc_handle since we only needed it for assignment.)
        let _ = CloseHandle(proc_handle);
        // Note: job handle is intentionally leaked here to keep the Job Object alive.
        // The handle will be closed when the supervisor process exits.
        // NOTE(review): if `HANDLE` is a plain `Copy` value without `Drop` in the
        // pinned `windows` crate version (confirm), this `forget` is a no-op that
        // only documents intent; the handle stays open either way because it is
        // never passed to CloseHandle on this path.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1957
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        // 2024-01-01T00:00:00Z = 1704067200
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_time_of_day() {
        // 13h 5m 7s after 2024-01-01T00:00:00Z: checks hour/minute/second extraction.
        assert_eq!(format_rfc3339(1704067200 + 13 * 3600 + 5 * 60 + 7), "2024-01-01T13:05:07Z");
    }

    #[test]
    fn rfc3339_leap_day_and_year_rollover() {
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
        assert_eq!(format_rfc3339(1709251199), "2024-02-29T23:59:59Z");
        assert_eq!(format_rfc3339(1735689599), "2024-12-31T23:59:59Z");
        assert_eq!(format_rfc3339(1735689600), "2025-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_century_non_leap_year() {
        // 2100 is divisible by 100 but not by 400, so February has 28 days.
        assert_eq!(format_rfc3339(4107542399), "2100-02-28T23:59:59Z");
        assert_eq!(format_rfc3339(4107542400), "2100-03-01T00:00:00Z");
    }

    #[test]
    fn leap_year_rule() {
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1900));
        assert!(!is_leap(2023));
        assert!(!is_leap(2100));
    }

    #[test]
    fn now_rfc3339_has_expected_shape() {
        let ts = now_rfc3339_pub();
        assert_eq!(ts.len(), "YYYY-MM-DDTHH:MM:SSZ".len());
        assert_eq!(&ts[10..11], "T");
        assert!(ts.ends_with('Z'));
    }
}