Skip to main content

agent_exec/
run.rs

//! Implementation of the `run` sub-command.
//!
//! Design decisions (from design.md):
//! - `run` spawns a short-lived front-end that forks a `_supervise` child.
//! - The supervisor continues logging stdout/stderr after `run` returns.
//! - If `--snapshot-after` elapses before `run` returns, a snapshot is
//!   included in the JSON response.
8
9use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17    JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18    Snapshot,
19};
20
/// Settings accepted by the `run` sub-command and consumed by [`execute`].
///
/// The borrowed fields (`root`, `cwd`, `log`) are tied to lifetime `'a`; every
/// other field is owned by the struct.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command followed by its arguments. `execute` fails when this is empty.
    pub command: Vec<String>,
    /// Optional override of the jobs root directory.
    pub root: Option<&'a str>,
    /// How long to wait (milliseconds) before returning so that a snapshot can
    /// be captured; 0 = return immediately.
    pub snapshot_after: u64,
    /// How many trailing lines of output a snapshot may contain.
    pub tail_lines: u64,
    /// Byte cap applied to the tail included in a snapshot.
    pub max_bytes: u64,
    /// Job timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Grace period (milliseconds) between SIGTERM and SIGKILL; 0 = SIGKILL
    /// immediately.
    pub kill_after_ms: u64,
    /// Working directory in which the command should run.
    pub cwd: Option<&'a str>,
    /// Extra environment variables, each written as `KEY=VALUE`.
    pub env_vars: Vec<String>,
    /// Env-file paths; they are applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the command inherits the current process environment
    /// (default: true).
    pub inherit_env: bool,
    /// Keys whose values are replaced with "***" in JSON output.
    pub mask: Vec<String>,
    /// Override for the `full.log` path; `None` = use the job directory.
    pub log: Option<&'a str>,
    /// Refresh interval (milliseconds) for `updated_at` in state.json;
    /// 0 = disabled.
    pub progress_every_ms: u64,
    /// When true, `run` blocks until the job reaches a terminal state and the
    /// response carries exit_code, finished_at, and final_snapshot.
    pub wait: bool,
    /// Polling interval in milliseconds used while `wait` is true.
    pub wait_poll_ms: u64,
    /// Shell command string for the command notification sink, executed via
    /// the platform shell. `None` = no command sink.
    pub notify_command: Option<String>,
    /// Destination file for the NDJSON notification sink; `None` = no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings,
    /// e.g. `["sh", "-lc"]` or `["bash", "-lc"]`.
    pub shell_wrapper: Vec<String>,
}
66
67impl<'a> Default for RunOpts<'a> {
68    fn default() -> Self {
69        RunOpts {
70            command: vec![],
71            root: None,
72            snapshot_after: 10_000,
73            tail_lines: 50,
74            max_bytes: 65536,
75            timeout_ms: 0,
76            kill_after_ms: 0,
77            cwd: None,
78            env_vars: vec![],
79            env_files: vec![],
80            inherit_env: true,
81            mask: vec![],
82            log: None,
83            progress_every_ms: 0,
84            wait: false,
85            wait_poll_ms: 200,
86            notify_command: None,
87            notify_file: None,
88            shell_wrapper: crate::config::default_shell_wrapper(),
89        }
90    }
91}
92
93/// Maximum allowed value for `snapshot_after` in milliseconds (10 seconds).
94const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
95
96/// Execute `run`: spawn job, possibly wait for snapshot, return JSON.
97pub fn execute(opts: RunOpts) -> Result<()> {
98    if opts.command.is_empty() {
99        anyhow::bail!("no command specified for run");
100    }
101
102    let elapsed_start = std::time::Instant::now();
103
104    let root = resolve_root(opts.root);
105    std::fs::create_dir_all(&root)
106        .with_context(|| format!("create jobs root {}", root.display()))?;
107
108    let job_id = Ulid::new().to_string();
109    let created_at = now_rfc3339();
110
111    // Extract only the key names from KEY=VALUE env var strings (values are not persisted).
112    let env_keys: Vec<String> = opts
113        .env_vars
114        .iter()
115        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
116        .collect();
117
118    // Apply masking: replace values of masked keys with "***" in env_vars for metadata.
119    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
120
121    // Resolve the effective working directory for this job.
122    // If --cwd was specified, use that path; otherwise use the current process's working directory.
123    // Canonicalize the path for consistent comparison; fall back to absolute path on failure.
124    let effective_cwd = resolve_effective_cwd(opts.cwd);
125
126    let notification = if opts.notify_command.is_some() || opts.notify_file.is_some() {
127        Some(crate::schema::NotificationConfig {
128            notify_command: opts.notify_command.clone(),
129            notify_file: opts.notify_file.clone(),
130        })
131    } else {
132        None
133    };
134
135    let meta = JobMeta {
136        job: JobMetaJob { id: job_id.clone() },
137        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
138        command: opts.command.clone(),
139        created_at: created_at.clone(),
140        root: root.display().to_string(),
141        env_keys,
142        env_vars: masked_env_vars.clone(),
143        mask: opts.mask.clone(),
144        cwd: Some(effective_cwd),
145        notification,
146    };
147
148    let job_dir = JobDir::create(&root, &job_id, &meta)?;
149    info!(job_id = %job_id, "created job directory");
150
151    // Determine the full.log path (may be overridden by --log).
152    let full_log_path = if let Some(log) = opts.log {
153        log.to_string()
154    } else {
155        job_dir.full_log_path().display().to_string()
156    };
157
158    // Pre-create empty log files so they exist before the supervisor starts.
159    // This guarantees that `stdout.log`, `stderr.log`, and `full.log` are
160    // present immediately after `run` returns, even if the supervisor has
161    // not yet begun writing.
162    for log_path in [
163        job_dir.stdout_path(),
164        job_dir.stderr_path(),
165        job_dir.full_log_path(),
166    ] {
167        std::fs::OpenOptions::new()
168            .create(true)
169            .append(true)
170            .open(&log_path)
171            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
172    }
173
174    // Spawn the supervisor (same binary, internal `_supervise` sub-command).
175    let exe = std::env::current_exe().context("resolve current exe")?;
176    let mut supervisor_cmd = Command::new(&exe);
177    supervisor_cmd
178        .arg("_supervise")
179        .arg("--job-id")
180        .arg(&job_id)
181        .arg("--root")
182        .arg(root.display().to_string())
183        .arg("--full-log")
184        .arg(&full_log_path);
185
186    if opts.timeout_ms > 0 {
187        supervisor_cmd
188            .arg("--timeout")
189            .arg(opts.timeout_ms.to_string());
190    }
191    if opts.kill_after_ms > 0 {
192        supervisor_cmd
193            .arg("--kill-after")
194            .arg(opts.kill_after_ms.to_string());
195    }
196    if let Some(cwd) = opts.cwd {
197        supervisor_cmd.arg("--cwd").arg(cwd);
198    }
199    for env_file in &opts.env_files {
200        supervisor_cmd.arg("--env-file").arg(env_file);
201    }
202    for env_var in &opts.env_vars {
203        supervisor_cmd.arg("--env").arg(env_var);
204    }
205    if !opts.inherit_env {
206        supervisor_cmd.arg("--no-inherit-env");
207    }
208    // Note: masking is handled by `run` (meta.json + JSON response). The supervisor
209    // receives the real env var values so the child process can use them as intended.
210    if opts.progress_every_ms > 0 {
211        supervisor_cmd
212            .arg("--progress-every")
213            .arg(opts.progress_every_ms.to_string());
214    }
215    if let Some(ref nc) = opts.notify_command {
216        supervisor_cmd.arg("--notify-command").arg(nc);
217    }
218    if let Some(ref nf) = opts.notify_file {
219        supervisor_cmd.arg("--notify-file").arg(nf);
220    }
221    // Pass the resolved shell wrapper to the supervisor as a JSON array to
222    // preserve argv fidelity (no join/split round-trip).
223    let wrapper_json = serde_json::to_string(&opts.shell_wrapper)
224        .context("serialize shell wrapper")?;
225    supervisor_cmd
226        .arg("--shell-wrapper-resolved")
227        .arg(&wrapper_json);
228
229    supervisor_cmd
230        .arg("--")
231        .args(&opts.command)
232        .stdin(std::process::Stdio::null())
233        .stdout(std::process::Stdio::null())
234        .stderr(std::process::Stdio::null());
235
236    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
237
238    let supervisor_pid = supervisor.id();
239    debug!(supervisor_pid, "supervisor spawned");
240
241    // Write initial state with supervisor PID so `status` can track it.
242    // On Windows, this also pre-records the deterministic Job Object name
243    // (AgentExec-{job_id}) so that callers can find it immediately after run returns.
244    job_dir.init_state(supervisor_pid, &created_at)?;
245
246    // On Windows, poll state.json until the supervisor confirms Job Object
247    // assignment (state pid changes to child pid or state changes to "failed").
248    // This handshake ensures `run` can detect Job Object assignment failures
249    // before returning.  We wait up to 5 seconds (500 × 10ms intervals).
250    #[cfg(windows)]
251    {
252        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
253        loop {
254            std::thread::sleep(std::time::Duration::from_millis(10));
255            if let Ok(current_state) = job_dir.read_state() {
256                // Supervisor has updated state once the pid changes from
257                // supervisor_pid to the child pid (success), or state becomes
258                // "failed" (Job Object assignment error).
259                let supervisor_updated = current_state
260                    .pid
261                    .map(|p| p != supervisor_pid)
262                    .unwrap_or(false)
263                    || *current_state.status() == JobStatus::Failed;
264                if supervisor_updated {
265                    if *current_state.status() == JobStatus::Failed {
266                        // Supervisor failed to assign the child to a Job Object
267                        // and has already killed the child and updated state.json.
268                        // Report failure to the caller.
269                        anyhow::bail!(
270                            "supervisor failed to assign child process to Job Object \
271                             (Windows MUST requirement); see stderr for details"
272                        );
273                    }
274                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
275                    break;
276                }
277            }
278            if std::time::Instant::now() >= handshake_deadline {
279                // Supervisor did not confirm within 5 seconds. Proceed with
280                // the initial state (deterministic job name already written).
281                debug!("supervisor handshake timed out; proceeding with initial state");
282                break;
283            }
284        }
285    }
286
287    // Compute absolute paths for stdout.log and stderr.log.
288    let stdout_log_path = job_dir.stdout_path().display().to_string();
289    let stderr_log_path = job_dir.stderr_path().display().to_string();
290
291    // Clamp snapshot_after to MAX_SNAPSHOT_AFTER_MS, but only when --wait is NOT set.
292    // When --wait is set, we skip the snapshot_after phase entirely (the final_snapshot
293    // from the --wait phase replaces it), so the clamp is irrelevant.
294    let effective_snapshot_after = if opts.wait {
295        // Skip the snapshot_after wait when --wait is active; the terminal-state
296        // poll below will produce the definitive final_snapshot.
297        0
298    } else {
299        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
300    };
301
302    // Start a single wait_start timer that spans both the snapshot_after phase
303    // and the optional --wait phase so waited_ms reflects total wait time.
304    let wait_start = std::time::Instant::now();
305
306    // Optionally wait for snapshot and measure waited_ms.
307    // Uses a polling loop so we can exit early when output is available
308    // or the job has finished, rather than always sleeping the full duration.
309    let snapshot = if effective_snapshot_after > 0 {
310        debug!(ms = effective_snapshot_after, "polling for snapshot");
311        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
312        // Poll interval: 15ms gives good responsiveness without excessive CPU.
313        let poll_interval = std::time::Duration::from_millis(15);
314        loop {
315            std::thread::sleep(poll_interval);
316            // Early exit: job state changed from running (finished or failed).
317            // Output availability alone does NOT cause early exit; we always
318            // wait until the deadline when the job is still running.
319            if let Ok(st) = job_dir.read_state()
320                && *st.status() != JobStatus::Running
321            {
322                debug!("snapshot poll: job no longer running, exiting early");
323                break;
324            }
325            // Exit when deadline is reached.
326            if std::time::Instant::now() >= deadline {
327                debug!("snapshot poll: deadline reached");
328                break;
329            }
330        }
331        let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
332        Some(snap)
333    } else {
334        None
335    };
336
337    // If --wait is set, wait for the job to reach a terminal state.
338    // Unlike snapshot_after, there is no upper bound on wait time.
339    // waited_ms will accumulate the full time spent (snapshot_after + wait phases).
340    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
341        debug!("--wait: polling for terminal state");
342        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
343        loop {
344            std::thread::sleep(poll);
345            if let Ok(st) = job_dir.read_state()
346                && *st.status() != JobStatus::Running
347            {
348                let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
349                let ec = st.exit_code();
350                let fa = st.finished_at.clone();
351                let state_str = st.status().as_str().to_string();
352                break (state_str, ec, fa, Some(snap));
353            }
354        }
355    } else {
356        (JobStatus::Running.as_str().to_string(), None, None, None)
357    };
358
359    // waited_ms reflects the total time spent waiting (snapshot_after + --wait phases).
360    let waited_ms = wait_start.elapsed().as_millis() as u64;
361
362    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
363
364    let response = Response::new(
365        "run",
366        RunData {
367            job_id,
368            state: final_state,
369            // Include masked env_vars in the JSON response so callers can inspect
370            // which variables were set (with secret values replaced by "***").
371            env_vars: masked_env_vars,
372            snapshot,
373            stdout_log_path,
374            stderr_log_path,
375            waited_ms,
376            elapsed_ms,
377            exit_code: exit_code_opt,
378            finished_at: finished_at_opt,
379            final_snapshot: final_snapshot_opt,
380        },
381    );
382    response.print();
383    Ok(())
384}
385
386fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
387    let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
388    let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
389    Snapshot {
390        truncated: stdout.truncated || stderr.truncated,
391        encoding: "utf-8-lossy".to_string(),
392        stdout_observed_bytes: stdout.observed_bytes,
393        stderr_observed_bytes: stderr.observed_bytes,
394        stdout_included_bytes: stdout.included_bytes,
395        stderr_included_bytes: stderr.included_bytes,
396        stdout_tail: stdout.tail,
397        stderr_tail: stderr.tail,
398    }
399}
400
/// Settings for the internal `_supervise` sub-command.
///
/// `run` is responsible for masking (it writes masked values to meta.json and
/// returns them in the JSON response). The supervisor is handed the real
/// environment values because it needs them to launch the child process.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job being supervised.
    pub job_id: &'a str,
    /// Jobs root directory under which the job directory is opened.
    pub root: &'a Path,
    /// Command and arguments to run; the supervisor rejects an empty slice.
    pub command: &'a [String],
    /// Override for the `full.log` path; `None` = job directory default.
    pub full_log: Option<&'a str>,
    /// Job timeout in milliseconds; 0 = no timeout.
    pub timeout_ms: u64,
    /// Grace period (milliseconds) between SIGTERM and SIGKILL; 0 = SIGKILL
    /// immediately.
    pub kill_after_ms: u64,
    /// Working directory for the child process.
    pub cwd: Option<&'a str>,
    /// Environment variables as `KEY=VALUE` strings (real values, not masked).
    pub env_vars: Vec<String>,
    /// Env-file paths; they are applied in the order given.
    pub env_files: Vec<String>,
    /// Whether the child inherits the current process environment.
    pub inherit_env: bool,
    /// Refresh interval (milliseconds) for `updated_at` in state.json;
    /// 0 = disabled.
    pub progress_every_ms: u64,
    /// Shell command string for the command notification sink, executed via
    /// the platform shell. `None` = no command sink.
    pub notify_command: Option<String>,
    /// Destination file for the NDJSON notification sink; `None` = no file sink.
    pub notify_file: Option<String>,
    /// Resolved shell wrapper argv used to execute command strings.
    pub shell_wrapper: Vec<String>,
}
435
/// Work out the working directory a job is considered to run in.
///
/// The starting point is `cwd_override` when given, otherwise the current
/// process working directory (or `"."` if that cannot be determined). The
/// result is, in order of preference:
/// 1. the canonicalized starting point (symlinks resolved);
/// 2. the starting point unchanged, when canonicalization fails but the path
///    is already absolute;
/// 3. the starting point joined onto the process working directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    // Best-effort process cwd; never fails, falls back to ".".
    let process_cwd = || std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));

    let requested = cwd_override
        .map(PathBuf::from)
        .unwrap_or_else(process_cwd);

    let resolved = match requested.canonicalize() {
        Ok(canonical) => canonical,
        // Canonicalization failed: an absolute path is used as given.
        Err(_) if requested.is_absolute() => requested,
        // Relative path: anchor it at the process working directory.
        Err(_) => process_cwd().join(requested),
    };
    resolved.display().to_string()
}
463
464/// Mask the values of specified keys in a list of KEY=VALUE strings.
465/// Keys listed in `mask_keys` will have their value replaced with "***".
466fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
467    if mask_keys.is_empty() {
468        return env_vars.to_vec();
469    }
470    env_vars
471        .iter()
472        .map(|s| {
473            let (key, _val) = parse_env_var(s);
474            if mask_keys.iter().any(|k| k == &key) {
475                format!("{key}=***")
476            } else {
477                s.clone()
478            }
479        })
480        .collect()
481}
482
/// Split a `KEY=VALUE` string at its first `=` into `(key, value)`.
///
/// Anything after the first `=` (including further `=` characters) belongs to
/// the value. A string without `=` is returned as the key with an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    let (key, value) = s.split_once('=').unwrap_or((s, ""));
    (key.to_owned(), value.to_owned())
}
491
492/// Load environment variables from a .env-style file.
493/// Supports KEY=VALUE lines; lines starting with '#' and empty lines are ignored.
494fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
495    let contents =
496        std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
497    let mut vars = Vec::new();
498    for line in contents.lines() {
499        let line = line.trim();
500        if line.is_empty() || line.starts_with('#') {
501            continue;
502        }
503        vars.push(parse_env_var(line));
504    }
505    Ok(vars)
506}
507
508/// Stream bytes from a child process output pipe to an individual log file and
509/// to the shared `full.log`.
510///
511/// Reads byte chunks (not lines) so that output without a trailing newline is
512/// still captured in the individual log immediately.  The `full.log` format
513/// `"<RFC3339> [LABEL] <line>"` is maintained via a line-accumulation buffer:
514/// bytes are appended to the buffer until a newline is found, at which point a
515/// formatted line is written to `full.log`.  Any remaining bytes at EOF are
516/// flushed as a final line.
517///
518/// This helper is used by both the stdout and stderr monitoring threads inside
519/// [`supervise`], replacing the previously duplicated per-stream implementations.
520/// Buffer size (8192 bytes) and newline-split logic are preserved unchanged.
521fn stream_to_logs<R>(
522    stream: R,
523    log_path: &std::path::Path,
524    full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
525    label: &str,
526) where
527    R: std::io::Read,
528{
529    use std::io::Write;
530    let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
531    let mut stream = stream;
532    let mut buf = [0u8; 8192];
533    // Incomplete-line buffer for full.log formatting.
534    let mut line_buf: Vec<u8> = Vec::new();
535    loop {
536        match stream.read(&mut buf) {
537            Ok(0) => break, // EOF
538            Ok(n) => {
539                let chunk = &buf[..n];
540                // Write raw bytes to the individual log (captures partial lines too).
541                let _ = log_file.write_all(chunk);
542                // Accumulate bytes for full.log line formatting.
543                for &b in chunk {
544                    if b == b'\n' {
545                        let line = String::from_utf8_lossy(&line_buf);
546                        if let Ok(mut fl) = full_log.lock() {
547                            let ts = now_rfc3339();
548                            let _ = writeln!(fl, "{ts} [{label}] {line}");
549                        }
550                        line_buf.clear();
551                    } else {
552                        line_buf.push(b);
553                    }
554                }
555            }
556            Err(_) => break,
557        }
558    }
559    // Flush any remaining incomplete line to full.log.
560    if !line_buf.is_empty() {
561        let line = String::from_utf8_lossy(&line_buf);
562        if let Ok(mut fl) = full_log.lock() {
563            let ts = now_rfc3339();
564            let _ = writeln!(fl, "{ts} [{label}] {line}");
565        }
566    }
567}
568
569/// Internal supervisor sub-command.
570///
571/// Runs the target command, streams stdout/stderr to individual log files
572/// (`stdout.log`, `stderr.log`) **and** to the combined `full.log`, then
573/// updates `state.json` when the process finishes.
574///
575/// On Windows, the child process is assigned to a named Job Object so that
576/// the entire process tree can be terminated with a single `kill` call.
577/// The Job Object name is recorded in `state.json` as `windows_job_name`.
578pub fn supervise(opts: SuperviseOpts) -> Result<()> {
579    use std::sync::{Arc, Mutex};
580
581    let job_id = opts.job_id;
582    let root = opts.root;
583    let command = opts.command;
584
585    if command.is_empty() {
586        anyhow::bail!("supervisor: no command");
587    }
588
589    let job_dir = JobDir::open(root, job_id)?;
590
591    // Read meta.json to get the started_at timestamp.
592    let meta = job_dir.read_meta()?;
593    let started_at = meta.created_at.clone();
594
595    // Determine full.log path.
596    let full_log_path = if let Some(p) = opts.full_log {
597        std::path::PathBuf::from(p)
598    } else {
599        job_dir.full_log_path()
600    };
601
602    // Create the full.log file (shared between stdout/stderr threads).
603    // Ensure parent directories exist for custom paths.
604    if let Some(parent) = full_log_path.parent() {
605        std::fs::create_dir_all(parent)
606            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
607    }
608    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
609    let full_log = Arc::new(Mutex::new(full_log_file));
610
611    // Execute command through the shell wrapper (same launcher as --notify-command).
612    //
613    // Single-element commands are treated as shell command strings and passed
614    // as-is (e.g. `"echo hello && ls"` preserves shell operators).
615    // Multi-element argv have each element POSIX-single-quoted before joining so
616    // that arguments with spaces, `$`, or special characters are preserved through
617    // the shell layer (e.g. `["sh", "-c", "exit 42"]` → `'sh' '-c' 'exit 42'`).
618    let cmd_str = build_cmd_str(command);
619    if opts.shell_wrapper.is_empty() {
620        anyhow::bail!("supervisor: shell wrapper must not be empty");
621    }
622    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
623    child_cmd.args(&opts.shell_wrapper[1..]).arg(&cmd_str);
624
625    if opts.inherit_env {
626        // Start with the current environment (default).
627    } else {
628        child_cmd.env_clear();
629    }
630
631    // Apply env files in order.
632    for env_file in &opts.env_files {
633        let vars = load_env_file(env_file)?;
634        for (k, v) in vars {
635            child_cmd.env(&k, &v);
636        }
637    }
638
639    // Apply --env KEY=VALUE overrides (applied after env-files).
640    for env_var in &opts.env_vars {
641        let (k, v) = parse_env_var(env_var);
642        child_cmd.env(&k, &v);
643    }
644
645    // Set working directory if specified.
646    if let Some(cwd) = opts.cwd {
647        child_cmd.current_dir(cwd);
648    }
649
650    // Spawn the child with piped stdout/stderr so we can tee to logs.
651    let mut child = child_cmd
652        .stdin(std::process::Stdio::null())
653        .stdout(std::process::Stdio::piped())
654        .stderr(std::process::Stdio::piped())
655        .spawn()
656        .context("supervisor: spawn child")?;
657
658    let pid = child.id();
659    info!(job_id, pid, "child process started");
660
661    // On Windows, assign child to a named Job Object for process-tree management.
662    // The job name is derived from the job_id so that `kill` can look it up.
663    // Assignment is a MUST requirement on Windows: if it fails, the supervisor
664    // kills the child process and updates state.json to "failed" before returning
665    // an error, so that the run front-end (which may have already returned) can
666    // detect the failure via state.json on next poll.
667    #[cfg(windows)]
668    let windows_job_name = {
669        match assign_to_job_object(job_id, pid) {
670            Ok(name) => Some(name),
671            Err(e) => {
672                // Job Object assignment failed. Per design.md this is a MUST
673                // requirement on Windows. Kill the child process and update
674                // state.json to "failed" so the run front-end can detect it.
675                let kill_err = child.kill();
676                let _ = child.wait(); // reap to avoid zombies
677
678                let failed_state = JobState {
679                    job: JobStateJob {
680                        id: job_id.to_string(),
681                        status: JobStatus::Failed,
682                        started_at: started_at.clone(),
683                    },
684                    result: JobStateResult {
685                        exit_code: None,
686                        signal: None,
687                        duration_ms: None,
688                    },
689                    pid: Some(pid),
690                    finished_at: Some(now_rfc3339()),
691                    updated_at: now_rfc3339(),
692                    windows_job_name: None,
693                };
694                // Best-effort: if writing state fails, we still propagate the
695                // original assignment error.
696                let _ = job_dir.write_state(&failed_state);
697
698                // Dispatch completion event for the failed state if notifications are configured.
699                // This mirrors the dispatch logic in the normal exit path so that callers
700                // receive a job.finished event even when the supervisor fails early (Windows only).
701                if opts.notify_command.is_some() || opts.notify_file.is_some() {
702                    let finished_at_ts =
703                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
704                    let stdout_log = job_dir.stdout_path().display().to_string();
705                    let stderr_log = job_dir.stderr_path().display().to_string();
706                    let fail_event = crate::schema::CompletionEvent {
707                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
708                        event_type: "job.finished".to_string(),
709                        job_id: job_id.to_string(),
710                        state: JobStatus::Failed.as_str().to_string(),
711                        command: meta.command.clone(),
712                        cwd: meta.cwd.clone(),
713                        started_at: started_at.clone(),
714                        finished_at: finished_at_ts,
715                        duration_ms: None,
716                        exit_code: None,
717                        signal: None,
718                        stdout_log_path: stdout_log,
719                        stderr_log_path: stderr_log,
720                    };
721                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
722                    let fail_event_path = job_dir.completion_event_path().display().to_string();
723                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
724                        Vec::new();
725                    if let Err(we) = job_dir.write_completion_event_atomic(
726                        &crate::schema::CompletionEventRecord {
727                            event: fail_event.clone(),
728                            delivery_results: vec![],
729                        },
730                    ) {
731                        warn!(
732                            job_id,
733                            error = %we,
734                            "failed to write initial completion_event.json for failed job"
735                        );
736                    }
737                    if let Some(ref shell_cmd) = opts.notify_command {
738                        fail_delivery_results.push(dispatch_command_sink(
739                            shell_cmd,
740                            &fail_event_json,
741                            job_id,
742                            &fail_event_path,
743                            &opts.shell_wrapper,
744                        ));
745                    }
746                    if let Some(ref file_path) = opts.notify_file {
747                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
748                    }
749                    if let Err(we) = job_dir.write_completion_event_atomic(
750                        &crate::schema::CompletionEventRecord {
751                            event: fail_event,
752                            delivery_results: fail_delivery_results,
753                        },
754                    ) {
755                        warn!(
756                            job_id,
757                            error = %we,
758                            "failed to update completion_event.json with delivery results for failed job"
759                        );
760                    }
761                }
762
763                if let Err(ke) = kill_err {
764                    return Err(anyhow::anyhow!(
765                        "supervisor: failed to assign pid {pid} to Job Object \
766                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
767                    ));
768                }
769                return Err(anyhow::anyhow!(
770                    "supervisor: failed to assign pid {pid} to Job Object \
771                     (Windows MUST requirement); child process was killed; \
772                     consider running outside a nested Job Object environment: {e}"
773                ));
774            }
775        }
776    };
777    #[cfg(not(windows))]
778    let windows_job_name: Option<String> = None;
779
780    // Update state.json with real child PID and Windows Job Object name.
781    // On Windows, windows_job_name is always Some at this point (guaranteed
782    // by the MUST requirement above), so state.json will always contain the
783    // Job Object identifier while the job is running.
784    let state = JobState {
785        job: JobStateJob {
786            id: job_id.to_string(),
787            status: JobStatus::Running,
788            started_at: started_at.clone(),
789        },
790        result: JobStateResult {
791            exit_code: None,
792            signal: None,
793            duration_ms: None,
794        },
795        pid: Some(pid),
796        finished_at: None,
797        updated_at: now_rfc3339(),
798        windows_job_name,
799    };
800    job_dir.write_state(&state)?;
801
802    let child_start_time = std::time::Instant::now();
803
804    // Take stdout/stderr handles before moving child.
805    let child_stdout = child.stdout.take().expect("child stdout piped");
806    let child_stderr = child.stderr.take().expect("child stderr piped");
807
808    // Thread: read stdout, write to stdout.log and full.log.
809    let stdout_log_path = job_dir.stdout_path();
810    let full_log_stdout = Arc::clone(&full_log);
811    let t_stdout = std::thread::spawn(move || {
812        stream_to_logs(child_stdout, &stdout_log_path, full_log_stdout, "STDOUT");
813    });
814
815    // Thread: read stderr, write to stderr.log and full.log.
816    let stderr_log_path = job_dir.stderr_path();
817    let full_log_stderr = Arc::clone(&full_log);
818    let t_stderr = std::thread::spawn(move || {
819        stream_to_logs(child_stderr, &stderr_log_path, full_log_stderr, "STDERR");
820    });
821
822    // Timeout / kill-after / progress-every handling.
823    // We spawn a watcher thread to handle timeout and periodic state.json updates.
824    let timeout_ms = opts.timeout_ms;
825    let kill_after_ms = opts.kill_after_ms;
826    let progress_every_ms = opts.progress_every_ms;
827    let state_path = job_dir.state_path();
828    let job_id_str = job_id.to_string();
829
830    // Use an atomic flag to signal the watcher thread when the child has exited.
831    use std::sync::atomic::{AtomicBool, Ordering};
832    let child_done = Arc::new(AtomicBool::new(false));
833
834    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
835        let state_path_clone = state_path.clone();
836        let child_done_clone = Arc::clone(&child_done);
837        Some(std::thread::spawn(move || {
838            let start = std::time::Instant::now();
839            let timeout_dur = if timeout_ms > 0 {
840                Some(std::time::Duration::from_millis(timeout_ms))
841            } else {
842                None
843            };
844            let progress_dur = if progress_every_ms > 0 {
845                Some(std::time::Duration::from_millis(progress_every_ms))
846            } else {
847                None
848            };
849
850            let poll_interval = std::time::Duration::from_millis(100);
851
852            loop {
853                std::thread::sleep(poll_interval);
854
855                // Exit the watcher loop if the child process has finished.
856                if child_done_clone.load(Ordering::Relaxed) {
857                    break;
858                }
859
860                let elapsed = start.elapsed();
861
862                // Check for timeout.
863                if let Some(td) = timeout_dur
864                    && elapsed >= td
865                {
866                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM");
867                    // Send SIGTERM.
868                    #[cfg(unix)]
869                    {
870                        unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
871                    }
872                    // If kill_after > 0, wait kill_after ms then SIGKILL.
873                    if kill_after_ms > 0 {
874                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
875                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL");
876                        #[cfg(unix)]
877                        {
878                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
879                        }
880                    } else {
881                        // Immediate SIGKILL.
882                        #[cfg(unix)]
883                        {
884                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
885                        }
886                    }
887                    break;
888                }
889
890                // Progress-every: update updated_at periodically.
891                if let Some(pd) = progress_dur {
892                    let elapsed_ms = elapsed.as_millis() as u64;
893                    let pd_ms = pd.as_millis() as u64;
894                    let poll_ms = poll_interval.as_millis() as u64;
895                    if elapsed_ms % pd_ms < poll_ms {
896                        // Read, update updated_at, write back.
897                        if let Ok(raw) = std::fs::read(&state_path_clone)
898                            && let Ok(mut st) =
899                                serde_json::from_slice::<crate::schema::JobState>(&raw)
900                        {
901                            st.updated_at = now_rfc3339();
902                            if let Ok(s) = serde_json::to_string_pretty(&st) {
903                                let _ = std::fs::write(&state_path_clone, s);
904                            }
905                        }
906                    }
907                }
908            }
909        }))
910    } else {
911        None
912    };
913
914    // Wait for child to finish.
915    let exit_status = child.wait().context("wait for child")?;
916
917    // Signal the watcher that the child has finished so it can exit its loop.
918    child_done.store(true, Ordering::Relaxed);
919
920    // Join logging threads.
921    let _ = t_stdout.join();
922    let _ = t_stderr.join();
923
924    // Join watcher if present.
925    if let Some(w) = watcher {
926        let _ = w.join();
927    }
928
929    let duration_ms = child_start_time.elapsed().as_millis() as u64;
930    let exit_code = exit_status.code();
931    let finished_at = now_rfc3339();
932
933    // Detect signal-killed processes on Unix for accurate state and completion event.
934    #[cfg(unix)]
935    let (terminal_status, signal_name) = {
936        use std::os::unix::process::ExitStatusExt;
937        if let Some(sig) = exit_status.signal() {
938            (JobStatus::Killed, Some(sig.to_string()))
939        } else {
940            (JobStatus::Exited, None)
941        }
942    };
943    #[cfg(not(unix))]
944    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
945
946    let state = JobState {
947        job: JobStateJob {
948            id: job_id.to_string(),
949            status: terminal_status.clone(),
950            started_at: started_at.clone(),
951        },
952        result: JobStateResult {
953            exit_code,
954            signal: signal_name.clone(),
955            duration_ms: Some(duration_ms),
956        },
957        pid: Some(pid),
958        finished_at: Some(finished_at.clone()),
959        updated_at: now_rfc3339(),
960        windows_job_name: None, // not needed after process exits
961    };
962    job_dir.write_state(&state)?;
963    info!(job_id, ?exit_code, "child process finished");
964
965    // Dispatch completion event to configured notification sinks.
966    // Failure here must not alter job state (delivery result is recorded separately).
967    let has_notification = opts.notify_command.is_some() || opts.notify_file.is_some();
968    if has_notification {
969        let stdout_log = job_dir.stdout_path().display().to_string();
970        let stderr_log = job_dir.stderr_path().display().to_string();
971        let event = crate::schema::CompletionEvent {
972            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
973            event_type: "job.finished".to_string(),
974            job_id: job_id.to_string(),
975            state: terminal_status.as_str().to_string(),
976            command: meta.command.clone(),
977            cwd: meta.cwd.clone(),
978            started_at,
979            finished_at,
980            duration_ms: Some(duration_ms),
981            exit_code,
982            signal: signal_name,
983            stdout_log_path: stdout_log,
984            stderr_log_path: stderr_log,
985        };
986
987        let event_json = serde_json::to_string(&event).unwrap_or_default();
988        let event_path = job_dir.completion_event_path().display().to_string();
989        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
990
991        // Write initial completion_event.json before dispatching sinks.
992        if let Err(e) =
993            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
994                event: event.clone(),
995                delivery_results: vec![],
996            })
997        {
998            warn!(job_id, error = %e, "failed to write initial completion_event.json");
999        }
1000
1001        if let Some(ref shell_cmd) = opts.notify_command {
1002            delivery_results.push(dispatch_command_sink(
1003                shell_cmd,
1004                &event_json,
1005                job_id,
1006                &event_path,
1007                &opts.shell_wrapper,
1008            ));
1009        }
1010        if let Some(ref file_path) = opts.notify_file {
1011            delivery_results.push(dispatch_file_sink(file_path, &event_json));
1012        }
1013
1014        // Update completion_event.json with delivery results.
1015        if let Err(e) =
1016            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1017                event,
1018                delivery_results,
1019            })
1020        {
1021            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1022        }
1023    }
1024
1025    Ok(())
1026}
1027
1028/// Dispatch the command sink: execute the shell command string via the configured shell wrapper,
1029/// pass event JSON via stdin, and set AGENT_EXEC_EVENT_PATH / AGENT_EXEC_JOB_ID /
1030/// AGENT_EXEC_EVENT_TYPE env vars.
1031///
1032/// The shell wrapper argv (e.g. `["sh", "-lc"]`) is provided by the caller.
1033/// The command string is appended as the final argument to the wrapper.
1034fn dispatch_command_sink(
1035    shell_cmd: &str,
1036    event_json: &str,
1037    job_id: &str,
1038    event_path: &str,
1039    shell_wrapper: &[String],
1040) -> crate::schema::SinkDeliveryResult {
1041    use std::io::Write;
1042    let attempted_at = now_rfc3339();
1043    let target = shell_cmd.to_string();
1044
1045    if shell_cmd.trim().is_empty() {
1046        return crate::schema::SinkDeliveryResult {
1047            sink_type: "command".to_string(),
1048            target,
1049            success: false,
1050            error: Some("empty shell command".to_string()),
1051            attempted_at,
1052        };
1053    }
1054
1055    if shell_wrapper.is_empty() {
1056        return crate::schema::SinkDeliveryResult {
1057            sink_type: "command".to_string(),
1058            target,
1059            success: false,
1060            error: Some("shell wrapper must not be empty".to_string()),
1061            attempted_at,
1062        };
1063    }
1064
1065    let mut cmd = Command::new(&shell_wrapper[0]);
1066    cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1067
1068    cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1069    cmd.env("AGENT_EXEC_JOB_ID", job_id);
1070    cmd.env("AGENT_EXEC_EVENT_TYPE", "job.finished");
1071    cmd.stdin(std::process::Stdio::piped());
1072    cmd.stdout(std::process::Stdio::null());
1073    cmd.stderr(std::process::Stdio::null());
1074
1075    match cmd.spawn() {
1076        Ok(mut child) => {
1077            if let Some(mut stdin) = child.stdin.take() {
1078                let _ = stdin.write_all(event_json.as_bytes());
1079            }
1080            match child.wait() {
1081                Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1082                    sink_type: "command".to_string(),
1083                    target,
1084                    success: true,
1085                    error: None,
1086                    attempted_at,
1087                },
1088                Ok(status) => crate::schema::SinkDeliveryResult {
1089                    sink_type: "command".to_string(),
1090                    target,
1091                    success: false,
1092                    error: Some(format!("exited with status {status}")),
1093                    attempted_at,
1094                },
1095                Err(e) => crate::schema::SinkDeliveryResult {
1096                    sink_type: "command".to_string(),
1097                    target,
1098                    success: false,
1099                    error: Some(format!("wait error: {e}")),
1100                    attempted_at,
1101                },
1102            }
1103        }
1104        Err(e) => crate::schema::SinkDeliveryResult {
1105            sink_type: "command".to_string(),
1106            target,
1107            success: false,
1108            error: Some(format!("spawn error: {e}")),
1109            attempted_at,
1110        },
1111    }
1112}
1113
1114/// Dispatch the file sink: append event JSON as a single NDJSON line.
1115/// Creates parent directories automatically.
1116fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1117    use std::io::Write;
1118    let attempted_at = now_rfc3339();
1119    let path = std::path::Path::new(file_path);
1120
1121    if let Some(parent) = path.parent()
1122        && let Err(e) = std::fs::create_dir_all(parent)
1123    {
1124        return crate::schema::SinkDeliveryResult {
1125            sink_type: "file".to_string(),
1126            target: file_path.to_string(),
1127            success: false,
1128            error: Some(format!("create parent dir: {e}")),
1129            attempted_at,
1130        };
1131    }
1132
1133    match std::fs::OpenOptions::new()
1134        .create(true)
1135        .append(true)
1136        .open(path)
1137    {
1138        Ok(mut f) => match writeln!(f, "{event_json}") {
1139            Ok(_) => crate::schema::SinkDeliveryResult {
1140                sink_type: "file".to_string(),
1141                target: file_path.to_string(),
1142                success: true,
1143                error: None,
1144                attempted_at,
1145            },
1146            Err(e) => crate::schema::SinkDeliveryResult {
1147                sink_type: "file".to_string(),
1148                target: file_path.to_string(),
1149                success: false,
1150                error: Some(format!("write error: {e}")),
1151                attempted_at,
1152            },
1153        },
1154        Err(e) => crate::schema::SinkDeliveryResult {
1155            sink_type: "file".to_string(),
1156            target: file_path.to_string(),
1157            success: false,
1158            error: Some(format!("open error: {e}")),
1159            attempted_at,
1160        },
1161    }
1162}
1163
/// Public alias so other modules can call the timestamp helper.
///
/// Returns exactly what the private `now_rfc3339` returns: the current
/// wall-clock time rendered as a UTC timestamp with second precision, in the
/// form `YYYY-MM-DDTHH:MM:SSZ`.
pub fn now_rfc3339_pub() -> String {
    // Pure delegation; the private helper stays the single implementation.
    now_rfc3339()
}
1168
1169fn now_rfc3339() -> String {
1170    // Use a simple approach that works without chrono.
1171    let d = std::time::SystemTime::now()
1172        .duration_since(std::time::UNIX_EPOCH)
1173        .unwrap_or_default();
1174    format_rfc3339(d.as_secs())
1175}
1176
1177fn format_rfc3339(secs: u64) -> String {
1178    // Manual conversion of Unix timestamp to UTC date-time string.
1179    let mut s = secs;
1180    let seconds = s % 60;
1181    s /= 60;
1182    let minutes = s % 60;
1183    s /= 60;
1184    let hours = s % 24;
1185    s /= 24;
1186
1187    // Days since 1970-01-01
1188    let mut days = s;
1189    let mut year = 1970u64;
1190    loop {
1191        let days_in_year = if is_leap(year) { 366 } else { 365 };
1192        if days < days_in_year {
1193            break;
1194        }
1195        days -= days_in_year;
1196        year += 1;
1197    }
1198
1199    let leap = is_leap(year);
1200    let month_days: [u64; 12] = [
1201        31,
1202        if leap { 29 } else { 28 },
1203        31,
1204        30,
1205        31,
1206        30,
1207        31,
1208        31,
1209        30,
1210        31,
1211        30,
1212        31,
1213    ];
1214    let mut month = 0usize;
1215    for (i, &d) in month_days.iter().enumerate() {
1216        if days < d {
1217            month = i;
1218            break;
1219        }
1220        days -= d;
1221    }
1222    let day = days + 1;
1223
1224    format!(
1225        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1226        year,
1227        month + 1,
1228        day,
1229        hours,
1230        minutes,
1231        seconds
1232    )
1233}
1234
/// Gregorian leap-year rule: years divisible by 400 are leap years, other
/// century years are not, and any remaining year divisible by 4 is.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    if year.is_multiple_of(100) {
        return false;
    }
    year.is_multiple_of(4)
}
1238
/// Windows-only: create a named Job Object and assign the given child process
/// to it so that the entire process tree can be terminated via `kill`.
///
/// The Job Object is named `"AgentExec-{job_id}"`. This name is stored in
/// `state.json` so that future `kill` invocations can open the same Job Object
/// by name and call `TerminateJobObject` to stop the whole tree.
///
/// The Job Object handle is deliberately never closed by this function, so
/// that the Job Object stays alive for as long as the supervisor runs. The
/// process handle opened for the assignment is closed on every path.
///
/// Returns `Ok(name)` on success.  Returns `Err` on failure — the caller
/// (`supervise`) treats failure as a fatal error because reliable process-tree
/// management is a Windows MUST requirement (design.md).
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: these are plain Win32 FFI calls. Every handle passed to
    // `AssignProcessToJobObject` / `CloseHandle` was returned by a successful
    // `OpenProcess` / `CreateJobObjectW` call in this block, and each handle
    // is closed at most once on any path. `hname` outlives the call that
    // borrows it.
    unsafe {
        // Open the child process handle (needed for AssignProcessToJobObject).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create a named Job Object.
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        // Assign the child process to the Job Object.
        // This can fail if the process is already in another job (e.g. CI/nested).
        // Per design.md, assignment is a MUST on Windows — failure is a fatal error.
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle was only needed for the assignment.
        let _ = CloseHandle(proc_handle);

        // The job handle is intentionally left open: `job` is a plain `Copy`
        // handle value with no destructor, so simply not calling
        // `CloseHandle(job)` keeps the Job Object alive for the lifetime of
        // the supervisor. The handle is released when the supervisor process
        // exits.
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1304
1305/// Build the shell command string passed to the shell wrapper.
1306///
1307/// - Single-element commands are treated as shell command strings and passed
1308///   as-is, preserving shell operators like `&&`, pipes, etc.
1309/// - Multi-element commands have each element POSIX-single-quoted before
1310///   joining so that argv semantics survive the shell layer, including
1311///   arguments that contain spaces, `$`, quotes, or other special characters.
1312fn build_cmd_str(command: &[String]) -> String {
1313    if command.len() == 1 {
1314        command[0].clone()
1315    } else {
1316        command
1317            .iter()
1318            .map(|s| posix_single_quote(s))
1319            .collect::<Vec<_>>()
1320            .join(" ")
1321    }
1322}
1323
/// Quote `s` for a POSIX shell using single quotes.
///
/// Each embedded `'` is emitted as `'\''` (close the quote, add an escaped
/// quote, reopen the quote); every other character is copied verbatim.
fn posix_single_quote(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
1328
/// Unit tests for the pure helpers in this module: timestamp formatting,
/// the leap-year rule and shell-command quoting.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        // 2024-01-01T00:00:00Z = 1704067200
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_last_second_of_year() {
        // One second before 2024-01-01T00:00:00Z: exercises the December /
        // day-31 / 23:59:59 boundaries together.
        assert_eq!(format_rfc3339(1704067199), "2023-12-31T23:59:59Z");
    }

    #[test]
    fn rfc3339_leap_day_with_time_of_day() {
        // 2024-01-01 + 59 days + 12:34:56
        assert_eq!(format_rfc3339(1709210096), "2024-02-29T12:34:56Z");
    }

    #[test]
    fn rfc3339_century_leap_year() {
        // 2000 is divisible by 400, so February has 29 days.
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
        assert_eq!(format_rfc3339(951868800), "2000-03-01T00:00:00Z");
    }

    #[test]
    fn leap_year_rules() {
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1900));
        assert!(!is_leap(2100));
        assert!(!is_leap(2023));
    }

    #[test]
    fn build_cmd_str_single_element_is_passed_through() {
        let command = vec!["echo hi && ls | wc -l".to_string()];
        assert_eq!(build_cmd_str(&command), "echo hi && ls | wc -l");
    }

    #[test]
    fn build_cmd_str_quotes_each_argument() {
        let command = vec!["echo".to_string(), "a b".to_string(), "$HOME".to_string()];
        assert_eq!(build_cmd_str(&command), "'echo' 'a b' '$HOME'");
    }

    #[test]
    fn posix_single_quote_escapes_embedded_quotes() {
        assert_eq!(posix_single_quote(""), "''");
        assert_eq!(posix_single_quote("it's"), "'it'\\''s'");
    }
}
1343}