Skip to main content

agent_exec/
start.rs

1//! Implementation of the `start` sub-command.
2//!
3//! `start` launches a previously `create`d job.  It reads the persisted
4//! execution definition from `meta.json`, validates that the job is in
5//! `created` state, and then spawns the supervisor process.
6
7use anyhow::Result;
8use tracing::info;
9
10use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
11use crate::run::{
12    SnapshotWaitOpts, SpawnSupervisorParams, mask_env_vars, run_snapshot_wait,
13    spawn_supervisor_process,
14};
15use crate::schema::{JobStatus, Response, RunData};
16
17/// Options for the `start` sub-command.
18#[derive(Debug)]
19pub struct StartOpts<'a> {
20    /// Job ID of a previously created job.
21    pub job_id: &'a str,
22    /// Override for jobs root directory.
23    pub root: Option<&'a str>,
24    /// Milliseconds to wait before returning; 0 = return immediately.
25    pub snapshot_after: u64,
26    /// Number of tail lines to include in snapshot.
27    pub tail_lines: u64,
28    /// Max bytes for tail.
29    pub max_bytes: u64,
30    /// If true, wait for the job to reach a terminal state before returning.
31    pub wait: bool,
32    /// Poll interval in milliseconds when `wait` is true.
33    pub wait_poll_ms: u64,
34}
35
36/// Execute `start`: launch a created job and return JSON.
37pub fn execute(opts: StartOpts) -> Result<()> {
38    let root = resolve_root(opts.root);
39    let job_dir = JobDir::open(&root, opts.job_id)?;
40
41    let meta = job_dir.read_meta()?;
42    let state = job_dir.read_state()?;
43
44    // Only jobs in `created` state can be started.
45    if *state.status() != JobStatus::Created {
46        return Err(anyhow::Error::new(InvalidJobState(format!(
47            "job {} is in '{}' state; only 'created' jobs can be started",
48            opts.job_id,
49            state.status().as_str()
50        ))));
51    }
52
53    info!(job_id = %opts.job_id, "starting created job");
54
55    // Determine full.log path.
56    let full_log_path = job_dir.full_log_path().display().to_string();
57
58    // Resolve shell wrapper: use persisted value from meta, or re-resolve from config.
59    let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
60        w.clone()
61    } else {
62        crate::config::default_shell_wrapper()
63    };
64
65    // Use the persisted runtime env vars (unmasked) for the supervisor call.
66    // env_vars_runtime stores the actual KEY=VALUE pairs written by `create`; this
67    // ensures that `--mask KEY` only redacts the display/metadata view while the real
68    // value is still applied to the child process environment at start time.
69    // env_files are re-read here (deferred loading) so file contents reflect the
70    // current state of the files at start time, not at create time.
71
72    let (supervisor_pid, started_at) = spawn_supervisor_process(
73        &job_dir,
74        SpawnSupervisorParams {
75            job_id: job_dir.job_id.clone(),
76            root: root.clone(),
77            full_log_path: full_log_path.clone(),
78            timeout_ms: meta.timeout_ms,
79            kill_after_ms: meta.kill_after_ms,
80            cwd: meta.cwd.clone(),
81            env_vars: meta.env_vars_runtime.clone(),
82            env_files: meta.env_files.clone(),
83            inherit_env: meta.inherit_env,
84            progress_every_ms: meta.progress_every_ms,
85            notify_command: meta
86                .notification
87                .as_ref()
88                .and_then(|n| n.notify_command.clone()),
89            notify_file: meta
90                .notification
91                .as_ref()
92                .and_then(|n| n.notify_file.clone()),
93            shell_wrapper,
94            command: meta.command.clone(),
95        },
96    )?;
97
98    info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
99
100    let stdout_log_path = job_dir.stdout_path().display().to_string();
101    let stderr_log_path = job_dir.stderr_path().display().to_string();
102
103    let elapsed_start = std::time::Instant::now();
104
105    let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
106        run_snapshot_wait(
107            &job_dir,
108            &SnapshotWaitOpts {
109                snapshot_after: opts.snapshot_after,
110                tail_lines: opts.tail_lines,
111                max_bytes: opts.max_bytes,
112                wait: opts.wait,
113                wait_poll_ms: opts.wait_poll_ms,
114            },
115        );
116
117    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
118
119    // The response uses the masked env_vars (display view), not the runtime values.
120    let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
121
122    Response::new(
123        "start",
124        RunData {
125            job_id: job_dir.job_id.clone(),
126            state: final_state,
127            tags: meta.tags.clone(),
128            env_vars: masked_env_vars,
129            snapshot,
130            stdout_log_path,
131            stderr_log_path,
132            waited_ms,
133            elapsed_ms,
134            exit_code: exit_code_opt,
135            finished_at: finished_at_opt,
136            final_snapshot: final_snapshot_opt,
137        },
138    )
139    .print();
140
141    Ok(())
142}