1use std::fs::{self, File, OpenOptions};
2use std::io::{self, Read, Write};
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7
8use crate::backup::hash_session;
9
10use super::BgTaskStatus;
11
/// Schema version stamped into every record built by [`PersistedTask::starting`].
const SCHEMA_VERSION: u32 = 2;
13
/// File-system locations belonging to a single task, as produced by [`task_paths`].
///
/// All four files are named `<task_id>.<suffix>` and live directly inside `dir`.
#[derive(Debug, Clone)]
pub struct TaskPaths {
    /// Per-session directory that contains the files below.
    pub dir: PathBuf,
    /// `<task_id>.json` (presumably the serialized [`PersistedTask`]; confirm against callers).
    pub json: PathBuf,
    /// `<task_id>.stdout` (presumably the captured standard output; confirm against callers).
    pub stdout: PathBuf,
    /// `<task_id>.stderr` (presumably the captured standard error; confirm against callers).
    pub stderr: PathBuf,
    /// `<task_id>.exit` (presumably the marker parsed by [`read_exit_marker`]; confirm against callers).
    pub exit: PathBuf,
}
22
/// Task metadata as it is serialized to and from JSON by [`write_task`] / [`read_task`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedTask {
    /// Set to `SCHEMA_VERSION` when the record is created by [`PersistedTask::starting`].
    pub schema_version: u32,
    /// Task identifier; also used to name the temp file when the record is written.
    pub task_id: String,
    /// Identifier of the session the task belongs to.
    pub session_id: String,
    /// Command associated with the task.
    pub command: String,
    /// Working directory associated with the task.
    pub workdir: PathBuf,
    /// Optional project root; deserializes to `None` when the key is missing.
    #[serde(default)]
    pub project_root: Option<PathBuf>,
    /// Current lifecycle status.
    pub status: BgTaskStatus,
    /// Creation time in milliseconds since the Unix epoch (see [`unix_millis`]).
    pub started_at: u64,
    /// Time the task reached a terminal status, in milliseconds since the Unix epoch.
    pub finished_at: Option<u64>,
    /// `finished_at - started_at` (saturating), filled in by `mark_terminal`.
    pub duration_ms: Option<u64>,
    /// Optional timeout in milliseconds, as supplied at creation.
    pub timeout_ms: Option<u64>,
    /// Exit code passed to `mark_terminal`, if any.
    pub exit_code: Option<i32>,
    /// Child process id; set by `mark_running`, cleared by `mark_terminal`.
    pub child_pid: Option<u32>,
    /// Process group id; set by `mark_running` and not cleared by `mark_terminal`.
    pub pgid: Option<i32>,
    /// Starts as `!notify_on_completion`; the only field [`update_task`] lets
    /// callers change once the task is terminal.
    pub completion_delivered: bool,
    /// Whether completion should be notified; defaults to `true` when the key is missing.
    #[serde(default = "default_notify_on_completion")]
    pub notify_on_completion: bool,
    /// Defaults to `true` when the key is missing.
    /// NOTE(review): what exactly is compressed is not visible in this file.
    #[serde(default = "default_compressed")]
    pub compressed: bool,
    /// Optional explanation recorded alongside a terminal status.
    pub status_reason: Option<String>,
}
51
/// Serde default for `PersistedTask::notify_on_completion`: records that lack
/// the key deserialize with notifications enabled.
fn default_notify_on_completion() -> bool {
    true
}
55
/// Serde default for `PersistedTask::compressed`: records that lack the key
/// deserialize with the flag set.
fn default_compressed() -> bool {
    true
}
59
/// Parsed content of an exit marker file, as returned by [`read_exit_marker`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExitMarker {
    /// The file held a decimal integer that fits in an `i32`.
    Code(i32),
    /// The file held the literal text `killed`.
    Killed,
}
65
66impl PersistedTask {
67 pub fn starting(
68 task_id: String,
69 session_id: String,
70 command: String,
71 workdir: PathBuf,
72 project_root: Option<PathBuf>,
73 timeout_ms: Option<u64>,
74 notify_on_completion: bool,
75 compressed: bool,
76 ) -> Self {
77 Self {
78 schema_version: SCHEMA_VERSION,
79 task_id,
80 session_id,
81 command,
82 workdir,
83 project_root,
84 status: BgTaskStatus::Starting,
85 started_at: unix_millis(),
86 finished_at: None,
87 duration_ms: None,
88 timeout_ms,
89 exit_code: None,
90 child_pid: None,
91 pgid: None,
92 completion_delivered: !notify_on_completion,
93 notify_on_completion,
94 compressed,
95 status_reason: None,
96 }
97 }
98
99 pub fn is_terminal(&self) -> bool {
100 self.status.is_terminal()
101 }
102
103 pub fn mark_running(&mut self, child_pid: u32, pgid: i32) {
104 self.status = BgTaskStatus::Running;
105 self.child_pid = Some(child_pid);
106 self.pgid = Some(pgid);
107 }
108
109 pub fn mark_terminal(
110 &mut self,
111 status: BgTaskStatus,
112 exit_code: Option<i32>,
113 reason: Option<String>,
114 ) {
115 let finished_at = unix_millis();
116 self.status = status;
117 self.exit_code = exit_code;
118 self.finished_at = Some(finished_at);
119 self.duration_ms = Some(finished_at.saturating_sub(self.started_at));
120 self.child_pid = None;
121 self.status_reason = reason;
122 self.completion_delivered = !self.notify_on_completion;
123 }
124}
125
126pub fn session_tasks_dir(storage_dir: &Path, session_id: &str) -> PathBuf {
127 storage_dir
128 .join("bash-tasks")
129 .join(hash_session(session_id))
130}
131
132pub fn task_paths(storage_dir: &Path, session_id: &str, task_id: &str) -> TaskPaths {
133 let dir = session_tasks_dir(storage_dir, session_id);
134 TaskPaths {
135 json: dir.join(format!("{task_id}.json")),
136 stdout: dir.join(format!("{task_id}.stdout")),
137 stderr: dir.join(format!("{task_id}.stderr")),
138 exit: dir.join(format!("{task_id}.exit")),
139 dir,
140 }
141}
142
143pub fn read_task(path: &Path) -> io::Result<PersistedTask> {
144 let content = fs::read_to_string(path)?;
145 serde_json::from_str(&content).map_err(io::Error::other)
146}
147
148pub fn write_task(path: &Path, task: &PersistedTask) -> io::Result<()> {
149 if let Some(parent) = path.parent() {
150 fs::create_dir_all(parent)?;
151 }
152 let content = serde_json::to_vec_pretty(task).map_err(io::Error::other)?;
153 atomic_write(path, &content, &task.task_id)
154}
155
156pub(super) fn delete_task_bundle(paths: &TaskPaths) -> io::Result<()> {
157 let mut first_error = None;
158 for path in task_bundle_files(paths) {
159 if let Err(error) = remove_file_if_present(&path) {
160 if first_error.is_none() {
161 first_error = Some(error);
162 }
163 }
164 }
165
166 if let Some(error) = first_error {
167 return Err(error);
168 }
169
170 match fs::remove_dir(&paths.dir) {
171 Ok(()) => Ok(()),
172 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
173 Err(error) if error.kind() == io::ErrorKind::DirectoryNotEmpty => Ok(()),
174 Err(error) => Err(error),
175 }
176}
177
178fn task_bundle_files(paths: &TaskPaths) -> Vec<PathBuf> {
179 let mut files = vec![
180 paths.json.clone(),
181 paths.stdout.clone(),
182 paths.stderr.clone(),
183 paths.exit.clone(),
184 ];
185 if let Some(stem) = paths.json.file_stem().and_then(|stem| stem.to_str()) {
186 for extension in ["ps1", "bat", "sh"] {
190 files.push(paths.dir.join(format!("{stem}.{extension}")));
191 }
192 }
193 files
194}
195
/// Deletes the file at `path`, treating "already gone" as success.
///
/// # Errors
///
/// Any failure other than `NotFound` is returned unchanged.
fn remove_file_if_present(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|failure| {
        if failure.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(failure)
        }
    })
}
203
204pub fn update_task<F>(path: &Path, update: F) -> io::Result<PersistedTask>
205where
206 F: FnOnce(&mut PersistedTask),
207{
208 let mut task = read_task(path)?;
209 let original_terminal = task.is_terminal();
210 let original = task.clone();
211 update(&mut task);
212 if original_terminal {
213 let completion_delivered = task.completion_delivered;
214 task = original;
215 task.completion_delivered = completion_delivered;
216 }
217 write_task(path, &task)?;
218 Ok(task)
219}
220
221pub fn write_kill_marker_if_absent(path: &Path) -> io::Result<()> {
222 if path.exists() {
223 return Ok(());
224 }
225 atomic_write(path, b"killed", "kill")
226}
227
228pub fn read_exit_marker(path: &Path) -> io::Result<Option<ExitMarker>> {
229 let mut file = match File::open(path) {
230 Ok(file) => file,
231 Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(None),
232 Err(error) => return Err(error),
233 };
234 let mut content = String::new();
235 file.read_to_string(&mut content)?;
236 let content = content.trim();
237 if content.is_empty() {
238 return Ok(None);
239 }
240 if content == "killed" {
241 return Ok(Some(ExitMarker::Killed));
242 }
243 match content.parse::<i32>() {
244 Ok(code) => Ok(Some(ExitMarker::Code(code))),
245 Err(_) => Ok(None),
246 }
247}
248
/// Writes `content` to `path` by way of a temporary sibling file.
///
/// The parent directory is created if needed. The data is written to
/// `.<file_name>.tmp.<pid>.<sanitized task_id>` in that directory, flushed to
/// disk with `sync_all`, and then renamed onto `path`, so `path` is only ever
/// replaced by a fully written file.
///
/// If any step after choosing the temp name fails, the temp file is removed
/// on a best-effort basis so failed writes do not leave files behind.
///
/// # Errors
///
/// Returns the first I/O error from creating the directory, writing or
/// syncing the temp file, or renaming it into place.
pub fn atomic_write(path: &Path, content: &[u8], task_id: &str) -> io::Result<()> {
    let parent = path.parent().unwrap_or_else(|| Path::new("."));
    fs::create_dir_all(parent)?;

    let file_name = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("task");
    let tmp = parent.join(format!(
        ".{file_name}.tmp.{}.{}",
        std::process::id(),
        sanitize_task_id(task_id)
    ));

    let publish = || -> io::Result<()> {
        {
            let mut file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(&tmp)?;
            file.write_all(content)?;
            file.sync_all()?;
            // The handle is closed at the end of this scope, before the rename.
        }
        fs::rename(&tmp, path)
    };

    let outcome = publish();
    if outcome.is_err() {
        // Best effort: the original error is what the caller needs to see, so
        // a failure to clean up is deliberately ignored.
        let _ = fs::remove_file(&tmp);
    }
    outcome
}

