1use anyhow::{Context, Result};
7use tracing::{info, warn};
8
9use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
10use crate::run::{
11 SpawnSupervisorParams, mask_env_vars, observe_inline_output, spawn_supervisor_process,
12};
13use crate::schema::{JobStatus, Response, RunData};
14
/// Maximum time `terminate_running_job` keeps polling for the previous run to
/// reach a terminal state (and for its recorded pid to disappear) before the
/// restart is aborted with an error.
const TERMINATION_BUDGET: std::time::Duration = std::time::Duration::from_secs(5);
/// Sleep between two consecutive polls of the job state while waiting for the
/// previous run to terminate.
const TERMINATION_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
17
/// Options accepted by [`execute`] when restarting an existing job.
#[derive(Debug)]
pub struct RestartOpts<'a> {
    /// Identifier of the job to restart; used to open its job directory.
    pub job_id: &'a str,
    /// Optional job-store root override, handed to `resolve_root`.
    pub root: Option<&'a str>,
    /// Signal name used to stop the current run when the job is still running.
    pub signal: &'a str,
    /// When `true`, the automatic garbage-collection pass after the relaunch is skipped.
    pub no_auto_gc: bool,
    /// When set, replaces `older_than` in the auto-GC configuration.
    pub auto_gc_older_than: Option<String>,
    /// When set, replaces `max_jobs` in the auto-GC configuration
    /// (a value that does not fit in `usize` results in `None`).
    pub auto_gc_max_jobs: Option<u64>,
    /// When set, replaces `max_bytes` in the auto-GC configuration.
    pub auto_gc_max_bytes: Option<u64>,
    /// Base auto-GC configuration; the three overrides above are applied to a clone of it.
    pub auto_gc_config: crate::gc::AutoGcConfig,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably requests waiting for job output/completion — confirm in `run`.
    pub wait: bool,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably the maximum wait in seconds — confirm in `run`.
    pub until_seconds: u64,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably waits without a time limit — confirm in `run`.
    pub forever: bool,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably caps the inline output size in bytes — confirm in `run`.
    pub max_bytes: u64,
    /// Compression mode applied to the observed stdout/stderr before they are reported.
    pub compression_mode: crate::compress::CompressionMode,
}
47
48pub fn execute(opts: RestartOpts) -> Result<()> {
50 let elapsed_start = std::time::Instant::now();
51 let root = resolve_root(opts.root);
52 let job_dir = JobDir::open(&root, opts.job_id)?;
53
54 let meta = job_dir.read_meta()?;
55 if meta.job_id() != job_dir.job_id {
56 return Err(anyhow::Error::new(InvalidJobState(format!(
57 "job {} metadata identity mismatch: meta.json has {}",
58 job_dir.job_id,
59 meta.job_id()
60 ))));
61 }
62
63 let state = job_dir.read_state()?;
64 info!(
65 job_id = %job_dir.job_id,
66 state = %state.status().as_str(),
67 "restarting job"
68 );
69
70 if *state.status() == JobStatus::Running {
71 terminate_running_job(&job_dir, opts.signal)?;
72 }
73
74 reset_per_run_artifacts(&job_dir)?;
75
76 let full_log_path = job_dir.full_log_path().display().to_string();
77 let shell_wrapper = meta
78 .shell_wrapper
79 .clone()
80 .unwrap_or_else(crate::config::default_shell_wrapper);
81
82 let (supervisor_pid, started_at) = spawn_supervisor_process(
83 &job_dir,
84 SpawnSupervisorParams {
85 job_id: job_dir.job_id.clone(),
86 root: root.clone(),
87 full_log_path,
88 timeout_ms: meta.timeout_ms,
89 kill_after_ms: meta.kill_after_ms,
90 cwd: meta.cwd.clone(),
91 env_vars: meta.env_vars_runtime.clone(),
92 env_files: meta.env_files.clone(),
93 inherit_env: meta.inherit_env,
94 stdin_file: meta.stdin_file.clone(),
95 progress_every_ms: meta.progress_every_ms,
96 notify_command: meta
97 .notification
98 .as_ref()
99 .and_then(|n| n.notify_command.clone()),
100 notify_file: meta
101 .notification
102 .as_ref()
103 .and_then(|n| n.notify_file.clone()),
104 shell_wrapper,
105 command: meta.command.clone(),
106 },
107 )?;
108
109 info!(
110 job_id = %job_dir.job_id,
111 supervisor_pid,
112 started_at = %started_at,
113 "job restarted"
114 );
115
116 if !opts.no_auto_gc {
117 let mut auto_cfg = opts.auto_gc_config.clone();
118 if let Some(v) = opts.auto_gc_older_than {
119 auto_cfg.older_than = v;
120 }
121 if let Some(v) = opts.auto_gc_max_jobs {
122 auto_cfg.max_jobs = usize::try_from(v).ok();
123 }
124 if let Some(v) = opts.auto_gc_max_bytes {
125 auto_cfg.max_bytes = Some(v);
126 }
127 crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
128 }
129
130 let stdout_log_path = job_dir.stdout_path().display().to_string();
131 let stderr_log_path = job_dir.stderr_path().display().to_string();
132 let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
133 let observation = observe_inline_output(
134 &job_dir,
135 opts.wait,
136 opts.until_seconds,
137 opts.forever,
138 opts.max_bytes,
139 )?;
140 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
141 let compression = crate::compress::compress(crate::compress::CompressionInput {
142 command: &meta.command,
143 stdout: &observation.stdout,
144 stderr: &observation.stderr,
145 stdout_original_bytes: observation.stdout_total_bytes,
146 stderr_original_bytes: observation.stderr_total_bytes,
147 mode: opts.compression_mode,
148 });
149
150 Response::new(
151 "restart",
152 RunData {
153 job_id: job_dir.job_id.clone(),
154 state: observation.state,
155 tags: meta.tags.clone(),
156 env_vars: masked_env_vars,
157 stdout_log_path,
158 stderr_log_path,
159 elapsed_ms,
160 waited_ms: observation.waited_ms,
161 stdout: observation.stdout,
162 stderr: observation.stderr,
163 stdout_range: observation.stdout_range,
164 stderr_range: observation.stderr_range,
165 stdout_total_bytes: observation.stdout_total_bytes,
166 stderr_total_bytes: observation.stderr_total_bytes,
167 encoding: observation.encoding,
168 exit_code: observation.exit_code,
169 finished_at: observation.finished_at,
170 signal: observation.signal,
171 duration_ms: observation.duration_ms,
172 compression,
173 },
174 )
175 .print();
176
177 Ok(())
178}
179
180fn terminate_running_job(job_dir: &JobDir, signal: &str) -> Result<()> {
181 info!(job_id = %job_dir.job_id, signal, "terminating running job before restart");
182
183 let original_pid = job_dir.read_state()?.pid;
184 let signal_result = crate::kill::execute_inner(crate::kill::KillOpts {
185 job_id: &job_dir.job_id,
186 root: job_dir.path.parent().and_then(|p| p.to_str()),
187 signal,
188 no_wait: false,
189 })?;
190
191 if matches!(signal_result.state.as_deref(), Some("running")) {
192 warn!(
193 job_id = %job_dir.job_id,
194 signal,
195 "restart termination observation still reported running; escalating to KILL"
196 );
197 crate::kill::execute_inner(crate::kill::KillOpts {
198 job_id: &job_dir.job_id,
199 root: job_dir.path.parent().and_then(|p| p.to_str()),
200 signal: "KILL",
201 no_wait: false,
202 })?;
203 }
204
205 let deadline = std::time::Instant::now() + TERMINATION_BUDGET;
206 loop {
207 let current = job_dir.read_state()?;
208 let state_is_terminal = !current.status().is_non_terminal();
209 let original_process_gone = original_pid.map(process_is_gone).unwrap_or(true);
210 if state_is_terminal && original_process_gone {
211 info!(
212 job_id = %job_dir.job_id,
213 state = %current.status().as_str(),
214 original_pid = ?original_pid,
215 "old job run reached terminal state before restart relaunch"
216 );
217 return Ok(());
218 }
219 if std::time::Instant::now() >= deadline {
220 return Err(anyhow::Error::new(InvalidJobState(format!(
221 "job {} did not terminate within restart budget (state_terminal={}, original_pid_gone={})",
222 job_dir.job_id, state_is_terminal, original_process_gone
223 ))));
224 }
225 std::thread::sleep(TERMINATION_POLL_INTERVAL);
226 }
227}
228
229fn process_is_gone(pid: u32) -> bool {
230 #[cfg(unix)]
231 {
232 let ret = unsafe { libc::kill(pid as libc::pid_t, 0) };
234 if ret == 0 {
235 return false;
236 }
237 std::io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH)
238 }
239 #[cfg(not(unix))]
240 {
241 let _ = pid;
244 true
245 }
246}
247
248fn reset_per_run_artifacts(job_dir: &JobDir) -> Result<()> {
249 for path in [
250 job_dir.stdout_path(),
251 job_dir.stderr_path(),
252 job_dir.full_log_path(),
253 ] {
254 std::fs::OpenOptions::new()
255 .create(true)
256 .write(true)
257 .truncate(true)
258 .open(&path)
259 .with_context(|| format!("truncate per-run artifact {}", path.display()))?;
260 }
261
262 let completion_event_path = job_dir.completion_event_path();
263 match std::fs::remove_file(&completion_event_path) {
264 Ok(()) => {}
265 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
266 Err(e) => {
267 return Err(e)
268 .with_context(|| format!("remove stale {}", completion_event_path.display()));
269 }
270 }
271
272 Ok(())
273}