Skip to main content

agent_exec/
jobstore.rs

1//! Job directory management for agent-exec v0.1.
2//!
3//! Resolution order for the jobs root:
4//!   1. `--root` CLI flag
5//!   2. `AGENT_EXEC_ROOT` environment variable
6//!   3. `$XDG_DATA_HOME/agent-exec/jobs`
7//!   4. `~/.local/share/agent-exec/jobs`
8
9use anyhow::{Context, Result};
10use directories::BaseDirs;
11use std::path::PathBuf;
12
13use crate::schema::{JobMeta, JobState, JobStatus};
14
/// Marker error carrying the ID of a job whose directory could not be found.
///
/// Callers downcast to this type to report `error.code = "job_not_found"`
/// rather than the generic `internal_error`.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    /// Renders as `job not found: <id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(job_id) = self;
        f.write_str("job not found: ")?;
        f.write_str(job_id)
    }
}

// No source error to expose: the default trait methods are sufficient.
impl std::error::Error for JobNotFound {}
27
28/// Resolve the jobs root directory following the priority chain.
29pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
30    // 1. CLI flag
31    if let Some(root) = cli_root {
32        return PathBuf::from(root);
33    }
34
35    // 2. Environment variable
36    if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
37        && !root.is_empty()
38    {
39        return PathBuf::from(root);
40    }
41
42    // 3. XDG_DATA_HOME
43    if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
44        && !xdg.is_empty()
45    {
46        return PathBuf::from(xdg).join("agent-exec").join("jobs");
47    }
48
49    // 4. Default: ~/.local/share/agent-exec/jobs
50    //    (On Windows use data_local_dir() as base)
51    if let Some(base_dirs) = BaseDirs::new() {
52        #[cfg(windows)]
53        let base = base_dirs.data_local_dir().to_path_buf();
54        #[cfg(not(windows))]
55        let base = base_dirs.home_dir().join(".local").join("share");
56        return base.join("agent-exec").join("jobs");
57    }
58
59    // Fallback if directories crate returns None
60    PathBuf::from("~/.local/share/agent-exec/jobs")
61}
62
/// Metrics returned by [`JobDir::read_tail_metrics`].
///
/// Bundles the tail content together with the byte counts used in the
/// `run` snapshot and `tail` JSON responses, so that both callers share
/// the same calculation logic.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TailMetrics {
    /// The tail text (lossy UTF-8, limited to the last N lines / `max_bytes`).
    pub tail: String,
    /// Whether the content was truncated by the byte or line constraints.
    pub truncated: bool,
    /// Total file size in bytes (0 when the file's metadata cannot be read,
    /// e.g. because the file does not exist).
    pub observed_bytes: u64,
    /// Length in bytes of `tail` as returned (i.e. after lossy decoding and
    /// line selection).
    pub included_bytes: u64,
}
78
/// Handle to a specific job's directory.
///
/// Holding a `JobDir` does not keep anything open; it is just the resolved
/// path plus the job ID it was built from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDir {
    /// Directory that holds this job's files (`<root>/<job_id>`).
    pub path: PathBuf,
    /// Identifier of the job; also the final component of `path`.
    pub job_id: String,
}
84
85impl JobDir {
86    /// Open an existing job directory by ID.
87    ///
88    /// Returns `Err` wrapping `JobNotFound` when the directory does not exist,
89    /// so callers can emit `error.code = "job_not_found"` rather than `internal_error`.
90    pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
91        let path = root.join(job_id);
92        if !path.exists() {
93            return Err(anyhow::Error::new(JobNotFound(job_id.to_string())));
94        }
95        Ok(JobDir {
96            path,
97            job_id: job_id.to_string(),
98        })
99    }
100
101    /// Create a new job directory and write `meta.json` atomically.
102    pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
103        let path = root.join(job_id);
104        std::fs::create_dir_all(&path)
105            .with_context(|| format!("create job dir {}", path.display()))?;
106
107        let job_dir = JobDir {
108            path,
109            job_id: job_id.to_string(),
110        };
111
112        job_dir.write_meta_atomic(meta)?;
113
114        Ok(job_dir)
115    }
116
117    pub fn meta_path(&self) -> PathBuf {
118        self.path.join("meta.json")
119    }
120    pub fn state_path(&self) -> PathBuf {
121        self.path.join("state.json")
122    }
123    pub fn stdout_path(&self) -> PathBuf {
124        self.path.join("stdout.log")
125    }
126    pub fn stderr_path(&self) -> PathBuf {
127        self.path.join("stderr.log")
128    }
129    pub fn full_log_path(&self) -> PathBuf {
130        self.path.join("full.log")
131    }
132    pub fn completion_event_path(&self) -> PathBuf {
133        self.path.join("completion_event.json")
134    }
135
136    /// Write `completion_event.json` atomically.
137    pub fn write_completion_event_atomic(
138        &self,
139        record: &crate::schema::CompletionEventRecord,
140    ) -> Result<()> {
141        let target = self.completion_event_path();
142        let contents = serde_json::to_string_pretty(record)?;
143        write_atomic(&self.path, &target, contents.as_bytes())?;
144        Ok(())
145    }
146
147    pub fn read_meta(&self) -> Result<JobMeta> {
148        let raw = std::fs::read(self.meta_path())?;
149        Ok(serde_json::from_slice(&raw)?)
150    }
151
152    pub fn read_state(&self) -> Result<JobState> {
153        let raw = std::fs::read(self.state_path())?;
154        Ok(serde_json::from_slice(&raw)?)
155    }
156
157    /// Write `meta.json` atomically: write to a temp file then rename.
158    pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
159        let target = self.meta_path();
160        let contents = serde_json::to_string_pretty(meta)?;
161        write_atomic(&self.path, &target, contents.as_bytes())?;
162        Ok(())
163    }
164
165    /// Write `state.json` atomically: write to a temp file then rename.
166    pub fn write_state(&self, state: &JobState) -> Result<()> {
167        let target = self.state_path();
168        let contents = serde_json::to_string_pretty(state)?;
169        write_atomic(&self.path, &target, contents.as_bytes())?;
170        Ok(())
171    }
172
173    /// Read the last `max_bytes` of a log file, returning lossy UTF-8.
174    pub fn tail_log(&self, filename: &str, tail_lines: u64, max_bytes: u64) -> String {
175        self.tail_log_with_truncated(filename, tail_lines, max_bytes)
176            .0
177    }
178
179    /// Read the last `max_bytes` of a log file, returning (content, truncated).
180    /// `truncated` is true when the content was cut by bytes or lines constraints.
181    pub fn tail_log_with_truncated(
182        &self,
183        filename: &str,
184        tail_lines: u64,
185        max_bytes: u64,
186    ) -> (String, bool) {
187        let path = self.path.join(filename);
188        let Ok(data) = std::fs::read(&path) else {
189            return (String::new(), false);
190        };
191
192        // Truncate to max_bytes from the end.
193        let byte_truncated = data.len() as u64 > max_bytes;
194        let start = if byte_truncated {
195            (data.len() as u64 - max_bytes) as usize
196        } else {
197            0
198        };
199        let slice = &data[start..];
200
201        // Lossy UTF-8 decode.
202        let text = String::from_utf8_lossy(slice);
203
204        // Keep only the last tail_lines.
205        if tail_lines == 0 {
206            return (text.into_owned(), byte_truncated);
207        }
208        let lines: Vec<&str> = text.lines().collect();
209        let skip = lines.len().saturating_sub(tail_lines as usize);
210        let line_truncated = skip > 0;
211        (lines[skip..].join("\n"), byte_truncated || line_truncated)
212    }
213
214    /// Read tail content and byte metrics for a single log file.
215    ///
216    /// Returns a [`TailMetrics`] that bundles the tail text, truncation flag,
217    /// observed file size, and included byte count.  Both `run`'s snapshot
218    /// generation and `tail`'s JSON generation use this helper so that the
219    /// metric calculation is defined in exactly one place.
220    ///
221    /// `encoding` is always `"utf-8-lossy"` (as required by the contract).
222    pub fn read_tail_metrics(
223        &self,
224        filename: &str,
225        tail_lines: u64,
226        max_bytes: u64,
227    ) -> TailMetrics {
228        let (tail, truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
229        let included_bytes = tail.len() as u64;
230        let observed_bytes = std::fs::metadata(self.path.join(filename))
231            .map(|m| m.len())
232            .unwrap_or(0);
233        TailMetrics {
234            tail,
235            truncated,
236            observed_bytes,
237            included_bytes,
238        }
239    }
240
241    /// Write the initial JobState (running, supervisor PID) to disk.
242    ///
243    /// This is called by the `run` command immediately after the supervisor
244    /// process is spawned, so `pid` is the supervisor's PID. The child process
245    /// PID and, on Windows, the Job Object name are not yet known at this point.
246    ///
247    /// On Windows, the Job Object name is derived deterministically from the
248    /// job_id as `"AgentExec-{job_id}"`. This name is written immediately to
249    /// `state.json` so that callers reading state after `run` returns can
250    /// always find the Job Object identifier, without waiting for the supervisor
251    /// to perform its first `write_state` call. The supervisor will confirm the
252    /// same name (or update to `failed`) after it successfully assigns the child
253    /// process to the named Job Object.
254    pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
255        #[cfg(windows)]
256        let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
257        #[cfg(not(windows))]
258        let windows_job_name: Option<String> = None;
259
260        let state = JobState {
261            job: crate::schema::JobStateJob {
262                id: self.job_id.clone(),
263                status: JobStatus::Running,
264                started_at: started_at.to_string(),
265            },
266            result: crate::schema::JobStateResult {
267                exit_code: None,
268                signal: None,
269                duration_ms: None,
270            },
271            pid: Some(pid),
272            finished_at: None,
273            updated_at: crate::run::now_rfc3339_pub(),
274            windows_job_name,
275        };
276        self.write_state(&state)?;
277        Ok(state)
278    }
279}
280
281/// Write `contents` to `target` atomically by writing to a temp file in the
282/// same directory and then renaming. This prevents readers from observing a
283/// partially-written file.
284fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
285    use std::io::Write;
286
287    // Create a named temporary file in the same directory so that rename is
288    // always on the same filesystem (required for atomic rename on POSIX).
289    let mut tmp = tempfile::Builder::new()
290        .prefix(".tmp-")
291        .tempfile_in(dir)
292        .with_context(|| format!("create temp file in {}", dir.display()))?;
293
294    tmp.write_all(contents)
295        .with_context(|| format!("write temp file for {}", target.display()))?;
296
297    // Persist moves the temp file to the target path atomically.
298    tmp.persist(target)
299        .map_err(|e| e.error)
300        .with_context(|| format!("rename temp file to {}", target.display()))?;
301
302    Ok(())
303}
304
305// ---------- Unit tests ----------
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Global mutex to serialize tests that mutate process-wide environment variables.
    ///
    /// Rust runs tests in parallel by default; any test that calls `set_var` /
    /// `remove_var` must hold this lock for the duration of the test so that
    /// other env-reading tests do not observe a half-mutated environment.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Scoped environment override for tests.
    ///
    /// Holds `ENV_LOCK` for its whole lifetime and, on drop (including during
    /// a panic unwind), puts every touched variable back to the value it had
    /// before, so a test never leaks environment changes into later tests.
    struct EnvOverride {
        saved: Vec<(&'static str, Option<std::ffi::OsString>)>,
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl EnvOverride {
        /// Apply `changes`: `Some(value)` sets the variable, `None` removes it.
        fn apply(changes: &[(&'static str, Option<&str>)]) -> Self {
            // A poisoned lock only means another env test failed; the guard
            // is still usable.
            let lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
            let mut saved = Vec::with_capacity(changes.len());
            for &(key, value) in changes {
                saved.push((key, std::env::var_os(key)));
                // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
                unsafe {
                    match value {
                        Some(v) => std::env::set_var(key, v),
                        None => std::env::remove_var(key),
                    }
                }
            }
            EnvOverride { saved, _lock: lock }
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so restoration is still serialized.
            for (key, previous) in self.saved.drain(..).rev() {
                // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
                unsafe {
                    match previous {
                        Some(v) => std::env::set_var(key, v),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn resolve_root_cli_flag_wins() {
        // CLI flag does not depend on environment variables; no lock needed.
        let root = resolve_root(Some("/tmp/my-root"));
        assert_eq!(root, PathBuf::from("/tmp/my-root"));
    }

    #[test]
    fn resolve_root_env_var() {
        // Also clear XDG to avoid interference.
        let _env = EnvOverride::apply(&[
            ("AGENT_EXEC_ROOT", Some("/tmp/env-root")),
            ("XDG_DATA_HOME", None),
        ]);
        // CLI flag is None, so env var should win.
        let root = resolve_root(None);
        assert_eq!(root, PathBuf::from("/tmp/env-root"));
    }

    #[test]
    fn resolve_root_xdg() {
        let _env = EnvOverride::apply(&[
            ("AGENT_EXEC_ROOT", None),
            ("XDG_DATA_HOME", Some("/tmp/xdg")),
        ]);
        let root = resolve_root(None);
        assert_eq!(root, PathBuf::from("/tmp/xdg/agent-exec/jobs"));
    }

    #[test]
    fn resolve_root_default_contains_agent_exec() {
        let _env = EnvOverride::apply(&[("AGENT_EXEC_ROOT", None), ("XDG_DATA_HOME", None)]);
        let root = resolve_root(None);
        let root_str = root.to_string_lossy();
        assert!(
            root_str.contains("agent-exec"),
            "expected agent-exec in path, got {root_str}"
        );
    }

    // ---------- Job directory structure tests ----------

    /// Build a minimal `JobMeta` for `job_id` rooted at `root`.
    fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
        crate::schema::JobMeta {
            job: crate::schema::JobMetaJob {
                id: job_id.to_string(),
            },
            schema_version: "0.1".to_string(),
            command: vec!["echo".to_string(), "hello".to_string()],
            created_at: "2024-01-01T00:00:00Z".to_string(),
            root: root.display().to_string(),
            env_keys: vec!["FOO".to_string()],
            env_vars: vec![],
            mask: vec![],
            cwd: None,
            notification: None,
        }
    }

    /// Build a running `JobState` with no result and no Windows job name.
    fn make_state(job_id: &str, pid: u32, updated_at: &str) -> crate::schema::JobState {
        crate::schema::JobState {
            job: crate::schema::JobStateJob {
                id: job_id.to_string(),
                status: crate::schema::JobStatus::Running,
                started_at: "2024-01-01T00:00:00Z".to_string(),
            },
            result: crate::schema::JobStateResult {
                exit_code: None,
                signal: None,
                duration_ms: None,
            },
            pid: Some(pid),
            finished_at: None,
            updated_at: updated_at.to_string(),
            windows_job_name: None,
        }
    }

    /// Verify that job directory creation writes meta.json and the directory exists.
    #[test]
    fn job_dir_create_writes_meta_json() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-01", root);
        let job_dir = JobDir::create(root, "test-job-01", &meta).unwrap();

        // Directory must exist.
        assert!(job_dir.path.is_dir(), "job directory was not created");

        // meta.json must exist and be parseable.
        assert!(job_dir.meta_path().exists(), "meta.json not found");
        let loaded_meta = job_dir.read_meta().unwrap();
        assert_eq!(loaded_meta.job_id(), "test-job-01");
        assert_eq!(loaded_meta.command, vec!["echo", "hello"]);

        // env_keys must contain key names only (not values).
        assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
    }

    /// Verify that opening a job that was never created reports `JobNotFound`.
    #[test]
    fn job_dir_open_missing_returns_job_not_found() {
        let tmp = tempfile::tempdir().unwrap();
        let err = JobDir::open(tmp.path(), "no-such-job")
            .err()
            .expect("opening a missing job must fail");
        let not_found = err
            .downcast_ref::<JobNotFound>()
            .expect("error must wrap JobNotFound");
        assert_eq!(not_found.0, "no-such-job");
    }

    /// Verify that meta.json does NOT contain env values (only keys).
    #[test]
    fn meta_json_env_keys_only_no_values() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let mut meta = make_meta("test-job-02", root);
        // Simulate env_keys containing only key names (as would be extracted from KEY=VALUE pairs).
        meta.env_keys = vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()];
        let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();

        // Read raw JSON to verify values are absent.
        // NOTE(review): "secret_value" is never supplied to `meta`, so this
        // negative check cannot fail as written; it needs an input that
        // actually carries a value to be meaningful.
        let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
        assert!(
            !raw.contains("secret_value"),
            "env value must not be stored in meta.json"
        );
        assert!(raw.contains("SECRET_KEY"), "env key must be stored");
        assert!(raw.contains("API_TOKEN"), "env key must be stored");
    }

    /// Verify that state.json contains updated_at after write_state.
    #[test]
    fn state_json_contains_updated_at() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-03", root);
        let job_dir = JobDir::create(root, "test-job-03", &meta).unwrap();

        let state = make_state("test-job-03", 12345, "2024-01-01T00:00:01Z");
        job_dir.write_state(&state).unwrap();

        // Read back and verify.
        assert!(job_dir.state_path().exists(), "state.json not found");
        let loaded = job_dir.read_state().unwrap();
        assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
        assert_eq!(loaded.job_id(), "test-job-03");

        // Also verify the raw JSON contains the updated_at field.
        let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
        assert!(
            raw.contains("updated_at"),
            "updated_at field missing from state.json"
        );
    }

    /// Verify that write_state uses atomic write (temp file + rename).
    /// We verify this indirectly: the file must not be corrupted even if we
    /// call write_state multiple times rapidly.
    #[test]
    fn state_json_atomic_write_no_corruption() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-04", root);
        let job_dir = JobDir::create(root, "test-job-04", &meta).unwrap();

        for i in 0..10 {
            let state = make_state(
                "test-job-04",
                100 + i,
                &format!("2024-01-01T00:00:{:02}Z", i),
            );
            job_dir.write_state(&state).unwrap();

            // Each read must produce valid JSON (no corruption).
            let loaded = job_dir.read_state().unwrap();
            assert_eq!(
                loaded.pid,
                Some(100 + i),
                "state corrupted at iteration {i}"
            );
        }
    }

    /// Verify that meta.json atomic write works correctly.
    #[test]
    fn meta_json_atomic_write() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-05", root);
        let job_dir = JobDir::create(root, "test-job-05", &meta).unwrap();

        // Re-write meta atomically, changing only the fields under test.
        let updated_meta = crate::schema::JobMeta {
            command: vec!["ls".to_string()],
            created_at: "2024-06-01T12:00:00Z".to_string(),
            env_keys: vec!["PATH".to_string()],
            ..make_meta("test-job-05", root)
        };
        job_dir.write_meta_atomic(&updated_meta).unwrap();

        let loaded = job_dir.read_meta().unwrap();
        assert_eq!(loaded.command, vec!["ls"]);
        assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
    }

    /// Verify the byte limit, the line limit, and the missing-file case of the
    /// tail helpers.
    #[test]
    fn tail_log_applies_line_and_byte_limits() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-06", root);
        let job_dir = JobDir::create(root, "test-job-06", &meta).unwrap();
        std::fs::write(job_dir.stdout_path(), "a\nb\nc\n").unwrap();

        // No limit hit: tail_lines == 0 returns the content unchanged.
        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 0, 1024),
            ("a\nb\nc\n".to_string(), false)
        );
        // Line limit larger than the file: all lines, re-joined without trailing newline.
        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 10, 1024),
            ("a\nb\nc".to_string(), false)
        );
        // Line limit drops the oldest line.
        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 2, 1024),
            ("b\nc".to_string(), true)
        );
        // Byte limit keeps only the end of the file.
        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 0, 2),
            ("c\n".to_string(), true)
        );
        assert_eq!(job_dir.tail_log("stdout.log", 1, 1024), "c");

        // Metrics report the full size and the size of what was returned.
        let metrics = job_dir.read_tail_metrics("stdout.log", 0, 2);
        assert_eq!(metrics.tail, "c\n");
        assert!(metrics.truncated);
        assert_eq!(metrics.observed_bytes, 6);
        assert_eq!(metrics.included_bytes, 2);

        // A log that does not exist is reported as empty and not truncated.
        assert_eq!(
            job_dir.tail_log_with_truncated("stderr.log", 5, 1024),
            (String::new(), false)
        );
        assert_eq!(job_dir.read_tail_metrics("stderr.log", 5, 1024).observed_bytes, 0);
    }

    /// On non-Windows platforms, `init_state` must write `windows_job_name: None`
    /// (the field is omitted from JSON via `skip_serializing_if`).
    /// On Windows, `init_state` must write the deterministic Job Object name
    /// `"AgentExec-{job_id}"` so that `state.json` always contains the identifier
    /// immediately after `run` returns, without waiting for the supervisor update.
    #[test]
    fn init_state_writes_deterministic_job_name_on_windows() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let job_id = "01TESTJOBID0000000000000";
        let meta = make_meta(job_id, root);
        let job_dir = JobDir::create(root, job_id, &meta).unwrap();
        let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();

        // Verify in-memory state.
        #[cfg(windows)]
        assert_eq!(
            state.windows_job_name.as_deref(),
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: init_state must set deterministic job name immediately"
        );
        #[cfg(not(windows))]
        assert_eq!(
            state.windows_job_name, None,
            "non-Windows: init_state must not set windows_job_name"
        );

        // Verify persisted state on disk.
        let persisted = job_dir.read_state().unwrap();
        #[cfg(windows)]
        assert_eq!(
            persisted.windows_job_name.as_deref(),
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: persisted state.json must contain windows_job_name"
        );
        #[cfg(not(windows))]
        assert_eq!(
            persisted.windows_job_name, None,
            "non-Windows: persisted state.json must not contain windows_job_name"
        );
    }
}