Skip to main content

agent_exec/
jobstore.rs

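//! Job directory management for agent-exec v0.1.
//!
//! Resolution order for the jobs root:
//!   1. `--root` CLI flag
//!   2. `AGENT_EXEC_ROOT` environment variable (ignored when empty)
//!   3. `$XDG_DATA_HOME/agent-exec/jobs` (ignored when `XDG_DATA_HOME` is empty)
//!   4. `~/.local/share/agent-exec/jobs` (on Windows, the user's local data directory
//!      is used as the base instead of `~/.local/share`)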
8
9use anyhow::{Context, Result};
10use directories::BaseDirs;
11use std::path::PathBuf;
12
13use crate::schema::{JobMeta, JobState, JobStatus};
14
/// Sentinel error type to distinguish "job not found" from other I/O errors.
///
/// The wrapped string is the job ID that could not be resolved. Callers
/// downcast to this type to emit `error.code = "job_not_found"` instead of
/// `internal_error`.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("job not found: ")?;
        f.write_str(&self.0)
    }
}

impl std::error::Error for JobNotFound {}
27
/// Sentinel error type for invalid job state transitions.
///
/// The wrapped string describes the rejected transition. Callers downcast to
/// this type to emit `error.code = "invalid_state"` instead of `internal_error`.
#[derive(Debug)]
pub struct InvalidJobState(pub String);

impl std::fmt::Display for InvalidJobState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("invalid job state: ")?;
        f.write_str(&self.0)
    }
}

impl std::error::Error for InvalidJobState {}
40
41/// Resolve the jobs root directory following the priority chain.
42pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
43    // 1. CLI flag
44    if let Some(root) = cli_root {
45        return PathBuf::from(root);
46    }
47
48    // 2. Environment variable
49    if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
50        && !root.is_empty()
51    {
52        return PathBuf::from(root);
53    }
54
55    // 3. XDG_DATA_HOME
56    if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
57        && !xdg.is_empty()
58    {
59        return PathBuf::from(xdg).join("agent-exec").join("jobs");
60    }
61
62    // 4. Default: ~/.local/share/agent-exec/jobs
63    //    (On Windows use data_local_dir() as base)
64    if let Some(base_dirs) = BaseDirs::new() {
65        #[cfg(windows)]
66        let base = base_dirs.data_local_dir().to_path_buf();
67        #[cfg(not(windows))]
68        let base = base_dirs.home_dir().join(".local").join("share");
69        return base.join("agent-exec").join("jobs");
70    }
71
72    // Fallback if directories crate returns None
73    PathBuf::from("~/.local/share/agent-exec/jobs")
74}
75
/// Metrics returned by [`JobDir::read_tail_metrics`].
///
/// Bundles the tail content together with the byte counts used in the
/// `run` snapshot and `tail` JSON responses, so that both callers share
/// the same calculation logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailMetrics {
    /// The tail text (lossy UTF-8), limited to the last `tail_lines` lines
    /// and the last `max_bytes` bytes requested by the caller.
    pub tail: String,
    /// Whether the content was cut by the byte or line limit.
    pub truncated: bool,
    /// Total file size in bytes (0 if the file does not exist).
    pub observed_bytes: u64,
    /// Length of `tail` in bytes, measured after lossy UTF-8 decoding.
    pub included_bytes: u64,
}
91
/// Handle to a specific job's directory.
#[derive(Debug, Clone)]
pub struct JobDir {
    /// Path of the job directory (`root.join(job_id)` as built by `open`/`create`).
    pub path: PathBuf,
    /// The job identifier; also the final component of `path`.
    pub job_id: String,
}
97
98impl JobDir {
99    /// Open an existing job directory by ID.
100    ///
101    /// Returns `Err` wrapping `JobNotFound` when the directory does not exist,
102    /// so callers can emit `error.code = "job_not_found"` rather than `internal_error`.
103    pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
104        let path = root.join(job_id);
105        if !path.exists() {
106            return Err(anyhow::Error::new(JobNotFound(job_id.to_string())));
107        }
108        Ok(JobDir {
109            path,
110            job_id: job_id.to_string(),
111        })
112    }
113
114    /// Create a new job directory and write `meta.json` atomically.
115    pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
116        let path = root.join(job_id);
117        std::fs::create_dir_all(&path)
118            .with_context(|| format!("create job dir {}", path.display()))?;
119
120        let job_dir = JobDir {
121            path,
122            job_id: job_id.to_string(),
123        };
124
125        job_dir.write_meta_atomic(meta)?;
126
127        Ok(job_dir)
128    }
129
130    pub fn meta_path(&self) -> PathBuf {
131        self.path.join("meta.json")
132    }
133    pub fn state_path(&self) -> PathBuf {
134        self.path.join("state.json")
135    }
136    pub fn stdout_path(&self) -> PathBuf {
137        self.path.join("stdout.log")
138    }
139    pub fn stderr_path(&self) -> PathBuf {
140        self.path.join("stderr.log")
141    }
142    pub fn full_log_path(&self) -> PathBuf {
143        self.path.join("full.log")
144    }
145    pub fn completion_event_path(&self) -> PathBuf {
146        self.path.join("completion_event.json")
147    }
148    pub fn notification_events_path(&self) -> PathBuf {
149        self.path.join("notification_events.ndjson")
150    }
151
152    /// Write `completion_event.json` atomically.
153    pub fn write_completion_event_atomic(
154        &self,
155        record: &crate::schema::CompletionEventRecord,
156    ) -> Result<()> {
157        let target = self.completion_event_path();
158        let contents = serde_json::to_string_pretty(record)?;
159        write_atomic(&self.path, &target, contents.as_bytes())?;
160        Ok(())
161    }
162
163    pub fn read_meta(&self) -> Result<JobMeta> {
164        let raw = std::fs::read(self.meta_path())?;
165        Ok(serde_json::from_slice(&raw)?)
166    }
167
168    pub fn read_state(&self) -> Result<JobState> {
169        let raw = std::fs::read(self.state_path())?;
170        Ok(serde_json::from_slice(&raw)?)
171    }
172
173    /// Write `meta.json` atomically: write to a temp file then rename.
174    pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
175        let target = self.meta_path();
176        let contents = serde_json::to_string_pretty(meta)?;
177        write_atomic(&self.path, &target, contents.as_bytes())?;
178        Ok(())
179    }
180
181    /// Write `state.json` atomically: write to a temp file then rename.
182    pub fn write_state(&self, state: &JobState) -> Result<()> {
183        let target = self.state_path();
184        let contents = serde_json::to_string_pretty(state)?;
185        write_atomic(&self.path, &target, contents.as_bytes())?;
186        Ok(())
187    }
188
189    /// Read the last `max_bytes` of a log file, returning lossy UTF-8.
190    pub fn tail_log(&self, filename: &str, tail_lines: u64, max_bytes: u64) -> String {
191        self.tail_log_with_truncated(filename, tail_lines, max_bytes)
192            .0
193    }
194
195    /// Read the last `max_bytes` of a log file, returning (content, truncated).
196    /// `truncated` is true when the content was cut by bytes or lines constraints.
197    pub fn tail_log_with_truncated(
198        &self,
199        filename: &str,
200        tail_lines: u64,
201        max_bytes: u64,
202    ) -> (String, bool) {
203        let path = self.path.join(filename);
204        let Ok(data) = std::fs::read(&path) else {
205            return (String::new(), false);
206        };
207
208        // Truncate to max_bytes from the end.
209        let byte_truncated = data.len() as u64 > max_bytes;
210        let start = if byte_truncated {
211            (data.len() as u64 - max_bytes) as usize
212        } else {
213            0
214        };
215        let slice = &data[start..];
216
217        // Lossy UTF-8 decode.
218        let text = String::from_utf8_lossy(slice);
219
220        // Keep only the last tail_lines.
221        if tail_lines == 0 {
222            return (text.into_owned(), byte_truncated);
223        }
224        let lines: Vec<&str> = text.lines().collect();
225        let skip = lines.len().saturating_sub(tail_lines as usize);
226        let line_truncated = skip > 0;
227        (lines[skip..].join("\n"), byte_truncated || line_truncated)
228    }
229
230    /// Read tail content and byte metrics for a single log file.
231    ///
232    /// Returns a [`TailMetrics`] that bundles the tail text, truncation flag,
233    /// observed file size, and included byte count.  Both `run`'s snapshot
234    /// generation and `tail`'s JSON generation use this helper so that the
235    /// metric calculation is defined in exactly one place.
236    ///
237    /// `encoding` is always `"utf-8-lossy"` (as required by the contract).
238    pub fn read_tail_metrics(
239        &self,
240        filename: &str,
241        tail_lines: u64,
242        max_bytes: u64,
243    ) -> TailMetrics {
244        let (tail, truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
245        let included_bytes = tail.len() as u64;
246        let observed_bytes = std::fs::metadata(self.path.join(filename))
247            .map(|m| m.len())
248            .unwrap_or(0);
249        TailMetrics {
250            tail,
251            truncated,
252            observed_bytes,
253            included_bytes,
254        }
255    }
256
257    /// Write the initial JobState for a `created` (not-yet-started) job.
258    ///
259    /// The state is `created`, no process has been spawned, and `started_at` is absent.
260    pub fn init_state_created(&self) -> Result<JobState> {
261        let state = JobState {
262            job: crate::schema::JobStateJob {
263                id: self.job_id.clone(),
264                status: JobStatus::Created,
265                started_at: None,
266            },
267            result: crate::schema::JobStateResult {
268                exit_code: None,
269                signal: None,
270                duration_ms: None,
271            },
272            pid: None,
273            finished_at: None,
274            updated_at: crate::run::now_rfc3339_pub(),
275            windows_job_name: None,
276        };
277        self.write_state(&state)?;
278        Ok(state)
279    }
280
281    /// Write the initial JobState (running, supervisor PID) to disk.
282    ///
283    /// This is called by the `run` command immediately after the supervisor
284    /// process is spawned, so `pid` is the supervisor's PID. The child process
285    /// PID and, on Windows, the Job Object name are not yet known at this point.
286    ///
287    /// On Windows, the Job Object name is derived deterministically from the
288    /// job_id as `"AgentExec-{job_id}"`. This name is written immediately to
289    /// `state.json` so that callers reading state after `run` returns can
290    /// always find the Job Object identifier, without waiting for the supervisor
291    /// to perform its first `write_state` call. The supervisor will confirm the
292    /// same name (or update to `failed`) after it successfully assigns the child
293    /// process to the named Job Object.
294    pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
295        #[cfg(windows)]
296        let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
297        #[cfg(not(windows))]
298        let windows_job_name: Option<String> = None;
299
300        let state = JobState {
301            job: crate::schema::JobStateJob {
302                id: self.job_id.clone(),
303                status: JobStatus::Running,
304                started_at: Some(started_at.to_string()),
305            },
306            result: crate::schema::JobStateResult {
307                exit_code: None,
308                signal: None,
309                duration_ms: None,
310            },
311            pid: Some(pid),
312            finished_at: None,
313            updated_at: crate::run::now_rfc3339_pub(),
314            windows_job_name,
315        };
316        self.write_state(&state)?;
317        Ok(state)
318    }
319}
320
321/// Write `contents` to `target` atomically by writing to a temp file in the
322/// same directory and then renaming. This prevents readers from observing a
323/// partially-written file.
324fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
325    use std::io::Write;
326
327    // Create a named temporary file in the same directory so that rename is
328    // always on the same filesystem (required for atomic rename on POSIX).
329    let mut tmp = tempfile::Builder::new()
330        .prefix(".tmp-")
331        .tempfile_in(dir)
332        .with_context(|| format!("create temp file in {}", dir.display()))?;
333
334    tmp.write_all(contents)
335        .with_context(|| format!("write temp file for {}", target.display()))?;
336
337    // Persist moves the temp file to the target path atomically.
338    tmp.persist(target)
339        .map_err(|e| e.error)
340        .with_context(|| format!("rename temp file to {}", target.display()))?;
341
342    Ok(())
343}
344
345// ---------- Unit tests ----------
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Global mutex to serialize tests that mutate process-wide environment variables.
    ///
    /// Rust runs tests in parallel by default; any test that calls `set_var` /
    /// `remove_var` must hold this lock for the duration of the test so that
    /// other env-reading tests do not observe a half-mutated environment.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Environment variables consulted by `resolve_root` and mutated by tests.
    const ROOT_ENV_KEYS: [&str; 2] = ["AGENT_EXEC_ROOT", "XDG_DATA_HOME"];

    /// Holds `ENV_LOCK` and restores every key in `ROOT_ENV_KEYS` to the value
    /// it had at `acquire` time when dropped, even if the test panics.
    struct RootEnvGuard {
        saved: Vec<(&'static str, Option<std::ffi::OsString>)>,
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl RootEnvGuard {
        fn acquire() -> Self {
            let lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
            let saved = ROOT_ENV_KEYS
                .iter()
                .map(|&key| (key, std::env::var_os(key)))
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for RootEnvGuard {
        fn drop(&mut self) {
            // `drop` runs before the fields are dropped, so ENV_LOCK is still held.
            for (key, value) in &self.saved {
                // SAFETY: ENV_LOCK is held; no other env-mutating test runs concurrently.
                unsafe {
                    match value {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn resolve_root_cli_flag_wins() {
        // CLI flag does not depend on environment variables; no lock needed.
        let root = resolve_root(Some("/tmp/my-root"));
        assert_eq!(root, PathBuf::from("/tmp/my-root"));
    }

    #[test]
    fn resolve_root_env_var() {
        let _env = RootEnvGuard::acquire();
        // SAFETY: guarded by ENV_LOCK (held by `_env`).
        unsafe {
            std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
            // Also clear XDG to avoid interference.
            std::env::remove_var("XDG_DATA_HOME");
        }
        // CLI flag is None, so env var should win.
        assert_eq!(resolve_root(None), PathBuf::from("/tmp/env-root"));
    }

    #[test]
    fn resolve_root_xdg() {
        let _env = RootEnvGuard::acquire();
        // SAFETY: guarded by ENV_LOCK (held by `_env`).
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
            std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
        }
        assert_eq!(
            resolve_root(None),
            PathBuf::from("/tmp/xdg/agent-exec/jobs")
        );
    }

    #[test]
    fn resolve_root_default_contains_agent_exec() {
        let _env = RootEnvGuard::acquire();
        // SAFETY: guarded by ENV_LOCK (held by `_env`).
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
            std::env::remove_var("XDG_DATA_HOME");
        }
        let root = resolve_root(None);
        let root_str = root.to_string_lossy();
        assert!(
            root_str.contains("agent-exec"),
            "expected agent-exec in path, got {root_str}"
        );
    }

    // ---------- Job directory structure tests ----------

    fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
        crate::schema::JobMeta {
            job: crate::schema::JobMetaJob {
                id: job_id.to_string(),
            },
            schema_version: "0.1".to_string(),
            command: vec!["echo".to_string(), "hello".to_string()],
            created_at: "2024-01-01T00:00:00Z".to_string(),
            root: root.display().to_string(),
            env_keys: vec!["FOO".to_string()],
            env_vars: vec![],
            env_vars_runtime: vec![],
            mask: vec![],
            cwd: None,
            notification: None,
            tags: vec![],
            inherit_env: true,
            env_files: vec![],
            timeout_ms: 0,
            kill_after_ms: 0,
            progress_every_ms: 0,
            shell_wrapper: None,
        }
    }

    /// A `running` state with a fixed `started_at` and an empty result.
    fn make_running_state(job_id: &str, pid: u32, updated_at: &str) -> crate::schema::JobState {
        crate::schema::JobState {
            job: crate::schema::JobStateJob {
                id: job_id.to_string(),
                status: crate::schema::JobStatus::Running,
                started_at: Some("2024-01-01T00:00:00Z".to_string()),
            },
            result: crate::schema::JobStateResult {
                exit_code: None,
                signal: None,
                duration_ms: None,
            },
            pid: Some(pid),
            finished_at: None,
            updated_at: updated_at.to_string(),
            windows_job_name: None,
        }
    }

    /// Verify that job directory creation writes meta.json and the directory exists.
    #[test]
    fn job_dir_create_writes_meta_json() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-01", root);
        let job_dir = JobDir::create(root, "test-job-01", &meta).unwrap();

        // Directory must exist.
        assert!(job_dir.path.is_dir(), "job directory was not created");

        // meta.json must exist and be parseable.
        assert!(job_dir.meta_path().exists(), "meta.json not found");
        let loaded_meta = job_dir.read_meta().unwrap();
        assert_eq!(loaded_meta.job_id(), "test-job-01");
        assert_eq!(loaded_meta.command, vec!["echo", "hello"]);

        // env_keys must contain key names only (not values).
        assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
    }

    /// Sanity check that env key names round-trip into meta.json and that no
    /// value-like text appears in the serialized form.
    #[test]
    fn meta_json_env_keys_only_no_values() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let mut meta = make_meta("test-job-02", root);
        // Simulate env_keys containing only key names (as would be extracted from KEY=VALUE pairs).
        meta.env_keys = vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()];
        let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();

        // Read raw JSON to verify values are absent.
        let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
        assert!(
            !raw.contains("secret_value"),
            "env value must not be stored in meta.json"
        );
        assert!(raw.contains("SECRET_KEY"), "env key must be stored");
        assert!(raw.contains("API_TOKEN"), "env key must be stored");
    }

    /// Verify that state.json contains updated_at after write_state.
    #[test]
    fn state_json_contains_updated_at() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-03", root);
        let job_dir = JobDir::create(root, "test-job-03", &meta).unwrap();

        let state = make_running_state("test-job-03", 12345, "2024-01-01T00:00:01Z");
        job_dir.write_state(&state).unwrap();

        // Read back and verify.
        assert!(job_dir.state_path().exists(), "state.json not found");
        let loaded = job_dir.read_state().unwrap();
        assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
        assert_eq!(loaded.job_id(), "test-job-03");

        // Also verify the raw JSON contains the updated_at field.
        let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
        assert!(
            raw.contains("updated_at"),
            "updated_at field missing from state.json"
        );
    }

    /// Verify that write_state uses atomic write (temp file + rename).
    /// We verify this indirectly: the file must not be corrupted even if we
    /// call write_state multiple times rapidly.
    #[test]
    fn state_json_atomic_write_no_corruption() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-04", root);
        let job_dir = JobDir::create(root, "test-job-04", &meta).unwrap();

        for i in 0..10u32 {
            let updated_at = format!("2024-01-01T00:00:{:02}Z", i);
            let state = make_running_state("test-job-04", 100 + i, &updated_at);
            job_dir.write_state(&state).unwrap();

            // Each read must produce valid JSON (no corruption).
            let loaded = job_dir.read_state().unwrap();
            assert_eq!(
                loaded.pid,
                Some(100 + i),
                "state corrupted at iteration {i}"
            );
        }
    }

    /// Verify that meta.json atomic write works correctly.
    #[test]
    fn meta_json_atomic_write() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-05", root);
        let job_dir = JobDir::create(root, "test-job-05", &meta).unwrap();

        // Re-write meta atomically with a few fields changed.
        let mut updated_meta = make_meta("test-job-05", root);
        updated_meta.command = vec!["ls".to_string()];
        updated_meta.created_at = "2024-06-01T12:00:00Z".to_string();
        updated_meta.env_keys = vec!["PATH".to_string()];
        job_dir.write_meta_atomic(&updated_meta).unwrap();

        let loaded = job_dir.read_meta().unwrap();
        assert_eq!(loaded.command, vec!["ls"]);
        assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
    }

    /// `open` reports a missing job via the `JobNotFound` sentinel.
    #[test]
    fn open_missing_job_returns_job_not_found() {
        let tmp = tempfile::tempdir().unwrap();
        let Err(err) = JobDir::open(tmp.path(), "no-such-job") else {
            panic!("opening a missing job must fail");
        };
        let not_found = err
            .downcast_ref::<JobNotFound>()
            .expect("error must wrap JobNotFound");
        assert_eq!(not_found.0, "no-such-job");
    }

    /// `open` finds a job directory previously made by `create`.
    #[test]
    fn open_existing_job() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-06", root);
        JobDir::create(root, "test-job-06", &meta).unwrap();

        let opened = JobDir::open(root, "test-job-06").unwrap();
        assert_eq!(opened.job_id, "test-job-06");
        assert_eq!(opened.path, root.join("test-job-06"));
    }

    /// The line limit keeps the last lines and reports truncation only when lines were dropped.
    #[test]
    fn tail_log_keeps_last_lines() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-07", root);
        let job_dir = JobDir::create(root, "test-job-07", &meta).unwrap();
        std::fs::write(job_dir.stdout_path(), "one\ntwo\nthree\n").unwrap();

        let (tail, truncated) = job_dir.tail_log_with_truncated("stdout.log", 2, 1024);
        assert_eq!(tail, "two\nthree");
        assert!(truncated);

        let (tail, truncated) = job_dir.tail_log_with_truncated("stdout.log", 10, 1024);
        assert_eq!(tail, "one\ntwo\nthree");
        assert!(!truncated);
    }

    /// The byte limit keeps the last `max_bytes` bytes and reports both byte counts.
    #[test]
    fn read_tail_metrics_applies_byte_limit() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-08", root);
        let job_dir = JobDir::create(root, "test-job-08", &meta).unwrap();
        std::fs::write(job_dir.stdout_path(), "abcdef").unwrap();

        let metrics = job_dir.read_tail_metrics("stdout.log", 0, 3);
        assert_eq!(metrics.tail, "def");
        assert!(metrics.truncated);
        assert_eq!(metrics.observed_bytes, 6);
        assert_eq!(metrics.included_bytes, 3);
    }

    /// A missing log yields an empty, untruncated tail with zero byte counts.
    #[test]
    fn read_tail_metrics_missing_file() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-09", root);
        let job_dir = JobDir::create(root, "test-job-09", &meta).unwrap();

        let metrics = job_dir.read_tail_metrics("stderr.log", 10, 1024);
        assert_eq!(metrics.tail, "");
        assert!(!metrics.truncated);
        assert_eq!(metrics.observed_bytes, 0);
        assert_eq!(metrics.included_bytes, 0);
    }

    /// On non-Windows platforms, `init_state` must write `windows_job_name: None`
    /// (the field is omitted from JSON via `skip_serializing_if`).
    /// On Windows, `init_state` must write the deterministic Job Object name
    /// `"AgentExec-{job_id}"` so that `state.json` always contains the identifier
    /// immediately after `run` returns, without waiting for the supervisor update.
    #[test]
    fn init_state_writes_deterministic_job_name_on_windows() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let job_id = "01TESTJOBID0000000000000";
        let meta = make_meta(job_id, root);
        let job_dir = JobDir::create(root, job_id, &meta).unwrap();
        let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();

        // Verify in-memory state.
        #[cfg(windows)]
        assert_eq!(
            state.windows_job_name.as_deref(),
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: init_state must set deterministic job name immediately"
        );
        #[cfg(not(windows))]
        assert_eq!(
            state.windows_job_name, None,
            "non-Windows: init_state must not set windows_job_name"
        );

        // Verify persisted state on disk.
        let persisted = job_dir.read_state().unwrap();
        #[cfg(windows)]
        assert_eq!(
            persisted.windows_job_name.as_deref(),
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: persisted state.json must contain windows_job_name"
        );
        #[cfg(not(windows))]
        assert_eq!(
            persisted.windows_job_name, None,
            "non-Windows: persisted state.json must not contain windows_job_name"
        );
    }
}