Skip to main content

agent_exec/
start.rs

//! Implementation of the `start` sub-command.
//!
//! `start` launches a previously `create`d job.  It reads the persisted
//! execution definition from `meta.json`, validates that the job is in
//! `created` state, and then spawns the supervisor process.
6
7use anyhow::Result;
8use tracing::info;
9
10use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
11use crate::run::{
12    SpawnSupervisorParams, mask_env_vars, observe_inline_output, spawn_supervisor_process,
13};
14use crate::schema::{JobStatus, Response, RunData};
15
16/// Options for the `start` sub-command.
17#[derive(Debug)]
18pub struct StartOpts<'a> {
19    /// Job ID of a previously created job.
20    pub job_id: &'a str,
21    /// Override for jobs root directory.
22    pub root: Option<&'a str>,
23    /// Disable best-effort auto-GC for this invocation.
24    pub no_auto_gc: bool,
25    /// Optional auto-GC retention override.
26    pub auto_gc_older_than: Option<String>,
27    /// Optional auto-GC max-jobs override.
28    pub auto_gc_max_jobs: Option<u64>,
29    /// Optional auto-GC max-bytes override.
30    pub auto_gc_max_bytes: Option<u64>,
31    /// Base auto-GC settings resolved from config/defaults.
32    pub auto_gc_config: crate::gc::AutoGcConfig,
33    /// Wait for inline output observation before returning.
34    pub wait: bool,
35    /// Maximum wait duration in seconds for inline observation.
36    pub until_seconds: u64,
37    /// Wait indefinitely for terminal state / observation budget.
38    pub forever: bool,
39    /// Maximum bytes to include from the head of each stream.
40    pub max_bytes: u64,
41    pub compression_mode: crate::compress::CompressionMode,
42}
43
44/// Execute `start`: launch a created job and return JSON.
45pub fn execute(opts: StartOpts) -> Result<()> {
46    let root = resolve_root(opts.root);
47    let job_dir = JobDir::open(&root, opts.job_id)?;
48
49    let meta = job_dir.read_meta()?;
50    let state = job_dir.read_state()?;
51
52    // Only jobs in `created` state can be started.
53    if *state.status() != JobStatus::Created {
54        return Err(anyhow::Error::new(InvalidJobState(format!(
55            "job {} is in '{}' state; only 'created' jobs can be started",
56            opts.job_id,
57            state.status().as_str()
58        ))));
59    }
60
61    info!(job_id = %opts.job_id, "starting created job");
62
63    // Determine full.log path.
64    let full_log_path = job_dir.full_log_path().display().to_string();
65
66    // Resolve shell wrapper: use persisted value from meta, or re-resolve from config.
67    let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
68        w.clone()
69    } else {
70        crate::config::default_shell_wrapper()
71    };
72
73    // Use the persisted runtime env vars (unmasked) for the supervisor call.
74    // env_vars_runtime stores the actual KEY=VALUE pairs written by `create`; this
75    // ensures that `--mask KEY` only redacts the display/metadata view while the real
76    // value is still applied to the child process environment at start time.
77    // env_files are re-read here (deferred loading) so file contents reflect the
78    // current state of the files at start time, not at create time.
79
80    let (supervisor_pid, started_at) = spawn_supervisor_process(
81        &job_dir,
82        SpawnSupervisorParams {
83            job_id: job_dir.job_id.clone(),
84            root: root.clone(),
85            full_log_path: full_log_path.clone(),
86            timeout_ms: meta.timeout_ms,
87            kill_after_ms: meta.kill_after_ms,
88            cwd: meta.cwd.clone(),
89            env_vars: meta.env_vars_runtime.clone(),
90            env_files: meta.env_files.clone(),
91            inherit_env: meta.inherit_env,
92            stdin_file: meta.stdin_file.clone(),
93            progress_every_ms: meta.progress_every_ms,
94            notify_command: meta
95                .notification
96                .as_ref()
97                .and_then(|n| n.notify_command.clone()),
98            notify_file: meta
99                .notification
100                .as_ref()
101                .and_then(|n| n.notify_file.clone()),
102            shell_wrapper,
103            command: meta.command.clone(),
104        },
105    )?;
106
107    info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
108
109    if !opts.no_auto_gc {
110        let mut auto_cfg = opts.auto_gc_config.clone();
111        if let Some(v) = opts.auto_gc_older_than {
112            auto_cfg.older_than = v;
113        }
114        if let Some(v) = opts.auto_gc_max_jobs {
115            auto_cfg.max_jobs = usize::try_from(v).ok();
116        }
117        if let Some(v) = opts.auto_gc_max_bytes {
118            auto_cfg.max_bytes = Some(v);
119        }
120        crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
121    }
122
123    let stdout_log_path = job_dir.stdout_path().display().to_string();
124    let stderr_log_path = job_dir.stderr_path().display().to_string();
125
126    // The response uses the masked env_vars (display view), not the runtime values.
127    let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
128    let observation = observe_inline_output(
129        &job_dir,
130        opts.wait,
131        opts.until_seconds,
132        opts.forever,
133        opts.max_bytes,
134    )?;
135    let compression = crate::compress::compress(crate::compress::CompressionInput {
136        command: &meta.command,
137        stdout: &observation.stdout,
138        stderr: &observation.stderr,
139        stdout_original_bytes: observation.stdout_total_bytes,
140        stderr_original_bytes: observation.stderr_total_bytes,
141        mode: opts.compression_mode,
142    });
143
144    Response::new(
145        "start",
146        RunData {
147            job_id: job_dir.job_id.clone(),
148            state: observation.state,
149            tags: meta.tags.clone(),
150            env_vars: masked_env_vars,
151            stdout_log_path,
152            stderr_log_path,
153            elapsed_ms: 0,
154            waited_ms: observation.waited_ms,
155            stdout: observation.stdout,
156            stderr: observation.stderr,
157            stdout_range: observation.stdout_range,
158            stderr_range: observation.stderr_range,
159            stdout_total_bytes: observation.stdout_total_bytes,
160            stderr_total_bytes: observation.stderr_total_bytes,
161            encoding: observation.encoding,
162            exit_code: observation.exit_code,
163            finished_at: observation.finished_at,
164            signal: observation.signal,
165            duration_ms: observation.duration_ms,
166            compression,
167        },
168    )
169    .print();
170
171    Ok(())
172}