Skip to main content

agent_exec/
restart.rs

//! Implementation of the `restart` sub-command.
//!
//! `restart` reuses an existing job directory and persisted `meta.json` definition
//! while replacing the current process, if any, with a fresh supervisor launch.
5
6use anyhow::{Context, Result};
7use tracing::{info, warn};
8
9use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
10use crate::run::{
11    SpawnSupervisorParams, mask_env_vars, observe_inline_output, spawn_supervisor_process,
12};
13use crate::schema::{JobStatus, Response, RunData};
14
/// Upper bound on how long `terminate_running_job` keeps polling for the old
/// run to reach a terminal state (and for its original process to disappear)
/// before it gives up with an error.
const TERMINATION_BUDGET: std::time::Duration = std::time::Duration::from_secs(5);
/// Sleep between two successive polls of the job state while waiting for the
/// old run to terminate.
const TERMINATION_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
17
/// Options for the `restart` sub-command.
///
/// Borrowed fields (`job_id`, `root`, `signal`) share the lifetime `'a` of the
/// caller-owned strings they point into.
#[derive(Debug)]
pub struct RestartOpts<'a> {
    /// Job ID of an existing job.
    pub job_id: &'a str,
    /// Override for jobs root directory; passed to `resolve_root`.
    pub root: Option<&'a str>,
    /// Signal used to terminate a currently running process tree.
    ///
    /// Only consulted when the persisted job state is `Running`.
    pub signal: &'a str,
    /// Disable best-effort auto-GC for this invocation.
    pub no_auto_gc: bool,
    /// Optional auto-GC retention override (replaces `auto_gc_config.older_than`).
    pub auto_gc_older_than: Option<String>,
    /// Optional auto-GC max-jobs override (replaces `auto_gc_config.max_jobs`).
    pub auto_gc_max_jobs: Option<u64>,
    /// Optional auto-GC max-bytes override (replaces `auto_gc_config.max_bytes`).
    pub auto_gc_max_bytes: Option<u64>,
    /// Base auto-GC settings resolved from config/defaults.
    pub auto_gc_config: crate::gc::AutoGcConfig,
    /// Wait for inline output observation before returning.
    pub wait: bool,
    /// Maximum wait duration in seconds for inline observation.
    pub until_seconds: u64,
    /// Wait indefinitely for terminal state / observation budget.
    pub forever: bool,
    /// Maximum bytes to include from the head of each stream.
    pub max_bytes: u64,
    /// Compression mode handed to `crate::compress::compress` for the observed
    /// stdout/stderr included in the response.
    pub compression_mode: crate::compress::CompressionMode,
}
47
48/// Execute `restart`: replace an existing job's current run and return JSON.
49pub fn execute(opts: RestartOpts) -> Result<()> {
50    let elapsed_start = std::time::Instant::now();
51    let root = resolve_root(opts.root);
52    let job_dir = JobDir::open(&root, opts.job_id)?;
53
54    let meta = job_dir.read_meta()?;
55    if meta.job_id() != job_dir.job_id {
56        return Err(anyhow::Error::new(InvalidJobState(format!(
57            "job {} metadata identity mismatch: meta.json has {}",
58            job_dir.job_id,
59            meta.job_id()
60        ))));
61    }
62
63    let state = job_dir.read_state()?;
64    info!(
65        job_id = %job_dir.job_id,
66        state = %state.status().as_str(),
67        "restarting job"
68    );
69
70    if *state.status() == JobStatus::Running {
71        terminate_running_job(&job_dir, opts.signal)?;
72    }
73
74    reset_per_run_artifacts(&job_dir)?;
75
76    let full_log_path = job_dir.full_log_path().display().to_string();
77    let shell_wrapper = meta
78        .shell_wrapper
79        .clone()
80        .unwrap_or_else(crate::config::default_shell_wrapper);
81
82    let (supervisor_pid, started_at) = spawn_supervisor_process(
83        &job_dir,
84        SpawnSupervisorParams {
85            job_id: job_dir.job_id.clone(),
86            root: root.clone(),
87            full_log_path,
88            timeout_ms: meta.timeout_ms,
89            kill_after_ms: meta.kill_after_ms,
90            cwd: meta.cwd.clone(),
91            env_vars: meta.env_vars_runtime.clone(),
92            env_files: meta.env_files.clone(),
93            inherit_env: meta.inherit_env,
94            stdin_file: meta.stdin_file.clone(),
95            progress_every_ms: meta.progress_every_ms,
96            notify_command: meta
97                .notification
98                .as_ref()
99                .and_then(|n| n.notify_command.clone()),
100            notify_file: meta
101                .notification
102                .as_ref()
103                .and_then(|n| n.notify_file.clone()),
104            shell_wrapper,
105            command: meta.command.clone(),
106        },
107    )?;
108
109    info!(
110        job_id = %job_dir.job_id,
111        supervisor_pid,
112        started_at = %started_at,
113        "job restarted"
114    );
115
116    if !opts.no_auto_gc {
117        let mut auto_cfg = opts.auto_gc_config.clone();
118        if let Some(v) = opts.auto_gc_older_than {
119            auto_cfg.older_than = v;
120        }
121        if let Some(v) = opts.auto_gc_max_jobs {
122            auto_cfg.max_jobs = usize::try_from(v).ok();
123        }
124        if let Some(v) = opts.auto_gc_max_bytes {
125            auto_cfg.max_bytes = Some(v);
126        }
127        crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
128    }
129
130    let stdout_log_path = job_dir.stdout_path().display().to_string();
131    let stderr_log_path = job_dir.stderr_path().display().to_string();
132    let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
133    let observation = observe_inline_output(
134        &job_dir,
135        opts.wait,
136        opts.until_seconds,
137        opts.forever,
138        opts.max_bytes,
139    )?;
140    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
141    let compression = crate::compress::compress(crate::compress::CompressionInput {
142        command: &meta.command,
143        stdout: &observation.stdout,
144        stderr: &observation.stderr,
145        stdout_original_bytes: observation.stdout_total_bytes,
146        stderr_original_bytes: observation.stderr_total_bytes,
147        mode: opts.compression_mode,
148    });
149
150    Response::new(
151        "restart",
152        RunData {
153            job_id: job_dir.job_id.clone(),
154            state: observation.state,
155            tags: meta.tags.clone(),
156            env_vars: masked_env_vars,
157            stdout_log_path,
158            stderr_log_path,
159            elapsed_ms,
160            waited_ms: observation.waited_ms,
161            stdout: observation.stdout,
162            stderr: observation.stderr,
163            stdout_range: observation.stdout_range,
164            stderr_range: observation.stderr_range,
165            stdout_total_bytes: observation.stdout_total_bytes,
166            stderr_total_bytes: observation.stderr_total_bytes,
167            encoding: observation.encoding,
168            exit_code: observation.exit_code,
169            finished_at: observation.finished_at,
170            signal: observation.signal,
171            duration_ms: observation.duration_ms,
172            compression,
173        },
174    )
175    .print();
176
177    Ok(())
178}
179
180fn terminate_running_job(job_dir: &JobDir, signal: &str) -> Result<()> {
181    info!(job_id = %job_dir.job_id, signal, "terminating running job before restart");
182
183    let original_pid = job_dir.read_state()?.pid;
184    let signal_result = crate::kill::execute_inner(crate::kill::KillOpts {
185        job_id: &job_dir.job_id,
186        root: job_dir.path.parent().and_then(|p| p.to_str()),
187        signal,
188        no_wait: false,
189    })?;
190
191    if matches!(signal_result.state.as_deref(), Some("running")) {
192        warn!(
193            job_id = %job_dir.job_id,
194            signal,
195            "restart termination observation still reported running; escalating to KILL"
196        );
197        crate::kill::execute_inner(crate::kill::KillOpts {
198            job_id: &job_dir.job_id,
199            root: job_dir.path.parent().and_then(|p| p.to_str()),
200            signal: "KILL",
201            no_wait: false,
202        })?;
203    }
204
205    let deadline = std::time::Instant::now() + TERMINATION_BUDGET;
206    loop {
207        let current = job_dir.read_state()?;
208        let state_is_terminal = !current.status().is_non_terminal();
209        let original_process_gone = original_pid.map(process_is_gone).unwrap_or(true);
210        if state_is_terminal && original_process_gone {
211            info!(
212                job_id = %job_dir.job_id,
213                state = %current.status().as_str(),
214                original_pid = ?original_pid,
215                "old job run reached terminal state before restart relaunch"
216            );
217            return Ok(());
218        }
219        if std::time::Instant::now() >= deadline {
220            return Err(anyhow::Error::new(InvalidJobState(format!(
221                "job {} did not terminate within restart budget (state_terminal={}, original_pid_gone={})",
222                job_dir.job_id, state_is_terminal, original_process_gone
223            ))));
224        }
225        std::thread::sleep(TERMINATION_POLL_INTERVAL);
226    }
227}
228
229fn process_is_gone(pid: u32) -> bool {
230    #[cfg(unix)]
231    {
232        // SAFETY: kill(pid, 0) does not send a signal; it only probes process existence.
233        let ret = unsafe { libc::kill(pid as libc::pid_t, 0) };
234        if ret == 0 {
235            return false;
236        }
237        std::io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH)
238    }
239    #[cfg(not(unix))]
240    {
241        // On non-Unix platforms the shared kill path owns process-tree handling;
242        // state observation is the portable confirmation available here.
243        let _ = pid;
244        true
245    }
246}
247
248fn reset_per_run_artifacts(job_dir: &JobDir) -> Result<()> {
249    for path in [
250        job_dir.stdout_path(),
251        job_dir.stderr_path(),
252        job_dir.full_log_path(),
253    ] {
254        std::fs::OpenOptions::new()
255            .create(true)
256            .write(true)
257            .truncate(true)
258            .open(&path)
259            .with_context(|| format!("truncate per-run artifact {}", path.display()))?;
260    }
261
262    let completion_event_path = job_dir.completion_event_path();
263    match std::fs::remove_file(&completion_event_path) {
264        Ok(()) => {}
265        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
266        Err(e) => {
267            return Err(e)
268                .with_context(|| format!("remove stale {}", completion_event_path.display()));
269        }
270    }
271
272    Ok(())
273}