1use anyhow::{Context, Result};
10use directories::BaseDirs;
11use std::path::PathBuf;
12
13use crate::schema::{JobMeta, JobState, JobStatus};
14
/// Error reported when a job id does not correspond to a job directory.
///
/// The wrapped `String` is the job id that was requested. The value is
/// rendered as `job not found: <id>` by its [`std::fmt::Display`] impl.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    /// Writes the fixed prefix followed by the wrapped job id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let JobNotFound(job_id) = self;
        f.write_str("job not found: ")?;
        f.write_str(job_id)
    }
}

// No underlying cause: the default `source()` (returning `None`) is what we want.
impl std::error::Error for JobNotFound {}
27
/// Error describing a job whose state is not valid for the attempted action.
///
/// The wrapped `String` is a free-form description; it is rendered as
/// `invalid job state: <description>` by the [`std::fmt::Display`] impl.
#[derive(Debug)]
pub struct InvalidJobState(pub String);

impl std::fmt::Display for InvalidJobState {
    /// Writes the fixed prefix followed by the wrapped description.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let InvalidJobState(detail) = self;
        f.write_str("invalid job state: ")?;
        f.write_str(detail)
    }
}

// No underlying cause: the default `source()` (returning `None`) is what we want.
impl std::error::Error for InvalidJobState {}
40
41pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
43 if let Some(root) = cli_root {
45 return PathBuf::from(root);
46 }
47
48 if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
50 && !root.is_empty()
51 {
52 return PathBuf::from(root);
53 }
54
55 if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
57 && !xdg.is_empty()
58 {
59 return PathBuf::from(xdg).join("agent-exec").join("jobs");
60 }
61
62 if let Some(base_dirs) = BaseDirs::new() {
65 #[cfg(windows)]
66 let base = base_dirs.data_local_dir().to_path_buf();
67 #[cfg(not(windows))]
68 let base = base_dirs.home_dir().join(".local").join("share");
69 return base.join("agent-exec").join("jobs");
70 }
71
72 PathBuf::from("~/.local/share/agent-exec/jobs")
74}
75
/// Tail of a log file together with the figures describing how much of the
/// file it represents.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TailMetrics {
    /// The tail text that was extracted from the log.
    pub tail: String,
    /// `true` when a byte or line limit caused part of the log to be left out.
    pub truncated: bool,
    /// Size of the log file, in bytes, as observed when the tail was taken.
    pub observed_bytes: u64,
    /// Length of `tail` in bytes.
    pub included_bytes: u64,
}
91
/// Handle to the on-disk directory that holds one job's files.
#[derive(Debug, Clone)]
pub struct JobDir {
    /// Location of the job directory (`<root>/<job_id>`).
    pub path: PathBuf,
    /// Identifier of the job this directory belongs to.
    pub job_id: String,
}
97
98impl JobDir {
99 pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
104 let path = root.join(job_id);
105 if !path.exists() {
106 return Err(anyhow::Error::new(JobNotFound(job_id.to_string())));
107 }
108 Ok(JobDir {
109 path,
110 job_id: job_id.to_string(),
111 })
112 }
113
114 pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
116 let path = root.join(job_id);
117 std::fs::create_dir_all(&path)
118 .with_context(|| format!("create job dir {}", path.display()))?;
119
120 let job_dir = JobDir {
121 path,
122 job_id: job_id.to_string(),
123 };
124
125 job_dir.write_meta_atomic(meta)?;
126
127 Ok(job_dir)
128 }
129
130 pub fn meta_path(&self) -> PathBuf {
131 self.path.join("meta.json")
132 }
133 pub fn state_path(&self) -> PathBuf {
134 self.path.join("state.json")
135 }
136 pub fn stdout_path(&self) -> PathBuf {
137 self.path.join("stdout.log")
138 }
139 pub fn stderr_path(&self) -> PathBuf {
140 self.path.join("stderr.log")
141 }
142 pub fn full_log_path(&self) -> PathBuf {
143 self.path.join("full.log")
144 }
145 pub fn completion_event_path(&self) -> PathBuf {
146 self.path.join("completion_event.json")
147 }
148 pub fn notification_events_path(&self) -> PathBuf {
149 self.path.join("notification_events.ndjson")
150 }
151
152 pub fn write_completion_event_atomic(
154 &self,
155 record: &crate::schema::CompletionEventRecord,
156 ) -> Result<()> {
157 let target = self.completion_event_path();
158 let contents = serde_json::to_string_pretty(record)?;
159 write_atomic(&self.path, &target, contents.as_bytes())?;
160 Ok(())
161 }
162
163 pub fn read_meta(&self) -> Result<JobMeta> {
164 let raw = std::fs::read(self.meta_path())?;
165 Ok(serde_json::from_slice(&raw)?)
166 }
167
168 pub fn read_state(&self) -> Result<JobState> {
169 let raw = std::fs::read(self.state_path())?;
170 Ok(serde_json::from_slice(&raw)?)
171 }
172
173 pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
175 let target = self.meta_path();
176 let contents = serde_json::to_string_pretty(meta)?;
177 write_atomic(&self.path, &target, contents.as_bytes())?;
178 Ok(())
179 }
180
181 pub fn write_state(&self, state: &JobState) -> Result<()> {
183 let target = self.state_path();
184 let contents = serde_json::to_string_pretty(state)?;
185 write_atomic(&self.path, &target, contents.as_bytes())?;
186 Ok(())
187 }
188
189 pub fn tail_log(&self, filename: &str, tail_lines: u64, max_bytes: u64) -> String {
191 self.tail_log_with_truncated(filename, tail_lines, max_bytes)
192 .0
193 }
194
195 pub fn tail_log_with_truncated(
198 &self,
199 filename: &str,
200 tail_lines: u64,
201 max_bytes: u64,
202 ) -> (String, bool) {
203 let path = self.path.join(filename);
204 let Ok(data) = std::fs::read(&path) else {
205 return (String::new(), false);
206 };
207
208 let byte_truncated = data.len() as u64 > max_bytes;
210 let start = if byte_truncated {
211 (data.len() as u64 - max_bytes) as usize
212 } else {
213 0
214 };
215 let slice = &data[start..];
216
217 let text = String::from_utf8_lossy(slice);
219
220 if tail_lines == 0 {
222 return (text.into_owned(), byte_truncated);
223 }
224 let lines: Vec<&str> = text.lines().collect();
225 let skip = lines.len().saturating_sub(tail_lines as usize);
226 let line_truncated = skip > 0;
227 (lines[skip..].join("\n"), byte_truncated || line_truncated)
228 }
229
230 pub fn read_tail_metrics(
239 &self,
240 filename: &str,
241 tail_lines: u64,
242 max_bytes: u64,
243 ) -> TailMetrics {
244 let (tail, truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
245 let included_bytes = tail.len() as u64;
246 let observed_bytes = std::fs::metadata(self.path.join(filename))
247 .map(|m| m.len())
248 .unwrap_or(0);
249 TailMetrics {
250 tail,
251 truncated,
252 observed_bytes,
253 included_bytes,
254 }
255 }
256
257 pub fn init_state_created(&self) -> Result<JobState> {
261 let state = JobState {
262 job: crate::schema::JobStateJob {
263 id: self.job_id.clone(),
264 status: JobStatus::Created,
265 started_at: None,
266 },
267 result: crate::schema::JobStateResult {
268 exit_code: None,
269 signal: None,
270 duration_ms: None,
271 },
272 pid: None,
273 finished_at: None,
274 updated_at: crate::run::now_rfc3339_pub(),
275 windows_job_name: None,
276 };
277 self.write_state(&state)?;
278 Ok(state)
279 }
280
281 pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
295 #[cfg(windows)]
296 let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
297 #[cfg(not(windows))]
298 let windows_job_name: Option<String> = None;
299
300 let state = JobState {
301 job: crate::schema::JobStateJob {
302 id: self.job_id.clone(),
303 status: JobStatus::Running,
304 started_at: Some(started_at.to_string()),
305 },
306 result: crate::schema::JobStateResult {
307 exit_code: None,
308 signal: None,
309 duration_ms: None,
310 },
311 pid: Some(pid),
312 finished_at: None,
313 updated_at: crate::run::now_rfc3339_pub(),
314 windows_job_name,
315 };
316 self.write_state(&state)?;
317 Ok(state)
318 }
319}
320
321fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
325 use std::io::Write;
326
327 let mut tmp = tempfile::Builder::new()
330 .prefix(".tmp-")
331 .tempfile_in(dir)
332 .with_context(|| format!("create temp file in {}", dir.display()))?;
333
334 tmp.write_all(contents)
335 .with_context(|| format!("write temp file for {}", target.display()))?;
336
337 tmp.persist(target)
339 .map_err(|e| e.error)
340 .with_context(|| format!("rename temp file to {}", target.display()))?;
341
342 Ok(())
343}
344
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests in this module that mutate process-wide
    /// environment variables.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires [`ENV_LOCK`], continuing even if an earlier test panicked
    /// while holding it.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Builds a `Running` state fixture started at `2024-01-01T00:00:00Z`.
    fn running_state(job_id: &str, pid: u32, updated_at: String) -> crate::schema::JobState {
        crate::schema::JobState {
            job: crate::schema::JobStateJob {
                id: job_id.to_string(),
                status: crate::schema::JobStatus::Running,
                started_at: Some("2024-01-01T00:00:00Z".to_string()),
            },
            result: crate::schema::JobStateResult {
                exit_code: None,
                signal: None,
                duration_ms: None,
            },
            pid: Some(pid),
            finished_at: None,
            updated_at,
            windows_job_name: None,
        }
    }

    /// Builds the `echo hello` metadata fixture for `job_id` rooted at `root`.
    fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
        crate::schema::JobMeta {
            job: crate::schema::JobMetaJob {
                id: job_id.to_string(),
            },
            schema_version: "0.1".to_string(),
            command: vec!["echo".to_string(), "hello".to_string()],
            created_at: "2024-01-01T00:00:00Z".to_string(),
            root: root.display().to_string(),
            env_keys: vec!["FOO".to_string()],
            env_vars: vec![],
            env_vars_runtime: vec![],
            mask: vec![],
            cwd: None,
            notification: None,
            tags: vec![],
            inherit_env: true,
            env_files: vec![],
            timeout_ms: 0,
            kill_after_ms: 0,
            progress_every_ms: 0,
            shell_wrapper: None,
        }
    }

    /// An explicit CLI root is returned untouched.
    #[test]
    fn resolve_root_cli_flag_wins() {
        assert_eq!(
            resolve_root(Some("/tmp/my-root")),
            PathBuf::from("/tmp/my-root")
        );
    }

    /// `AGENT_EXEC_ROOT` is used when no CLI root is given.
    #[test]
    fn resolve_root_env_var() {
        let _env = lock_env();
        // SAFETY: ENV_LOCK serializes the env-mutating tests of this module;
        // assumes no other thread accesses the environment concurrently.
        unsafe {
            std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
            std::env::remove_var("XDG_DATA_HOME");
        }
        let resolved = resolve_root(None);
        // Clean up before asserting so a failure does not leak the variable.
        // SAFETY: see above; ENV_LOCK is still held.
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
        }
        assert_eq!(resolved, PathBuf::from("/tmp/env-root"));
    }

    /// `XDG_DATA_HOME` is used (with `agent-exec/jobs` appended) when
    /// `AGENT_EXEC_ROOT` is absent.
    #[test]
    fn resolve_root_xdg() {
        let _env = lock_env();
        // SAFETY: ENV_LOCK serializes the env-mutating tests of this module;
        // assumes no other thread accesses the environment concurrently.
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
            std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
        }
        let resolved = resolve_root(None);
        // SAFETY: see above; ENV_LOCK is still held.
        unsafe {
            std::env::remove_var("XDG_DATA_HOME");
        }
        assert_eq!(resolved, PathBuf::from("/tmp/xdg/agent-exec/jobs"));
    }

    /// Without any override the default root still mentions `agent-exec`.
    #[test]
    fn resolve_root_default_contains_agent_exec() {
        let _env = lock_env();
        // SAFETY: ENV_LOCK serializes the env-mutating tests of this module;
        // assumes no other thread accesses the environment concurrently.
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
            std::env::remove_var("XDG_DATA_HOME");
        }
        let resolved = resolve_root(None);
        let root_str = resolved.to_string_lossy();
        assert!(
            root_str.contains("agent-exec"),
            "expected agent-exec in path, got {root_str}"
        );
    }

    /// `create` makes the directory and a readable `meta.json`.
    #[test]
    fn job_dir_create_writes_meta_json() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let job_dir = JobDir::create(root, "test-job-01", &make_meta("test-job-01", root)).unwrap();

        assert!(job_dir.path.is_dir(), "job directory was not created");
        assert!(job_dir.meta_path().exists(), "meta.json not found");

        let loaded_meta = job_dir.read_meta().unwrap();
        assert_eq!(loaded_meta.job_id(), "test-job-01");
        assert_eq!(loaded_meta.command, vec!["echo", "hello"]);
        assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
    }

    /// `meta.json` carries the env key names.
    #[test]
    fn meta_json_env_keys_only_no_values() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let meta = crate::schema::JobMeta {
            env_keys: vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()],
            ..make_meta("test-job-02", root)
        };
        let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();

        let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
        // NOTE(review): nothing in this test supplies the string
        // "secret_value", so this assertion cannot fail as written.
        assert!(
            !raw.contains("secret_value"),
            "env value must not be stored in meta.json"
        );
        for key in ["SECRET_KEY", "API_TOKEN"] {
            assert!(raw.contains(key), "env key must be stored");
        }
    }

    /// A written state round-trips and its JSON names `updated_at`.
    #[test]
    fn state_json_contains_updated_at() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let job_dir = JobDir::create(root, "test-job-03", &make_meta("test-job-03", root)).unwrap();

        let state = running_state("test-job-03", 12345, "2024-01-01T00:00:01Z".to_string());
        job_dir.write_state(&state).unwrap();

        assert!(job_dir.state_path().exists(), "state.json not found");
        let loaded = job_dir.read_state().unwrap();
        assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
        assert_eq!(loaded.job_id(), "test-job-03");

        let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
        assert!(
            raw.contains("updated_at"),
            "updated_at field missing from state.json"
        );
    }

    /// Repeated state writes are each readable right after they complete.
    #[test]
    fn state_json_atomic_write_no_corruption() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let job_dir = JobDir::create(root, "test-job-04", &make_meta("test-job-04", root)).unwrap();

        for i in 0..10u32 {
            let state = running_state(
                "test-job-04",
                100 + i,
                format!("2024-01-01T00:00:{:02}Z", i),
            );
            job_dir.write_state(&state).unwrap();

            let loaded = job_dir.read_state().unwrap();
            assert_eq!(
                loaded.pid,
                Some(100 + i),
                "state corrupted at iteration {i}"
            );
        }
    }

    /// Rewriting `meta.json` replaces the previous contents.
    #[test]
    fn meta_json_atomic_write() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let job_dir = JobDir::create(root, "test-job-05", &make_meta("test-job-05", root)).unwrap();

        // Same fixture, with only the fields under test overridden.
        let updated_meta = crate::schema::JobMeta {
            command: vec!["ls".to_string()],
            created_at: "2024-06-01T12:00:00Z".to_string(),
            env_keys: vec!["PATH".to_string()],
            ..make_meta("test-job-05", root)
        };
        job_dir.write_meta_atomic(&updated_meta).unwrap();

        let loaded = job_dir.read_meta().unwrap();
        assert_eq!(loaded.command, vec!["ls"]);
        assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
    }

    /// `init_state` sets `windows_job_name` to `AgentExec-<job_id>` on
    /// Windows and leaves it `None` elsewhere, both in the returned value and
    /// in the persisted `state.json`.
    #[test]
    fn init_state_writes_deterministic_job_name_on_windows() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let job_id = "01TESTJOBID0000000000000";
        let job_dir = JobDir::create(root, job_id, &make_meta(job_id, root)).unwrap();
        let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();

        let expected_name: Option<&str> = if cfg!(windows) {
            Some("AgentExec-01TESTJOBID0000000000000")
        } else {
            None
        };
        let (returned_msg, persisted_msg) = if cfg!(windows) {
            (
                "Windows: init_state must set deterministic job name immediately",
                "Windows: persisted state.json must contain windows_job_name",
            )
        } else {
            (
                "non-Windows: init_state must not set windows_job_name",
                "non-Windows: persisted state.json must not contain windows_job_name",
            )
        };

        assert_eq!(
            state.windows_job_name.as_deref(),
            expected_name,
            "{returned_msg}"
        );

        let persisted = job_dir.read_state().unwrap();
        assert_eq!(
            persisted.windows_job_name.as_deref(),
            expected_name,
            "{persisted_msg}"
        );
    }
}