/// Maps `task_id` to a string that is safe to embed in a file name: ASCII
/// letters, digits, `-` and `_` are kept, every other character becomes `_`.
fn sanitize_task_id(task_id: &str) -> String {
    task_id
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
283
/// Creates (or truncates) the file at `path` for writing, creating its parent
/// directory first when the path has one.
///
/// # Errors
///
/// Returns any error from creating the directory or opening the file.
pub fn create_capture_file(path: &Path) -> io::Result<File> {
    path.parent().map(fs::create_dir_all).transpose()?;

    OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)
}
290
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub fn unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
297
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Two threads write the same target under different task ids. Both
    /// writes must succeed, the target must hold one complete payload, and no
    /// temp file may be left in the directory afterwards.
    #[test]
    fn atomic_write_temp_names_include_task_id() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("task.json");

        let left_path = path.clone();
        let left = thread::spawn(move || atomic_write(&left_path, b"left", "task-left"));
        let right_path = path.clone();
        let right = thread::spawn(move || atomic_write(&right_path, b"right", "task-right"));

        left.join().expect("join left").expect("write left");
        right.join().expect("join right").expect("write right");

        let content = fs::read_to_string(&path).expect("read final content");
        assert!(content == "left" || content == "right");

        // The temp name without a task-id suffix must never be used.
        assert!(!dir
            .path()
            .join(format!(".task.json.tmp.{}", std::process::id()))
            .exists());

        // Neither writer may leave its own temp file behind.
        let leftovers: Vec<_> = fs::read_dir(dir.path())
            .expect("list temp dir")
            .map(|entry| entry.expect("read dir entry").file_name())
            .filter(|name| name.to_string_lossy().starts_with(".task.json.tmp."))
            .collect();
        assert!(leftovers.is_empty(), "leaked temp files: {leftovers:?}");
    }
}