1use std::fs::{self, File, OpenOptions};
2use std::io::{self, Read, Write};
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7
8use crate::backup::hash_session;
9use crate::db::bash_tasks::BashTaskRow;
10
11use super::BgTaskStatus;
12
13pub const SCHEMA_VERSION: u32 = 4;
14
15#[derive(Debug, Clone)]
16pub struct TaskPaths {
17 pub dir: PathBuf,
18 pub json: PathBuf,
19 pub stdout: PathBuf,
20 pub stderr: PathBuf,
21 pub exit: PathBuf,
22 pub pty: PathBuf,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
26#[serde(rename_all = "lowercase")]
27pub enum BgMode {
28 #[default]
29 Pipes,
30 Pty,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PersistedTask {
35 pub schema_version: u32,
36 pub task_id: String,
37 pub session_id: String,
38 pub command: String,
39 #[serde(default)]
40 pub mode: BgMode,
41 pub workdir: PathBuf,
42 #[serde(default)]
43 pub project_root: Option<PathBuf>,
44 pub status: BgTaskStatus,
45 pub started_at: u64,
46 pub finished_at: Option<u64>,
47 pub duration_ms: Option<u64>,
48 pub timeout_ms: Option<u64>,
49 pub exit_code: Option<i32>,
50 pub child_pid: Option<u32>,
51 pub pgid: Option<i32>,
52 pub completion_delivered: bool,
53 #[serde(default = "default_notify_on_completion")]
54 pub notify_on_completion: bool,
55 #[serde(default = "default_compressed")]
60 pub compressed: bool,
61 #[serde(default)]
62 pub pty_rows: Option<u16>,
63 #[serde(default)]
64 pub pty_cols: Option<u16>,
65 pub status_reason: Option<String>,
66}
67
68fn default_notify_on_completion() -> bool {
69 true
70}
71
72fn default_compressed() -> bool {
73 true
74}
75
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum ExitMarker {
78 Code(i32),
79 Killed,
80}
81
82impl PersistedTask {
83 pub fn starting(
84 task_id: String,
85 session_id: String,
86 command: String,
87 workdir: PathBuf,
88 project_root: Option<PathBuf>,
89 timeout_ms: Option<u64>,
90 notify_on_completion: bool,
91 compressed: bool,
92 ) -> Self {
93 Self {
94 schema_version: SCHEMA_VERSION,
95 task_id,
96 session_id,
97 command,
98 mode: BgMode::Pipes,
99 workdir,
100 project_root,
101 status: BgTaskStatus::Starting,
102 started_at: unix_millis(),
103 finished_at: None,
104 duration_ms: None,
105 timeout_ms,
106 exit_code: None,
107 child_pid: None,
108 pgid: None,
109 completion_delivered: !notify_on_completion,
110 notify_on_completion,
111 compressed,
112 pty_rows: None,
113 pty_cols: None,
114 status_reason: None,
115 }
116 }
117
118 pub fn is_terminal(&self) -> bool {
119 self.status.is_terminal()
120 }
121
122 pub fn mark_running(&mut self, child_pid: u32, pgid: i32) {
123 self.status = BgTaskStatus::Running;
124 self.child_pid = Some(child_pid);
125 self.pgid = Some(pgid);
126 }
127
128 pub fn mark_terminal(
129 &mut self,
130 status: BgTaskStatus,
131 exit_code: Option<i32>,
132 reason: Option<String>,
133 ) {
134 let finished_at = unix_millis();
135 self.status = status;
136 self.exit_code = exit_code;
137 self.finished_at = Some(finished_at);
138 self.duration_ms = Some(finished_at.saturating_sub(self.started_at));
139 self.child_pid = None;
140 self.status_reason = reason;
141 self.completion_delivered = !self.notify_on_completion;
142 }
143
144 pub fn to_bash_task_row(
145 &self,
146 harness: &str,
147 paths: &TaskPaths,
148 ) -> Result<BashTaskRow, serde_json::Error> {
149 let project_root = self.project_root.as_deref().unwrap_or(&self.workdir);
150 let output_bytes = capture_output_bytes(&self.mode, paths);
151 let stdout_path = match self.mode {
152 BgMode::Pipes => Some(paths.stdout.display().to_string()),
153 BgMode::Pty => Some(paths.pty.display().to_string()),
154 };
155 let stderr_path = match self.mode {
156 BgMode::Pipes => Some(paths.stderr.display().to_string()),
157 BgMode::Pty => None,
158 };
159 let mut metadata = self.clone();
160 metadata.schema_version = SCHEMA_VERSION;
161 Ok(BashTaskRow {
162 harness: harness.to_string(),
163 session_id: self.session_id.clone(),
164 task_id: self.task_id.clone(),
165 project_key: crate::search_index::project_cache_key(project_root),
166 command: self.command.clone(),
167 cwd: self.workdir.display().to_string(),
168 status: status_name(&self.status).to_string(),
169 exit_code: self.exit_code,
170 pid: self.child_pid.map(i64::from),
171 pgid: self.pgid.map(i64::from),
172 started_at: self.started_at as i64,
173 completed_at: self.finished_at.map(|value| value as i64),
174 stdout_path,
175 stderr_path,
176 compressed: self.compressed,
177 timeout_ms: self.timeout_ms.map(|value| value as i64),
178 completion_delivered: self.completion_delivered,
179 output_bytes,
180 metadata: serde_json::to_string(&metadata)?,
181 })
182 }
183}
184
185impl From<BashTaskRow> for PersistedTask {
186 fn from(row: BashTaskRow) -> Self {
187 if let Ok(task) = serde_json::from_str::<PersistedTask>(&row.metadata) {
188 return task;
189 }
190
191 let status = match row.status.as_str() {
192 "starting" => BgTaskStatus::Starting,
193 "running" => BgTaskStatus::Running,
194 "killing" => BgTaskStatus::Killing,
195 "completed" => BgTaskStatus::Completed,
196 "failed" => BgTaskStatus::Failed,
197 "killed" => BgTaskStatus::Killed,
198 "timed_out" => BgTaskStatus::TimedOut,
199 _ => BgTaskStatus::Failed,
200 };
201 let started_at = u64::try_from(row.started_at).unwrap_or_default();
202 let finished_at = row.completed_at.and_then(|value| u64::try_from(value).ok());
203
204 PersistedTask {
205 schema_version: SCHEMA_VERSION,
206 task_id: row.task_id,
207 session_id: row.session_id,
208 command: row.command,
209 mode: BgMode::Pipes,
210 workdir: PathBuf::from(row.cwd),
211 project_root: None,
212 status,
213 started_at,
214 finished_at,
215 duration_ms: finished_at.map(|finished_at| finished_at.saturating_sub(started_at)),
216 timeout_ms: row.timeout_ms.and_then(|value| u64::try_from(value).ok()),
217 exit_code: row.exit_code,
218 child_pid: row.pid.and_then(|value| u32::try_from(value).ok()),
219 pgid: row.pgid.and_then(|value| i32::try_from(value).ok()),
220 completion_delivered: row.completion_delivered,
221 notify_on_completion: !row.completion_delivered,
222 compressed: row.compressed,
223 pty_rows: None,
224 pty_cols: None,
225 status_reason: None,
226 }
227 }
228}
229
230fn status_name(status: &BgTaskStatus) -> &'static str {
231 match status {
232 BgTaskStatus::Starting => "starting",
233 BgTaskStatus::Running => "running",
234 BgTaskStatus::Killing => "killing",
235 BgTaskStatus::Completed => "completed",
236 BgTaskStatus::Failed => "failed",
237 BgTaskStatus::Killed => "killed",
238 BgTaskStatus::TimedOut => "timed_out",
239 }
240}
241
242fn capture_output_bytes(mode: &BgMode, paths: &TaskPaths) -> Option<i64> {
243 match mode {
244 BgMode::Pipes => {
245 let stdout = fs::metadata(&paths.stdout)
246 .ok()
247 .map(|metadata| metadata.len());
248 let stderr = fs::metadata(&paths.stderr)
249 .ok()
250 .map(|metadata| metadata.len());
251 match (stdout, stderr) {
252 (Some(stdout), Some(stderr)) => Some(stdout.saturating_add(stderr) as i64),
253 (Some(bytes), None) | (None, Some(bytes)) => Some(bytes as i64),
254 (None, None) => None,
255 }
256 }
257 BgMode::Pty => fs::metadata(&paths.pty)
258 .ok()
259 .map(|metadata| metadata.len() as i64),
260 }
261}
262
263pub fn session_tasks_dir(storage_dir: &Path, session_id: &str) -> PathBuf {
264 storage_dir
265 .join("bash-tasks")
266 .join(hash_session(session_id))
267}
268
269pub fn task_paths(storage_dir: &Path, session_id: &str, task_id: &str) -> TaskPaths {
270 let dir = session_tasks_dir(storage_dir, session_id);
271 TaskPaths {
272 json: dir.join(format!("{task_id}.json")),
273 stdout: dir.join(format!("{task_id}.stdout")),
274 stderr: dir.join(format!("{task_id}.stderr")),
275 exit: dir.join(format!("{task_id}.exit")),
276 pty: dir.join(format!("{task_id}.pty")),
277 dir,
278 }
279}
280
281pub fn read_task(path: &Path) -> io::Result<PersistedTask> {
282 let content = fs::read_to_string(path)?;
283 let task: PersistedTask = serde_json::from_str(&content).map_err(io::Error::other)?;
284 if !matches!(task.schema_version, 2 | 3 | SCHEMA_VERSION) {
285 return Err(io::Error::new(
286 io::ErrorKind::InvalidData,
287 format!(
288 "unsupported background task schema_version {} (expected 2, 3, or {SCHEMA_VERSION})",
289 task.schema_version
290 ),
291 ));
292 }
293 Ok(task)
294}
295
296pub fn write_task(path: &Path, task: &PersistedTask) -> io::Result<()> {
297 if let Some(parent) = path.parent() {
298 fs::create_dir_all(parent)?;
299 }
300 let mut upgraded = task.clone();
301 upgraded.schema_version = SCHEMA_VERSION;
302 let content = serde_json::to_vec_pretty(&upgraded).map_err(io::Error::other)?;
303 atomic_write(path, &content, &upgraded.task_id)
304}
305
306pub(super) fn delete_task_bundle(paths: &TaskPaths) -> io::Result<()> {
307 let mut first_error = None;
308 for path in task_bundle_files(paths) {
309 if let Err(error) = remove_file_if_present(&path) {
310 if first_error.is_none() {
311 first_error = Some(error);
312 }
313 }
314 }
315
316 if let Some(error) = first_error {
317 return Err(error);
318 }
319
320 match fs::remove_dir(&paths.dir) {
321 Ok(()) => Ok(()),
322 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
323 Err(error) if error.kind() == io::ErrorKind::DirectoryNotEmpty => Ok(()),
324 Err(error) => Err(error),
325 }
326}
327
328pub fn task_bundle_files(paths: &TaskPaths) -> Vec<PathBuf> {
329 let mut files = vec![
330 paths.json.clone(),
331 paths.stdout.clone(),
332 paths.stderr.clone(),
333 paths.exit.clone(),
334 paths.pty.clone(),
335 ];
336 if let Some(stem) = paths.json.file_stem().and_then(|stem| stem.to_str()) {
337 for extension in ["ps1", "bat", "sh"] {
341 files.push(paths.dir.join(format!("{stem}.{extension}")));
342 }
343 }
344 files
345}
346
347fn remove_file_if_present(path: &Path) -> io::Result<()> {
348 match fs::remove_file(path) {
349 Ok(()) => Ok(()),
350 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
351 Err(error) => Err(error),
352 }
353}
354
355pub fn update_task<F>(path: &Path, update: F) -> io::Result<PersistedTask>
356where
357 F: FnOnce(&mut PersistedTask),
358{
359 let mut task = read_task(path)?;
360 let original_terminal = task.is_terminal();
361 let original = task.clone();
362 update(&mut task);
363 task.schema_version = SCHEMA_VERSION;
364 if original_terminal {
365 let completion_delivered = task.completion_delivered;
366 task = original;
367 task.completion_delivered = completion_delivered;
368 task.schema_version = SCHEMA_VERSION;
369 }
370 write_task(path, &task)?;
371 Ok(task)
372}
373
374pub fn write_kill_marker_if_absent(path: &Path) -> io::Result<()> {
375 if path.exists() {
376 return Ok(());
377 }
378 atomic_write(path, b"killed", "kill")
379}
380
381pub fn read_exit_marker(path: &Path) -> io::Result<Option<ExitMarker>> {
382 let mut file = match File::open(path) {
383 Ok(file) => file,
384 Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(None),
385 Err(error) => return Err(error),
386 };
387 let mut content = String::new();
388 file.read_to_string(&mut content)?;
389 let content = content.trim();
390 if content.is_empty() {
391 return Ok(None);
392 }
393 if content == "killed" {
394 return Ok(Some(ExitMarker::Killed));
395 }
396 match content.parse::<i32>() {
397 Ok(code) => Ok(Some(ExitMarker::Code(code))),
398 Err(_) => Ok(None),
399 }
400}
401
402pub fn atomic_write(path: &Path, content: &[u8], task_id: &str) -> io::Result<()> {
403 let parent = path.parent().unwrap_or_else(|| Path::new("."));
404 fs::create_dir_all(parent)?;
405 let file_name = path
406 .file_name()
407 .and_then(|name| name.to_str())
408 .unwrap_or("task");
409 let tmp = parent.join(format!(
410 ".{file_name}.tmp.{}.{}",
411 std::process::id(),
412 sanitize_task_id(task_id)
413 ));
414 {
415 let mut file = OpenOptions::new()
416 .create(true)
417 .truncate(true)
418 .write(true)
419 .open(&tmp)?;
420 file.write_all(content)?;
421 file.sync_all()?;
422 }
423 fs::rename(&tmp, path)?;
424 Ok(())
425}
426
427fn sanitize_task_id(task_id: &str) -> String {
428 task_id
429 .chars()
430 .map(|ch| match ch {
431 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
432 _ => '_',
433 })
434 .collect()
435}
436
437pub fn create_capture_file(path: &Path) -> io::Result<File> {
438 if let Some(parent) = path.parent() {
439 fs::create_dir_all(parent)?;
440 }
441 File::create(path)
442}
443
444pub fn unix_millis() -> u64 {
445 SystemTime::now()
446 .duration_since(UNIX_EPOCH)
447 .map(|duration| duration.as_millis() as u64)
448 .unwrap_or(0)
449}
450
451#[cfg(test)]
452mod tests {
453 use std::thread;
454
455 use super::*;
456
457 #[test]
458 fn atomic_write_temp_names_include_task_id() {
459 let dir = tempfile::tempdir().expect("create temp dir");
460 let path = dir.path().join("task.json");
461
462 let left_path = path.clone();
463 let left = thread::spawn(move || atomic_write(&left_path, b"left", "task-left"));
464 let right_path = path.clone();
465 let right = thread::spawn(move || atomic_write(&right_path, b"right", "task-right"));
466
467 left.join().expect("join left").expect("write left");
468 right.join().expect("join right").expect("write right");
469
470 let content = fs::read_to_string(&path).expect("read final content");
471 assert!(content == "left" || content == "right");
472 assert!(!dir
473 .path()
474 .join(format!(".task.json.tmp.{}", std::process::id()))
475 .exists());
476 }
477}