Skip to main content

agent_exec/
jobstore.rs

//! Job directory management for agent-exec v0.1.
//!
//! Resolution order for the jobs root (first match wins):
//!   1. `--root` CLI flag
//!   2. `AGENT_EXEC_ROOT` environment variable (ignored when empty)
//!   3. `$XDG_DATA_HOME/agent-exec/jobs` (ignored when `XDG_DATA_HOME` is empty)
//!   4. `~/.local/share/agent-exec/jobs` (on Windows: `<local data dir>/agent-exec/jobs`)
8
9use anyhow::{Context, Result};
10use directories::BaseDirs;
11use std::path::PathBuf;
12
13use crate::schema::{JobMeta, JobState, JobStatus};
14
/// Marker error meaning "no job directory exists for this ID".
///
/// It is a dedicated type rather than a plain message so that callers can
/// recognise it inside an `anyhow::Error` (e.g. via `downcast_ref`). They can
/// then report `error.code = "job_not_found"` instead of `internal_error`.
/// The wrapped `String` is the job ID that was looked up.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(job_id) = self;
        write!(formatter, "job not found: {job_id}")
    }
}

impl std::error::Error for JobNotFound {}
27
28/// Resolve the jobs root directory following the priority chain.
29pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
30    // 1. CLI flag
31    if let Some(root) = cli_root {
32        return PathBuf::from(root);
33    }
34
35    // 2. Environment variable
36    if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
37        && !root.is_empty()
38    {
39        return PathBuf::from(root);
40    }
41
42    // 3. XDG_DATA_HOME
43    if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
44        && !xdg.is_empty()
45    {
46        return PathBuf::from(xdg).join("agent-exec").join("jobs");
47    }
48
49    // 4. Default: ~/.local/share/agent-exec/jobs
50    //    (On Windows use data_local_dir() as base)
51    if let Some(base_dirs) = BaseDirs::new() {
52        #[cfg(windows)]
53        let base = base_dirs.data_local_dir().to_path_buf();
54        #[cfg(not(windows))]
55        let base = base_dirs.home_dir().join(".local").join("share");
56        return base.join("agent-exec").join("jobs");
57    }
58
59    // Fallback if directories crate returns None
60    PathBuf::from("~/.local/share/agent-exec/jobs")
61}
62
/// Metrics returned by [`JobDir::read_tail_metrics`].
///
/// Bundles the tail content together with the byte counts used in the
/// `run` snapshot and `tail` JSON responses, so that both callers share
/// the same calculation logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailMetrics {
    /// The tail text: lossy UTF-8, limited to the last `max_bytes` bytes of
    /// the file and, when a line limit is requested, to the last N lines.
    pub tail: String,
    /// Whether the content was cut by the byte or the line limit.
    pub truncated: bool,
    /// Size of the log file in bytes when it was inspected (0 if its
    /// metadata could not be read, e.g. because the file does not exist).
    pub observed_bytes: u64,
    /// Length of `tail` in bytes, measured as a UTF-8 string. `tail` is
    /// decoded (lossy) and, with a line limit, re-joined text, so this can
    /// differ slightly from the number of raw file bytes it came from.
    pub included_bytes: u64,
}
78
/// Handle to a specific job's directory.
///
/// It stores only the resolved directory path and the job ID and holds no
/// open file handles. Cloning or comparing it never touches the filesystem.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDir {
    /// The job's directory, holding `meta.json`, `state.json` and its logs.
    pub path: PathBuf,
    /// The job ID this handle was opened or created with.
    pub job_id: String,
}
84
85impl JobDir {
86    /// Open an existing job directory by ID.
87    ///
88    /// Returns `Err` wrapping `JobNotFound` when the directory does not exist,
89    /// so callers can emit `error.code = "job_not_found"` rather than `internal_error`.
90    pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
91        let path = root.join(job_id);
92        if !path.exists() {
93            return Err(anyhow::Error::new(JobNotFound(job_id.to_string())));
94        }
95        Ok(JobDir {
96            path,
97            job_id: job_id.to_string(),
98        })
99    }
100
101    /// Create a new job directory and write `meta.json` atomically.
102    pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
103        let path = root.join(job_id);
104        std::fs::create_dir_all(&path)
105            .with_context(|| format!("create job dir {}", path.display()))?;
106
107        let job_dir = JobDir {
108            path,
109            job_id: job_id.to_string(),
110        };
111
112        job_dir.write_meta_atomic(meta)?;
113
114        Ok(job_dir)
115    }
116
117    pub fn meta_path(&self) -> PathBuf {
118        self.path.join("meta.json")
119    }
120    pub fn state_path(&self) -> PathBuf {
121        self.path.join("state.json")
122    }
123    pub fn stdout_path(&self) -> PathBuf {
124        self.path.join("stdout.log")
125    }
126    pub fn stderr_path(&self) -> PathBuf {
127        self.path.join("stderr.log")
128    }
129    pub fn full_log_path(&self) -> PathBuf {
130        self.path.join("full.log")
131    }
132
133    pub fn read_meta(&self) -> Result<JobMeta> {
134        let raw = std::fs::read(self.meta_path())?;
135        Ok(serde_json::from_slice(&raw)?)
136    }
137
138    pub fn read_state(&self) -> Result<JobState> {
139        let raw = std::fs::read(self.state_path())?;
140        Ok(serde_json::from_slice(&raw)?)
141    }
142
143    /// Write `meta.json` atomically: write to a temp file then rename.
144    pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
145        let target = self.meta_path();
146        let contents = serde_json::to_string_pretty(meta)?;
147        write_atomic(&self.path, &target, contents.as_bytes())?;
148        Ok(())
149    }
150
151    /// Write `state.json` atomically: write to a temp file then rename.
152    pub fn write_state(&self, state: &JobState) -> Result<()> {
153        let target = self.state_path();
154        let contents = serde_json::to_string_pretty(state)?;
155        write_atomic(&self.path, &target, contents.as_bytes())?;
156        Ok(())
157    }
158
159    /// Read the last `max_bytes` of a log file, returning lossy UTF-8.
160    pub fn tail_log(&self, filename: &str, tail_lines: u64, max_bytes: u64) -> String {
161        self.tail_log_with_truncated(filename, tail_lines, max_bytes)
162            .0
163    }
164
165    /// Read the last `max_bytes` of a log file, returning (content, truncated).
166    /// `truncated` is true when the content was cut by bytes or lines constraints.
167    pub fn tail_log_with_truncated(
168        &self,
169        filename: &str,
170        tail_lines: u64,
171        max_bytes: u64,
172    ) -> (String, bool) {
173        let path = self.path.join(filename);
174        let Ok(data) = std::fs::read(&path) else {
175            return (String::new(), false);
176        };
177
178        // Truncate to max_bytes from the end.
179        let byte_truncated = data.len() as u64 > max_bytes;
180        let start = if byte_truncated {
181            (data.len() as u64 - max_bytes) as usize
182        } else {
183            0
184        };
185        let slice = &data[start..];
186
187        // Lossy UTF-8 decode.
188        let text = String::from_utf8_lossy(slice);
189
190        // Keep only the last tail_lines.
191        if tail_lines == 0 {
192            return (text.into_owned(), byte_truncated);
193        }
194        let lines: Vec<&str> = text.lines().collect();
195        let skip = lines.len().saturating_sub(tail_lines as usize);
196        let line_truncated = skip > 0;
197        (lines[skip..].join("\n"), byte_truncated || line_truncated)
198    }
199
200    /// Read tail content and byte metrics for a single log file.
201    ///
202    /// Returns a [`TailMetrics`] that bundles the tail text, truncation flag,
203    /// observed file size, and included byte count.  Both `run`'s snapshot
204    /// generation and `tail`'s JSON generation use this helper so that the
205    /// metric calculation is defined in exactly one place.
206    ///
207    /// `encoding` is always `"utf-8-lossy"` (as required by the contract).
208    pub fn read_tail_metrics(
209        &self,
210        filename: &str,
211        tail_lines: u64,
212        max_bytes: u64,
213    ) -> TailMetrics {
214        let (tail, truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
215        let included_bytes = tail.len() as u64;
216        let observed_bytes = std::fs::metadata(self.path.join(filename))
217            .map(|m| m.len())
218            .unwrap_or(0);
219        TailMetrics {
220            tail,
221            truncated,
222            observed_bytes,
223            included_bytes,
224        }
225    }
226
227    /// Write the initial JobState (running, supervisor PID) to disk.
228    ///
229    /// This is called by the `run` command immediately after the supervisor
230    /// process is spawned, so `pid` is the supervisor's PID. The child process
231    /// PID and, on Windows, the Job Object name are not yet known at this point.
232    ///
233    /// On Windows, the Job Object name is derived deterministically from the
234    /// job_id as `"AgentExec-{job_id}"`. This name is written immediately to
235    /// `state.json` so that callers reading state after `run` returns can
236    /// always find the Job Object identifier, without waiting for the supervisor
237    /// to perform its first `write_state` call. The supervisor will confirm the
238    /// same name (or update to `failed`) after it successfully assigns the child
239    /// process to the named Job Object.
240    pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
241        #[cfg(windows)]
242        let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
243        #[cfg(not(windows))]
244        let windows_job_name: Option<String> = None;
245
246        let state = JobState {
247            job: crate::schema::JobStateJob {
248                id: self.job_id.clone(),
249                status: JobStatus::Running,
250                started_at: started_at.to_string(),
251            },
252            result: crate::schema::JobStateResult {
253                exit_code: None,
254                signal: None,
255                duration_ms: None,
256            },
257            pid: Some(pid),
258            finished_at: None,
259            updated_at: crate::run::now_rfc3339_pub(),
260            windows_job_name,
261        };
262        self.write_state(&state)?;
263        Ok(state)
264    }
265}
266
267/// Write `contents` to `target` atomically by writing to a temp file in the
268/// same directory and then renaming. This prevents readers from observing a
269/// partially-written file.
270fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
271    use std::io::Write;
272
273    // Create a named temporary file in the same directory so that rename is
274    // always on the same filesystem (required for atomic rename on POSIX).
275    let mut tmp = tempfile::Builder::new()
276        .prefix(".tmp-")
277        .tempfile_in(dir)
278        .with_context(|| format!("create temp file in {}", dir.display()))?;
279
280    tmp.write_all(contents)
281        .with_context(|| format!("write temp file for {}", target.display()))?;
282
283    // Persist moves the temp file to the target path atomically.
284    tmp.persist(target)
285        .map_err(|e| e.error)
286        .with_context(|| format!("rename temp file to {}", target.display()))?;
287
288    Ok(())
289}
290
291// ---------- Unit tests ----------
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Global mutex to serialize tests that mutate process-wide environment variables.
    ///
    /// Rust runs tests in parallel by default; any test that calls `set_var` /
    /// `remove_var` must hold this lock for the duration of the test so that
    /// other env-reading tests do not observe a half-mutated environment.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Environment variables read by `resolve_root`.
    const ROOT_ENV_VARS: [&str; 2] = ["AGENT_EXEC_ROOT", "XDG_DATA_HOME"];

    /// Captures the current values of `ROOT_ENV_VARS` and restores them on
    /// drop. Env-mutating tests then leave the process environment exactly
    /// as they found it, even if an assertion panics mid-test.
    ///
    /// Create it *after* locking `ENV_LOCK`: locals drop in reverse order, so
    /// the restore runs while the lock is still held.
    struct EnvRestore(Vec<(&'static str, Option<std::ffi::OsString>)>);

    impl EnvRestore {
        fn capture() -> Self {
            EnvRestore(
                ROOT_ENV_VARS
                    .iter()
                    .map(|&key| (key, std::env::var_os(key)))
                    .collect(),
            )
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            for (key, saved) in &self.0 {
                // SAFETY: only created while ENV_LOCK is held and dropped
                // before the lock guard; no other env-mutating test runs
                // concurrently.
                unsafe {
                    match saved {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn resolve_root_cli_flag_wins() {
        // CLI flag does not depend on environment variables; no lock needed.
        let root = resolve_root(Some("/tmp/my-root"));
        assert_eq!(root, PathBuf::from("/tmp/my-root"));
    }

    #[test]
    fn resolve_root_env_var() {
        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _restore = EnvRestore::capture();
        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
        unsafe {
            std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
            // Also clear XDG to avoid interference.
            std::env::remove_var("XDG_DATA_HOME");
        }
        // CLI flag is None, so env var should win.
        assert_eq!(resolve_root(None), PathBuf::from("/tmp/env-root"));
    }

    #[test]
    fn resolve_root_xdg() {
        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _restore = EnvRestore::capture();
        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
            std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
        }
        assert_eq!(
            resolve_root(None),
            PathBuf::from("/tmp/xdg/agent-exec/jobs")
        );
    }

    /// An empty `AGENT_EXEC_ROOT` must be ignored rather than used as the root.
    #[test]
    fn resolve_root_empty_env_var_falls_through() {
        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _restore = EnvRestore::capture();
        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
        unsafe {
            std::env::set_var("AGENT_EXEC_ROOT", "");
            std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
        }
        assert_eq!(
            resolve_root(None),
            PathBuf::from("/tmp/xdg/agent-exec/jobs")
        );
    }

    #[test]
    fn resolve_root_default_contains_agent_exec() {
        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _restore = EnvRestore::capture();
        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
        unsafe {
            std::env::remove_var("AGENT_EXEC_ROOT");
            std::env::remove_var("XDG_DATA_HOME");
        }
        let root = resolve_root(None);
        let root_str = root.to_string_lossy();
        assert!(
            root_str.contains("agent-exec"),
            "expected agent-exec in path, got {root_str}"
        );
    }

    // ---------- Job directory structure tests ----------

    fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
        crate::schema::JobMeta {
            job: crate::schema::JobMetaJob {
                id: job_id.to_string(),
            },
            schema_version: "0.1".to_string(),
            command: vec!["echo".to_string(), "hello".to_string()],
            created_at: "2024-01-01T00:00:00Z".to_string(),
            root: root.display().to_string(),
            env_keys: vec!["FOO".to_string()],
            env_vars: vec![],
            mask: vec![],
            cwd: None,
        }
    }

    /// Create a job directory under `root` and write `contents` to its `stdout.log`.
    fn job_with_stdout(root: &std::path::Path, job_id: &str, contents: &[u8]) -> JobDir {
        let job_dir = JobDir::create(root, job_id, &make_meta(job_id, root)).unwrap();
        std::fs::write(job_dir.stdout_path(), contents).unwrap();
        job_dir
    }

    /// Verify that job directory creation writes meta.json and the directory exists.
    #[test]
    fn job_dir_create_writes_meta_json() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-01", root);
        let job_dir = JobDir::create(root, "test-job-01", &meta).unwrap();

        // Directory must exist.
        assert!(job_dir.path.is_dir(), "job directory was not created");

        // meta.json must exist and be parseable.
        assert!(job_dir.meta_path().exists(), "meta.json not found");
        let loaded_meta = job_dir.read_meta().unwrap();
        assert_eq!(loaded_meta.job_id(), "test-job-01");
        assert_eq!(loaded_meta.command, vec!["echo", "hello"]);

        // env_keys must contain key names only (not values).
        assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
    }

    /// Verify that meta.json does NOT contain env values (only keys).
    ///
    /// NOTE(review): no env *value* is ever fed in here, so the
    /// `secret_value` assertion cannot fail through the code under test. A
    /// stronger test would pass `KEY=secret_value` through whatever code path
    /// builds `JobMeta` from the CLI environment arguments.
    #[test]
    fn meta_json_env_keys_only_no_values() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let mut meta = make_meta("test-job-02", root);
        // Simulate env_keys containing only key names (as would be extracted from KEY=VALUE pairs).
        meta.env_keys = vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()];
        let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();

        // Read raw JSON to verify values are absent.
        let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
        assert!(
            !raw.contains("secret_value"),
            "env value must not be stored in meta.json"
        );
        assert!(raw.contains("SECRET_KEY"), "env key must be stored");
        assert!(raw.contains("API_TOKEN"), "env key must be stored");
    }

    /// Verify that state.json contains updated_at after write_state.
    #[test]
    fn state_json_contains_updated_at() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-03", root);
        let job_dir = JobDir::create(root, "test-job-03", &meta).unwrap();

        let state = crate::schema::JobState {
            job: crate::schema::JobStateJob {
                id: "test-job-03".to_string(),
                status: crate::schema::JobStatus::Running,
                started_at: "2024-01-01T00:00:00Z".to_string(),
            },
            result: crate::schema::JobStateResult {
                exit_code: None,
                signal: None,
                duration_ms: None,
            },
            pid: Some(12345),
            finished_at: None,
            updated_at: "2024-01-01T00:00:01Z".to_string(),
            windows_job_name: None,
        };
        job_dir.write_state(&state).unwrap();

        // Read back and verify.
        assert!(job_dir.state_path().exists(), "state.json not found");
        let loaded = job_dir.read_state().unwrap();
        assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
        assert_eq!(loaded.job_id(), "test-job-03");

        // Also verify the raw JSON contains the updated_at field.
        let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
        assert!(
            raw.contains("updated_at"),
            "updated_at field missing from state.json"
        );
    }

    /// Verify that write_state uses atomic write (temp file + rename).
    /// This is checked indirectly: the file must not be corrupted even when
    /// write_state is called several times in quick succession.
    #[test]
    fn state_json_atomic_write_no_corruption() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-04", root);
        let job_dir = JobDir::create(root, "test-job-04", &meta).unwrap();

        for i in 0..10 {
            let state = crate::schema::JobState {
                job: crate::schema::JobStateJob {
                    id: "test-job-04".to_string(),
                    status: crate::schema::JobStatus::Running,
                    started_at: "2024-01-01T00:00:00Z".to_string(),
                },
                result: crate::schema::JobStateResult {
                    exit_code: None,
                    signal: None,
                    duration_ms: None,
                },
                pid: Some(100 + i),
                finished_at: None,
                updated_at: format!("2024-01-01T00:00:{:02}Z", i),
                windows_job_name: None,
            };
            job_dir.write_state(&state).unwrap();

            // Each read must produce valid JSON (no corruption).
            let loaded = job_dir.read_state().unwrap();
            assert_eq!(
                loaded.pid,
                Some(100 + i),
                "state corrupted at iteration {i}"
            );
        }
    }

    /// Verify that meta.json atomic write works correctly.
    #[test]
    fn meta_json_atomic_write() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let meta = make_meta("test-job-05", root);
        let job_dir = JobDir::create(root, "test-job-05", &meta).unwrap();

        // Re-write meta atomically.
        let updated_meta = crate::schema::JobMeta {
            job: crate::schema::JobMetaJob {
                id: "test-job-05".to_string(),
            },
            schema_version: "0.1".to_string(),
            command: vec!["ls".to_string()],
            created_at: "2024-06-01T12:00:00Z".to_string(),
            root: root.display().to_string(),
            env_keys: vec!["PATH".to_string()],
            env_vars: vec![],
            mask: vec![],
            cwd: None,
        };
        job_dir.write_meta_atomic(&updated_meta).unwrap();

        let loaded = job_dir.read_meta().unwrap();
        assert_eq!(loaded.command, vec!["ls"]);
        assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
    }

    /// Opening an ID with no directory must surface `JobNotFound`, so callers
    /// can map it to `error.code = "job_not_found"`.
    #[test]
    fn open_missing_job_is_job_not_found() {
        let tmp = tempfile::tempdir().unwrap();
        let Err(err) = JobDir::open(tmp.path(), "no-such-job") else {
            panic!("opening a missing job must fail");
        };
        assert!(
            err.downcast_ref::<JobNotFound>().is_some(),
            "expected JobNotFound, got {err:#}"
        );
    }

    /// Opening a freshly created job returns a handle to the same directory.
    #[test]
    fn open_existing_job_succeeds() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let created = JobDir::create(root, "test-job-09", &make_meta("test-job-09", root)).unwrap();
        let opened = JobDir::open(root, "test-job-09").unwrap();
        assert_eq!(opened.path, created.path);
        assert_eq!(opened.job_id, "test-job-09");
    }

    /// A log file that does not exist yields empty content and zero metrics.
    #[test]
    fn tail_log_missing_file_is_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let job_dir = JobDir::create(root, "test-job-06", &make_meta("test-job-06", root)).unwrap();

        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 10, 1024),
            (String::new(), false)
        );
        let metrics = job_dir.read_tail_metrics("stdout.log", 10, 1024);
        assert_eq!(metrics.tail, "");
        assert!(!metrics.truncated);
        assert_eq!(metrics.observed_bytes, 0);
        assert_eq!(metrics.included_bytes, 0);
    }

    /// The line limit keeps the last N lines and reports truncation only when
    /// lines were actually dropped.
    #[test]
    fn tail_log_keeps_last_lines() {
        let tmp = tempfile::tempdir().unwrap();
        let job_dir = job_with_stdout(tmp.path(), "test-job-07", b"one\ntwo\nthree\n");

        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 2, 1024),
            ("two\nthree".to_string(), true)
        );
        // Asking for more lines than exist is not a truncation.
        assert_eq!(
            job_dir.tail_log_with_truncated("stdout.log", 5, 1024),
            ("one\ntwo\nthree".to_string(), false)
        );
        // `tail_lines == 0` disables the line limit and returns the raw text.
        assert_eq!(job_dir.tail_log("stdout.log", 0, 1024), "one\ntwo\nthree\n");
    }

    /// The byte limit keeps the last `max_bytes` bytes and the metrics report
    /// both the file size and the included size.
    #[test]
    fn tail_log_truncates_by_bytes() {
        let tmp = tempfile::tempdir().unwrap();
        let job_dir = job_with_stdout(tmp.path(), "test-job-08", b"one\ntwo\nthree\n");

        let metrics = job_dir.read_tail_metrics("stdout.log", 0, 6);
        assert_eq!(metrics.tail, "three\n");
        assert!(metrics.truncated);
        assert_eq!(metrics.observed_bytes, 14);
        assert_eq!(metrics.included_bytes, 6);
    }

    /// On non-Windows platforms, `init_state` must write `windows_job_name: None`
    /// (the field is omitted from JSON via `skip_serializing_if`).
    /// On Windows, `init_state` must write the deterministic Job Object name
    /// `"AgentExec-{job_id}"`. `state.json` then contains the identifier
    /// immediately after `run` returns, without waiting for the supervisor update.
    #[test]
    fn init_state_writes_deterministic_job_name_on_windows() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let job_id = "01TESTJOBID0000000000000";
        let meta = make_meta(job_id, root);
        let job_dir = JobDir::create(root, job_id, &meta).unwrap();
        let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();

        // Verify in-memory state.
        #[cfg(windows)]
        assert_eq!(
            state.windows_job_name.as_deref(),
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: init_state must set deterministic job name immediately"
        );
        #[cfg(not(windows))]
        assert_eq!(
            state.windows_job_name, None,
            "non-Windows: init_state must not set windows_job_name"
        );

        // Verify persisted state on disk.
        let persisted = job_dir.read_state().unwrap();
        #[cfg(windows)]
        assert_eq!(
            persisted.windows_job_name.as_deref(),
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: persisted state.json must contain windows_job_name"
        );
        #[cfg(not(windows))]
        assert_eq!(
            persisted.windows_job_name, None,
            "non-Windows: persisted state.json must not contain windows_job_name"
        );
    }
}