1use anyhow::{Context, Result};
10use directories::BaseDirs;
11use std::path::PathBuf;
12
13use crate::schema::{JobMeta, JobState, JobStatus};
14
15#[derive(Debug)]
18pub struct JobNotFound(pub String);
19
20impl std::fmt::Display for JobNotFound {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 write!(f, "job not found: {}", self.0)
23 }
24}
25
26impl std::error::Error for JobNotFound {}
27
28#[derive(Debug)]
31pub struct AmbiguousJobId {
32 pub prefix: String,
33 pub candidates: Vec<String>,
34}
35
36impl std::fmt::Display for AmbiguousJobId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "ambiguous job ID prefix '{}': matches ", self.prefix)?;
39 if self.candidates.len() <= 5 {
40 write!(f, "{}", self.candidates.join(", "))
41 } else {
42 write!(
43 f,
44 "{}, ... and {} more",
45 self.candidates[..5].join(", "),
46 self.candidates.len() - 5
47 )
48 }
49 }
50}
51
52impl std::error::Error for AmbiguousJobId {}
53
54#[derive(Debug)]
57pub struct InvalidJobState(pub String);
58
59impl std::fmt::Display for InvalidJobState {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "invalid job state: {}", self.0)
62 }
63}
64
65impl std::error::Error for InvalidJobState {}
66
67pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
69 if let Some(root) = cli_root {
71 return PathBuf::from(root);
72 }
73
74 if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
76 && !root.is_empty()
77 {
78 return PathBuf::from(root);
79 }
80
81 if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
83 && !xdg.is_empty()
84 {
85 return PathBuf::from(xdg).join("agent-exec").join("jobs");
86 }
87
88 if let Some(base_dirs) = BaseDirs::new() {
91 #[cfg(windows)]
92 let base = base_dirs.data_local_dir().to_path_buf();
93 #[cfg(not(windows))]
94 let base = base_dirs.home_dir().join(".local").join("share");
95 return base.join("agent-exec").join("jobs");
96 }
97
98 PathBuf::from("~/.local/share/agent-exec/jobs")
100}
101
102pub struct TailMetrics {
108 pub tail: String,
110 pub truncated: bool,
112 pub observed_bytes: u64,
114 pub included_bytes: u64,
116}
117
118#[derive(Debug)]
120pub struct JobDir {
121 pub path: PathBuf,
122 pub job_id: String,
123}
124
125impl JobDir {
126 pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
135 let path = root.join(job_id);
137 if path.is_dir() {
138 return Ok(JobDir {
139 path,
140 job_id: job_id.to_string(),
141 });
142 }
143
144 let mut candidates: Vec<String> = std::fs::read_dir(root)
146 .into_iter()
147 .flatten()
148 .flatten()
149 .filter_map(|entry| {
150 let name = entry.file_name().to_string_lossy().into_owned();
151 if name.starts_with(job_id) && entry.path().is_dir() {
152 Some(name)
153 } else {
154 None
155 }
156 })
157 .collect();
158
159 match candidates.len() {
160 0 => Err(anyhow::Error::new(JobNotFound(job_id.to_string()))),
161 1 => {
162 let resolved = candidates.remove(0);
163 let path = root.join(&resolved);
164 Ok(JobDir {
165 path,
166 job_id: resolved,
167 })
168 }
169 _ => {
170 candidates.sort();
171 Err(anyhow::Error::new(AmbiguousJobId {
172 prefix: job_id.to_string(),
173 candidates,
174 }))
175 }
176 }
177 }
178
179 pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
181 let path = root.join(job_id);
182 std::fs::create_dir_all(&path)
183 .with_context(|| format!("create job dir {}", path.display()))?;
184
185 let job_dir = JobDir {
186 path,
187 job_id: job_id.to_string(),
188 };
189
190 job_dir.write_meta_atomic(meta)?;
191
192 Ok(job_dir)
193 }
194
195 pub fn meta_path(&self) -> PathBuf {
196 self.path.join("meta.json")
197 }
198 pub fn state_path(&self) -> PathBuf {
199 self.path.join("state.json")
200 }
201 pub fn stdout_path(&self) -> PathBuf {
202 self.path.join("stdout.log")
203 }
204 pub fn stderr_path(&self) -> PathBuf {
205 self.path.join("stderr.log")
206 }
207 pub fn full_log_path(&self) -> PathBuf {
208 self.path.join("full.log")
209 }
210 pub fn completion_event_path(&self) -> PathBuf {
211 self.path.join("completion_event.json")
212 }
213 pub fn notification_events_path(&self) -> PathBuf {
214 self.path.join("notification_events.ndjson")
215 }
216
217 pub fn write_completion_event_atomic(
219 &self,
220 record: &crate::schema::CompletionEventRecord,
221 ) -> Result<()> {
222 let target = self.completion_event_path();
223 let contents = serde_json::to_string_pretty(record)?;
224 write_atomic(&self.path, &target, contents.as_bytes())?;
225 Ok(())
226 }
227
228 pub fn read_meta(&self) -> Result<JobMeta> {
229 let raw = std::fs::read(self.meta_path())?;
230 Ok(serde_json::from_slice(&raw)?)
231 }
232
233 pub fn read_state(&self) -> Result<JobState> {
234 let raw = std::fs::read(self.state_path())?;
235 Ok(serde_json::from_slice(&raw)?)
236 }
237
238 pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
240 let target = self.meta_path();
241 let contents = serde_json::to_string_pretty(meta)?;
242 write_atomic(&self.path, &target, contents.as_bytes())?;
243 Ok(())
244 }
245
246 pub fn write_state(&self, state: &JobState) -> Result<()> {
248 let target = self.state_path();
249 let contents = serde_json::to_string_pretty(state)?;
250 write_atomic(&self.path, &target, contents.as_bytes())?;
251 Ok(())
252 }
253
254 pub fn tail_log(&self, filename: &str, tail_lines: u64, max_bytes: u64) -> String {
256 self.tail_log_with_truncated(filename, tail_lines, max_bytes)
257 .0
258 }
259
260 pub fn tail_log_with_truncated(
263 &self,
264 filename: &str,
265 tail_lines: u64,
266 max_bytes: u64,
267 ) -> (String, bool) {
268 let path = self.path.join(filename);
269 let Ok(data) = std::fs::read(&path) else {
270 return (String::new(), false);
271 };
272
273 let byte_truncated = data.len() as u64 > max_bytes;
275 let start = if byte_truncated {
276 (data.len() as u64 - max_bytes) as usize
277 } else {
278 0
279 };
280 let slice = &data[start..];
281
282 let text = String::from_utf8_lossy(slice);
284
285 if tail_lines == 0 {
287 return (text.into_owned(), byte_truncated);
288 }
289 let lines: Vec<&str> = text.lines().collect();
290 let skip = lines.len().saturating_sub(tail_lines as usize);
291 let line_truncated = skip > 0;
292 (lines[skip..].join("\n"), byte_truncated || line_truncated)
293 }
294
295 pub fn read_tail_metrics(
304 &self,
305 filename: &str,
306 tail_lines: u64,
307 max_bytes: u64,
308 ) -> TailMetrics {
309 let (tail, truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
310 let included_bytes = tail.len() as u64;
311 let observed_bytes = std::fs::metadata(self.path.join(filename))
312 .map(|m| m.len())
313 .unwrap_or(0);
314 TailMetrics {
315 tail,
316 truncated,
317 observed_bytes,
318 included_bytes,
319 }
320 }
321
322 pub fn init_state_created(&self) -> Result<JobState> {
326 let state = JobState {
327 job: crate::schema::JobStateJob {
328 id: self.job_id.clone(),
329 status: JobStatus::Created,
330 started_at: None,
331 },
332 result: crate::schema::JobStateResult {
333 exit_code: None,
334 signal: None,
335 duration_ms: None,
336 },
337 pid: None,
338 finished_at: None,
339 updated_at: crate::run::now_rfc3339_pub(),
340 windows_job_name: None,
341 };
342 self.write_state(&state)?;
343 Ok(state)
344 }
345
346 pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
360 #[cfg(windows)]
361 let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
362 #[cfg(not(windows))]
363 let windows_job_name: Option<String> = None;
364
365 let state = JobState {
366 job: crate::schema::JobStateJob {
367 id: self.job_id.clone(),
368 status: JobStatus::Running,
369 started_at: Some(started_at.to_string()),
370 },
371 result: crate::schema::JobStateResult {
372 exit_code: None,
373 signal: None,
374 duration_ms: None,
375 },
376 pid: Some(pid),
377 finished_at: None,
378 updated_at: crate::run::now_rfc3339_pub(),
379 windows_job_name,
380 };
381 self.write_state(&state)?;
382 Ok(state)
383 }
384}
385
386fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
390 use std::io::Write;
391
392 let mut tmp = tempfile::Builder::new()
395 .prefix(".tmp-")
396 .tempfile_in(dir)
397 .with_context(|| format!("create temp file in {}", dir.display()))?;
398
399 tmp.write_all(contents)
400 .with_context(|| format!("write temp file for {}", target.display()))?;
401
402 tmp.persist(target)
404 .map_err(|e| e.error)
405 .with_context(|| format!("rename temp file to {}", target.display()))?;
406
407 Ok(())
408}
409
410#[cfg(test)]
413mod tests {
414 use super::*;
415
416 static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
422
423 #[test]
424 fn resolve_root_cli_flag_wins() {
425 let root = resolve_root(Some("/tmp/my-root"));
427 assert_eq!(root, PathBuf::from("/tmp/my-root"));
428 }
429
430 #[test]
431 fn resolve_root_env_var() {
432 let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
433 unsafe {
435 std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
436 std::env::remove_var("XDG_DATA_HOME");
438 }
439 let root = resolve_root(None);
441 unsafe {
443 std::env::remove_var("AGENT_EXEC_ROOT");
444 }
445 assert_eq!(root, PathBuf::from("/tmp/env-root"));
446 }
447
448 #[test]
449 fn resolve_root_xdg() {
450 let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
451 unsafe {
453 std::env::remove_var("AGENT_EXEC_ROOT");
454 std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
455 }
456 let root = resolve_root(None);
457 unsafe {
458 std::env::remove_var("XDG_DATA_HOME");
459 }
460 assert_eq!(root, PathBuf::from("/tmp/xdg/agent-exec/jobs"));
461 }
462
463 #[test]
464 fn resolve_root_default_contains_agent_exec() {
465 let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
466 unsafe {
468 std::env::remove_var("AGENT_EXEC_ROOT");
469 std::env::remove_var("XDG_DATA_HOME");
470 }
471 let root = resolve_root(None);
472 let root_str = root.to_string_lossy();
473 assert!(
474 root_str.contains("agent-exec"),
475 "expected agent-exec in path, got {root_str}"
476 );
477 }
478
479 fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
482 crate::schema::JobMeta {
483 job: crate::schema::JobMetaJob {
484 id: job_id.to_string(),
485 },
486 schema_version: "0.1".to_string(),
487 command: vec!["echo".to_string(), "hello".to_string()],
488 created_at: "2024-01-01T00:00:00Z".to_string(),
489 root: root.display().to_string(),
490 env_keys: vec!["FOO".to_string()],
491 env_vars: vec![],
492 env_vars_runtime: vec![],
493 mask: vec![],
494 cwd: None,
495 notification: None,
496 tags: vec![],
497 inherit_env: true,
498 env_files: vec![],
499 timeout_ms: 0,
500 kill_after_ms: 0,
501 progress_every_ms: 0,
502 shell_wrapper: None,
503 }
504 }
505
506 #[test]
508 fn job_dir_create_writes_meta_json() {
509 let tmp = tempfile::tempdir().unwrap();
510 let root = tmp.path();
511 let meta = make_meta("test-job-01", root);
512 let job_dir = JobDir::create(root, "test-job-01", &meta).unwrap();
513
514 assert!(job_dir.path.is_dir(), "job directory was not created");
516
517 assert!(job_dir.meta_path().exists(), "meta.json not found");
519 let loaded_meta = job_dir.read_meta().unwrap();
520 assert_eq!(loaded_meta.job_id(), "test-job-01");
521 assert_eq!(loaded_meta.command, vec!["echo", "hello"]);
522
523 assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
525 }
526
527 #[test]
529 fn meta_json_env_keys_only_no_values() {
530 let tmp = tempfile::tempdir().unwrap();
531 let root = tmp.path();
532 let mut meta = make_meta("test-job-02", root);
533 meta.env_keys = vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()];
535 let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();
536
537 let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
539 assert!(
540 !raw.contains("secret_value"),
541 "env value must not be stored in meta.json"
542 );
543 assert!(raw.contains("SECRET_KEY"), "env key must be stored");
544 assert!(raw.contains("API_TOKEN"), "env key must be stored");
545 }
546
547 #[test]
549 fn state_json_contains_updated_at() {
550 let tmp = tempfile::tempdir().unwrap();
551 let root = tmp.path();
552 let meta = make_meta("test-job-03", root);
553 let job_dir = JobDir::create(root, "test-job-03", &meta).unwrap();
554
555 let state = crate::schema::JobState {
556 job: crate::schema::JobStateJob {
557 id: "test-job-03".to_string(),
558 status: crate::schema::JobStatus::Running,
559 started_at: Some("2024-01-01T00:00:00Z".to_string()),
560 },
561 result: crate::schema::JobStateResult {
562 exit_code: None,
563 signal: None,
564 duration_ms: None,
565 },
566 pid: Some(12345),
567 finished_at: None,
568 updated_at: "2024-01-01T00:00:01Z".to_string(),
569 windows_job_name: None,
570 };
571 job_dir.write_state(&state).unwrap();
572
573 assert!(job_dir.state_path().exists(), "state.json not found");
575 let loaded = job_dir.read_state().unwrap();
576 assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
577 assert_eq!(loaded.job_id(), "test-job-03");
578
579 let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
581 assert!(
582 raw.contains("updated_at"),
583 "updated_at field missing from state.json"
584 );
585 }
586
587 #[test]
591 fn state_json_atomic_write_no_corruption() {
592 let tmp = tempfile::tempdir().unwrap();
593 let root = tmp.path();
594 let meta = make_meta("test-job-04", root);
595 let job_dir = JobDir::create(root, "test-job-04", &meta).unwrap();
596
597 for i in 0..10 {
598 let state = crate::schema::JobState {
599 job: crate::schema::JobStateJob {
600 id: "test-job-04".to_string(),
601 status: crate::schema::JobStatus::Running,
602 started_at: Some("2024-01-01T00:00:00Z".to_string()),
603 },
604 result: crate::schema::JobStateResult {
605 exit_code: None,
606 signal: None,
607 duration_ms: None,
608 },
609 pid: Some(100 + i),
610 finished_at: None,
611 updated_at: format!("2024-01-01T00:00:{:02}Z", i),
612 windows_job_name: None,
613 };
614 job_dir.write_state(&state).unwrap();
615
616 let loaded = job_dir.read_state().unwrap();
618 assert_eq!(
619 loaded.pid,
620 Some(100 + i),
621 "state corrupted at iteration {i}"
622 );
623 }
624 }
625
626 #[test]
628 fn meta_json_atomic_write() {
629 let tmp = tempfile::tempdir().unwrap();
630 let root = tmp.path();
631 let meta = make_meta("test-job-05", root);
632 let job_dir = JobDir::create(root, "test-job-05", &meta).unwrap();
633
634 let updated_meta = crate::schema::JobMeta {
636 job: crate::schema::JobMetaJob {
637 id: "test-job-05".to_string(),
638 },
639 schema_version: "0.1".to_string(),
640 command: vec!["ls".to_string()],
641 created_at: "2024-06-01T12:00:00Z".to_string(),
642 root: root.display().to_string(),
643 env_keys: vec!["PATH".to_string()],
644 env_vars: vec![],
645 env_vars_runtime: vec![],
646 mask: vec![],
647 cwd: None,
648 notification: None,
649 tags: vec![],
650 inherit_env: true,
651 env_files: vec![],
652 timeout_ms: 0,
653 kill_after_ms: 0,
654 progress_every_ms: 0,
655 shell_wrapper: None,
656 };
657 job_dir.write_meta_atomic(&updated_meta).unwrap();
658
659 let loaded = job_dir.read_meta().unwrap();
660 assert_eq!(loaded.command, vec!["ls"]);
661 assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
662 }
663
664 #[test]
670 fn init_state_writes_deterministic_job_name_on_windows() {
671 let tmp = tempfile::tempdir().unwrap();
672 let root = tmp.path();
673 let job_id = "01TESTJOBID0000000000000";
674 let meta = make_meta(job_id, root);
675 let job_dir = JobDir::create(root, job_id, &meta).unwrap();
676 let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();
677
678 #[cfg(windows)]
680 assert_eq!(
681 state.windows_job_name.as_deref(),
682 Some("AgentExec-01TESTJOBID0000000000000"),
683 "Windows: init_state must set deterministic job name immediately"
684 );
685 #[cfg(not(windows))]
686 assert_eq!(
687 state.windows_job_name, None,
688 "non-Windows: init_state must not set windows_job_name"
689 );
690
691 let persisted = job_dir.read_state().unwrap();
693 #[cfg(windows)]
694 assert_eq!(
695 persisted.windows_job_name.as_deref(),
696 Some("AgentExec-01TESTJOBID0000000000000"),
697 "Windows: persisted state.json must contain windows_job_name"
698 );
699 #[cfg(not(windows))]
700 assert_eq!(
701 persisted.windows_job_name, None,
702 "non-Windows: persisted state.json must not contain windows_job_name"
703 );
704 }
705
706 #[test]
709 fn job_dir_open_exact_match() {
710 let tmp = tempfile::tempdir().unwrap();
711 let root = tmp.path();
712 let job_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
713 let meta = make_meta(job_id, root);
714 JobDir::create(root, job_id, &meta).unwrap();
715
716 let result = JobDir::open(root, job_id).unwrap();
717 assert_eq!(result.job_id, job_id);
718 }
719
720 #[test]
721 fn job_dir_open_unique_prefix_resolves() {
722 let tmp = tempfile::tempdir().unwrap();
723 let root = tmp.path();
724 let job_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
725 let meta = make_meta(job_id, root);
726 JobDir::create(root, job_id, &meta).unwrap();
727
728 let result = JobDir::open(root, "01JQXK3M").unwrap();
730 assert_eq!(result.job_id, job_id);
731 }
732
733 #[test]
734 fn job_dir_open_not_found_returns_job_not_found() {
735 let tmp = tempfile::tempdir().unwrap();
736 let root = tmp.path();
737
738 let err = JobDir::open(root, "ZZZZZ").unwrap_err();
739 assert!(
740 err.downcast_ref::<JobNotFound>().is_some(),
741 "expected JobNotFound, got: {err}"
742 );
743 }
744
745 #[test]
746 fn job_dir_open_ambiguous_prefix_returns_ambiguous() {
747 let tmp = tempfile::tempdir().unwrap();
748 let root = tmp.path();
749 let id_a = "01JQXK3M8EAAA00000000000AA";
750 let id_b = "01JQXK3M8EBBB00000000000BB";
751 let meta_a = make_meta(id_a, root);
752 let meta_b = make_meta(id_b, root);
753 JobDir::create(root, id_a, &meta_a).unwrap();
754 JobDir::create(root, id_b, &meta_b).unwrap();
755
756 let err = JobDir::open(root, "01JQXK3M8E").unwrap_err();
757 let ambiguous = err
758 .downcast_ref::<AmbiguousJobId>()
759 .expect("expected AmbiguousJobId");
760 assert_eq!(ambiguous.prefix, "01JQXK3M8E");
761 assert!(ambiguous.candidates.contains(&id_a.to_string()));
762 assert!(ambiguous.candidates.contains(&id_b.to_string()));
763 }
764
765 #[test]
766 fn ambiguous_job_id_display_up_to_5_candidates() {
767 let err = AmbiguousJobId {
768 prefix: "01J".to_string(),
769 candidates: vec![
770 "01JAAA".to_string(),
771 "01JBBB".to_string(),
772 "01JCCC".to_string(),
773 ],
774 };
775 let msg = err.to_string();
776 assert!(msg.contains("01J"), "must include prefix: {msg}");
777 assert!(msg.contains("01JAAA"), "must list candidates: {msg}");
778 }
779
780 #[test]
781 fn ambiguous_job_id_display_truncates_beyond_5() {
782 let candidates: Vec<String> = (1..=8)
783 .map(|i| format!("01JCANDIDATE{i:02}0000000000"))
784 .collect();
785 let err = AmbiguousJobId {
786 prefix: "01J".to_string(),
787 candidates,
788 };
789 let msg = err.to_string();
790 assert!(msg.contains("... and 3 more"), "must truncate: {msg}");
791 }
792}