Skip to main content

agent_exec/
jobstore.rs

1//! Job directory management for agent-exec v0.1.
2//!
3//! Resolution order for the jobs root:
4//!   1. `--root` CLI flag
5//!   2. `AGENT_EXEC_ROOT` environment variable
6//!   3. `$XDG_DATA_HOME/agent-exec/jobs`
7//!   4. `~/.local/share/agent-exec/jobs`
8
9use anyhow::{Context, Result};
10use directories::BaseDirs;
11use rand::RngCore;
12use std::path::{Path, PathBuf};
13
14use crate::schema::{JobMeta, JobState, JobStatus};
15
/// Marker error for "no such job", kept separate from generic I/O failures.
///
/// Used by callers to emit `error.code = "job_not_found"` instead of
/// `internal_error`. The wrapped string is the job ID that was requested.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    /// Renders as `job not found: <id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(requested) = self;
        write!(f, "job not found: {requested}")
    }
}

impl std::error::Error for JobNotFound {}
28
/// Marker error for a job ID prefix that matches more than one job directory.
///
/// Used by callers to emit `error.code = "ambiguous_job_id"` instead of
/// `internal_error`.
#[derive(Debug)]
pub struct AmbiguousJobId {
    /// The prefix that was looked up.
    pub prefix: String,
    /// Names of every job directory that matched `prefix`.
    pub candidates: Vec<String>,
}

impl std::fmt::Display for AmbiguousJobId {
    /// Lists at most 20 candidates; any remainder is summarised as
    /// `... and N more` so the message stays bounded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MAX_LISTED: usize = 20;

        let Self { prefix, candidates } = self;
        write!(f, "ambiguous job ID prefix '{prefix}': matches ")?;

        let listed = &candidates[..candidates.len().min(MAX_LISTED)];
        f.write_str(&listed.join(", "))?;

        match candidates.len() - listed.len() {
            0 => Ok(()),
            hidden => write!(f, ", ... and {hidden} more"),
        }
    }
}

impl std::error::Error for AmbiguousJobId {}
54
/// Marker error raised when every attempt to generate a job ID collided with
/// an existing entry.
///
/// Used by callers to emit `error.code = "io_error"` instead of `internal_error`.
#[derive(Debug)]
pub struct JobIdCollisionExhausted {
    /// Number of consecutive candidates that were tried and rejected.
    pub attempts: usize,
}

impl std::fmt::Display for JobIdCollisionExhausted {
    /// Renders as `job ID generation failed: <n> consecutive collisions`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tried = self.attempts;
        write!(f, "job ID generation failed: {tried} consecutive collisions")
    }
}

impl std::error::Error for JobIdCollisionExhausted {}
73
/// Marker error for a job state transition that is not allowed.
///
/// Used by callers to emit `error.code = "invalid_state"` instead of
/// `internal_error`. The wrapped string is the human-readable detail.
#[derive(Debug)]
pub struct InvalidJobState(pub String);

impl std::fmt::Display for InvalidJobState {
    /// Renders as `invalid job state: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(detail) = self;
        write!(f, "invalid job state: {detail}")
    }
}

impl std::error::Error for InvalidJobState {}
86
/// Number of random bytes drawn for each generated job ID.
const JOB_ID_HEX_BYTES: usize = 16;
/// Length in characters of a full job ID: each random byte is rendered as two
/// hex digits.
const JOB_ID_LENGTH: usize = JOB_ID_HEX_BYTES * 2;
/// Number of leading characters of a job ID kept for short, human-facing display.
pub const SHORT_JOB_ID_LENGTH: usize = 7;

/// Maximum number of candidate IDs tried before job ID generation gives up.
const MAX_JOB_ID_ATTEMPTS: usize = 16;
93
94/// Generate a new hash-like job ID (`[0-9a-f]`, fixed-length) that is unique under `root`.
95///
96/// The generator retries on directory-name collisions up to 16 times.
97pub fn generate_job_id(root: &Path) -> Result<String> {
98    generate_job_id_with_rng(root, &mut rand::thread_rng())
99}
100
101fn generate_job_id_with_rng(root: &Path, rng: &mut impl RngCore) -> Result<String> {
102    for _ in 0..MAX_JOB_ID_ATTEMPTS {
103        let mut bytes = [0u8; JOB_ID_HEX_BYTES];
104        rng.fill_bytes(&mut bytes);
105        let candidate = bytes.iter().map(|b| format!("{b:02x}")).collect::<String>();
106        debug_assert_eq!(candidate.len(), JOB_ID_LENGTH);
107
108        if !root.join(&candidate).exists() {
109            return Ok(candidate);
110        }
111    }
112    Err(anyhow::Error::new(JobIdCollisionExhausted {
113        attempts: MAX_JOB_ID_ATTEMPTS,
114    }))
115}
116
117/// Human-facing short job ID for list-like displays.
118pub fn short_job_id(job_id: &str) -> String {
119    job_id.chars().take(SHORT_JOB_ID_LENGTH).collect()
120}
121
122pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
123    // 1. CLI flag
124    if let Some(root) = cli_root {
125        return PathBuf::from(root);
126    }
127
128    // 2. Environment variable
129    if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
130        && !root.is_empty()
131    {
132        return PathBuf::from(root);
133    }
134
135    // 3. XDG_DATA_HOME
136    if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
137        && !xdg.is_empty()
138    {
139        return PathBuf::from(xdg).join("agent-exec").join("jobs");
140    }
141
142    // 4. Default: ~/.local/share/agent-exec/jobs
143    //    (On Windows use data_local_dir() as base)
144    if let Some(base_dirs) = BaseDirs::new() {
145        #[cfg(windows)]
146        let base = base_dirs.data_local_dir().to_path_buf();
147        #[cfg(not(windows))]
148        let base = base_dirs.home_dir().join(".local").join("share");
149        return base.join("agent-exec").join("jobs");
150    }
151
152    // Fallback if directories crate returns None
153    PathBuf::from("~/.local/share/agent-exec/jobs")
154}
155
/// Result of [`JobDir::read_tail_metrics`].
///
/// Carries the tail text together with the raw byte offsets it was cut from,
/// so every caller building a `tail` JSON response reports the same numbers.
pub struct TailMetrics {
    /// Tail text, decoded as lossy UTF-8 after applying the line and byte limits.
    pub tail: String,
    /// Size of the whole file in bytes; 0 when the file could not be read.
    pub observed_bytes: u64,
    /// Half-open raw byte range `[begin, end)` of the file that `tail` covers.
    pub range: [u64; 2],
}
168
/// Head slice of a log file together with the byte offsets it was cut from.
pub struct HeadMetrics {
    /// Head text: the first `max_bytes` bytes decoded as lossy UTF-8.
    pub head: String,
    /// Size of the whole file in bytes; 0 when the file could not be read.
    pub observed_bytes: u64,
    /// How many raw bytes of the file went into `head`.
    pub included_bytes: u64,
    /// Half-open raw byte range `[begin, end)` of the file that `head` covers.
    pub range: [u64; 2],
}
180
/// Handle to the on-disk directory of one job.
#[derive(Debug)]
pub struct JobDir {
    /// Location of the job directory (`<root>/<job_id>`).
    pub path: PathBuf,
    /// Full job ID; this is also the directory's name under the jobs root.
    pub job_id: String,
}
187
188impl JobDir {
189    /// Open an existing job directory by ID or unambiguous prefix.
190    ///
191    /// Resolution order:
192    /// 1. Exact match: if `root/<job_id>` exists, return it immediately (no scan).
193    /// 2. Prefix scan: scan `root/` for directories whose name starts with `job_id`.
194    ///    - 0 matches → `Err(JobNotFound)`
195    ///    - 1 match   → resolve to that job
196    ///    - 2+ matches → `Err(AmbiguousJobId)`
197    pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
198        Self::resolve_matching(root, job_id, |_| true)
199    }
200
201    /// Open an existing job directory by ID or unambiguous prefix, restricted to
202    /// jobs whose persisted state matches `predicate`.
203    pub fn open_matching<F>(root: &std::path::Path, job_id: &str, predicate: F) -> Result<Self>
204    where
205        F: Fn(&JobStatus) -> bool,
206    {
207        Self::resolve_matching(root, job_id, |job_dir| match job_dir.read_state() {
208            Ok(state) => predicate(state.status()),
209            Err(_) => false,
210        })
211    }
212
213    fn resolve_matching<F>(root: &std::path::Path, job_id: &str, predicate: F) -> Result<Self>
214    where
215        F: Fn(&JobDir) -> bool,
216    {
217        let path = root.join(job_id);
218        if path.is_dir() {
219            let job_dir = JobDir {
220                path,
221                job_id: job_id.to_string(),
222            };
223            if predicate(&job_dir) {
224                return Ok(job_dir);
225            }
226            return Err(anyhow::Error::new(JobNotFound(job_id.to_string())));
227        }
228
229        let mut candidates: Vec<String> = std::fs::read_dir(root)
230            .into_iter()
231            .flatten()
232            .flatten()
233            .filter_map(|entry| {
234                let name = entry.file_name().to_string_lossy().into_owned();
235                if !name.starts_with(job_id) || !entry.path().is_dir() {
236                    return None;
237                }
238                let job_dir = JobDir {
239                    path: entry.path(),
240                    job_id: name.clone(),
241                };
242                if predicate(&job_dir) {
243                    Some(name)
244                } else {
245                    None
246                }
247            })
248            .collect();
249
250        match candidates.len() {
251            0 => Err(anyhow::Error::new(JobNotFound(job_id.to_string()))),
252            1 => {
253                let resolved = candidates.remove(0);
254                let path = root.join(&resolved);
255                Ok(JobDir {
256                    path,
257                    job_id: resolved,
258                })
259            }
260            _ => {
261                candidates.sort();
262                Err(anyhow::Error::new(AmbiguousJobId {
263                    prefix: job_id.to_string(),
264                    candidates,
265                }))
266            }
267        }
268    }
269
270    /// Create a new job directory and write `meta.json` atomically.
271    pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
272        let path = root.join(job_id);
273        std::fs::create_dir_all(&path)
274            .with_context(|| format!("create job dir {}", path.display()))?;
275
276        let job_dir = JobDir {
277            path,
278            job_id: job_id.to_string(),
279        };
280
281        job_dir.write_meta_atomic(meta)?;
282
283        Ok(job_dir)
284    }
285
286    pub fn meta_path(&self) -> PathBuf {
287        self.path.join("meta.json")
288    }
289    pub fn state_path(&self) -> PathBuf {
290        self.path.join("state.json")
291    }
292    pub fn stdout_path(&self) -> PathBuf {
293        self.path.join("stdout.log")
294    }
295    pub fn stderr_path(&self) -> PathBuf {
296        self.path.join("stderr.log")
297    }
298    pub fn full_log_path(&self) -> PathBuf {
299        self.path.join("full.log")
300    }
301    pub fn completion_event_path(&self) -> PathBuf {
302        self.path.join("completion_event.json")
303    }
304    pub fn notification_events_path(&self) -> PathBuf {
305        self.path.join("notification_events.ndjson")
306    }
307
308    /// Write `completion_event.json` atomically.
309    pub fn write_completion_event_atomic(
310        &self,
311        record: &crate::schema::CompletionEventRecord,
312    ) -> Result<()> {
313        let target = self.completion_event_path();
314        let contents = serde_json::to_string_pretty(record)?;
315        write_atomic(&self.path, &target, contents.as_bytes())?;
316        Ok(())
317    }
318
319    pub fn read_meta(&self) -> Result<JobMeta> {
320        let raw = std::fs::read(self.meta_path())?;
321        Ok(serde_json::from_slice(&raw)?)
322    }
323
324    pub fn read_state(&self) -> Result<JobState> {
325        let raw = std::fs::read(self.state_path())?;
326        Ok(serde_json::from_slice(&raw)?)
327    }
328
329    /// Write `meta.json` atomically: write to a temp file then rename.
330    pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
331        let target = self.meta_path();
332        let contents = serde_json::to_string_pretty(meta)?;
333        write_atomic(&self.path, &target, contents.as_bytes())?;
334        Ok(())
335    }
336
337    /// Write `state.json` atomically: write to a temp file then rename.
338    pub fn write_state(&self, state: &JobState) -> Result<()> {
339        let target = self.state_path();
340        let contents = serde_json::to_string_pretty(state)?;
341        write_atomic(&self.path, &target, contents.as_bytes())?;
342        Ok(())
343    }
344
345    /// Read tail content and raw byte range metrics for a single log file.
346    pub fn read_tail_metrics(
347        &self,
348        filename: &str,
349        tail_lines: u64,
350        max_bytes: u64,
351    ) -> TailMetrics {
352        let path = self.path.join(filename);
353        let Ok(data) = std::fs::read(&path) else {
354            return TailMetrics {
355                tail: String::new(),
356                observed_bytes: 0,
357                range: [0, 0],
358            };
359        };
360
361        let observed_bytes = data.len() as u64;
362        let window_start = observed_bytes.saturating_sub(max_bytes) as usize;
363        let window = &data[window_start..];
364
365        let line_start_in_window = if tail_lines == 0 {
366            0
367        } else {
368            let mut chunks: Vec<&[u8]> = window.split(|b| *b == b'\n').collect();
369            if window.ends_with(b"\n") {
370                let _ = chunks.pop();
371            }
372            let keep_from = chunks.len().saturating_sub(tail_lines as usize);
373            chunks[..keep_from]
374                .iter()
375                .map(|c| c.len() + 1)
376                .sum::<usize>()
377        };
378
379        let selected = &window[line_start_in_window..];
380        let tail = String::from_utf8_lossy(selected).into_owned();
381        let begin = (window_start + line_start_in_window) as u64;
382
383        TailMetrics {
384            tail,
385            observed_bytes,
386            range: [begin, observed_bytes],
387        }
388    }
389
390    /// Read head content and byte metrics for a single log file.
391    ///
392    /// Returns the first `max_bytes` bytes (decoded as UTF-8 lossy) with
393    /// canonical raw byte range metadata.
394    pub fn read_head_metrics(&self, filename: &str, max_bytes: u64) -> HeadMetrics {
395        let path = self.path.join(filename);
396        let Ok(data) = std::fs::read(&path) else {
397            return HeadMetrics {
398                head: String::new(),
399                observed_bytes: 0,
400                included_bytes: 0,
401                range: [0, 0],
402            };
403        };
404
405        let observed_bytes = data.len() as u64;
406        let included_len = observed_bytes.min(max_bytes) as usize;
407        let head = String::from_utf8_lossy(&data[..included_len]).into_owned();
408        let included_bytes = included_len as u64;
409
410        HeadMetrics {
411            head,
412            observed_bytes,
413            included_bytes,
414            range: [0, included_bytes],
415        }
416    }
417
418    /// Write the initial JobState for a `created` (not-yet-started) job.
419    ///
420    /// The state is `created`, no process has been spawned, and `started_at` is absent.
421    pub fn init_state_created(&self) -> Result<JobState> {
422        let state = JobState {
423            job: crate::schema::JobStateJob {
424                id: self.job_id.clone(),
425                status: JobStatus::Created,
426                started_at: None,
427            },
428            result: crate::schema::JobStateResult {
429                exit_code: None,
430                signal: None,
431                duration_ms: None,
432            },
433            pid: None,
434            finished_at: None,
435            updated_at: crate::run::now_rfc3339_pub(),
436            windows_job_name: None,
437        };
438        self.write_state(&state)?;
439        Ok(state)
440    }
441
442    /// Write the initial JobState (running, supervisor PID) to disk.
443    ///
444    /// This is called by the `run` command immediately after the supervisor
445    /// process is spawned, so `pid` is the supervisor's PID. The child process
446    /// PID and, on Windows, the Job Object name are not yet known at this point.
447    ///
448    /// On Windows, the Job Object name is derived deterministically from the
449    /// job_id as `"AgentExec-{job_id}"`. This name is written immediately to
450    /// `state.json` so that callers reading state after `run` returns can
451    /// always find the Job Object identifier, without waiting for the supervisor
452    /// to perform its first `write_state` call. The supervisor will confirm the
453    /// same name (or update to `failed`) after it successfully assigns the child
454    /// process to the named Job Object.
455    pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
456        #[cfg(windows)]
457        let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
458        #[cfg(not(windows))]
459        let windows_job_name: Option<String> = None;
460
461        let state = JobState {
462            job: crate::schema::JobStateJob {
463                id: self.job_id.clone(),
464                status: JobStatus::Running,
465                started_at: Some(started_at.to_string()),
466            },
467            result: crate::schema::JobStateResult {
468                exit_code: None,
469                signal: None,
470                duration_ms: None,
471            },
472            pid: Some(pid),
473            finished_at: None,
474            updated_at: crate::run::now_rfc3339_pub(),
475            windows_job_name,
476        };
477        self.write_state(&state)?;
478        Ok(state)
479    }
480}
481
482/// Write `contents` to `target` atomically by writing to a temp file in the
483/// same directory and then renaming. This prevents readers from observing a
484/// partially-written file.
485fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
486    use std::io::Write;
487
488    // Create a named temporary file in the same directory so that rename is
489    // always on the same filesystem (required for atomic rename on POSIX).
490    let mut tmp = tempfile::Builder::new()
491        .prefix(".tmp-")
492        .tempfile_in(dir)
493        .with_context(|| format!("create temp file in {}", dir.display()))?;
494
495    tmp.write_all(contents)
496        .with_context(|| format!("write temp file for {}", target.display()))?;
497
498    // Persist moves the temp file to the target path atomically.
499    tmp.persist(target)
500        .map_err(|e| e.error)
501        .with_context(|| format!("rename temp file to {}", target.display()))?;
502
503    Ok(())
504}
505
506// ---------- Unit tests ----------
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511
512    /// Global mutex to serialize tests that mutate process-wide environment variables.
513    ///
514    /// Rust runs tests in parallel by default; any test that calls `set_var` /
515    /// `remove_var` must hold this lock for the duration of the test so that
516    /// other env-reading tests do not observe a half-mutated environment.
517    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
518
519    #[test]
520    fn resolve_root_cli_flag_wins() {
521        // CLI flag does not depend on environment variables; no lock needed.
522        let root = resolve_root(Some("/tmp/my-root"));
523        assert_eq!(root, PathBuf::from("/tmp/my-root"));
524    }
525
526    #[test]
527    fn resolve_root_env_var() {
528        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
529        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
530        unsafe {
531            std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
532            // Also clear XDG to avoid interference.
533            std::env::remove_var("XDG_DATA_HOME");
534        }
535        // CLI flag is None, so env var should win.
536        let root = resolve_root(None);
537        // Restore.
538        unsafe {
539            std::env::remove_var("AGENT_EXEC_ROOT");
540        }
541        assert_eq!(root, PathBuf::from("/tmp/env-root"));
542    }
543
544    #[test]
545    fn resolve_root_xdg() {
546        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
547        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
548        unsafe {
549            std::env::remove_var("AGENT_EXEC_ROOT");
550            std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
551        }
552        let root = resolve_root(None);
553        unsafe {
554            std::env::remove_var("XDG_DATA_HOME");
555        }
556        assert_eq!(root, PathBuf::from("/tmp/xdg/agent-exec/jobs"));
557    }
558
559    #[test]
560    fn resolve_root_default_contains_agent_exec() {
561        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
562        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
563        unsafe {
564            std::env::remove_var("AGENT_EXEC_ROOT");
565            std::env::remove_var("XDG_DATA_HOME");
566        }
567        let root = resolve_root(None);
568        let root_str = root.to_string_lossy();
569        assert!(
570            root_str.contains("agent-exec"),
571            "expected agent-exec in path, got {root_str}"
572        );
573    }
574
575    // ---------- Job directory structure tests ----------
576
577    fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
578        crate::schema::JobMeta {
579            job: crate::schema::JobMetaJob {
580                id: job_id.to_string(),
581            },
582            schema_version: "0.1".to_string(),
583            command: vec!["echo".to_string(), "hello".to_string()],
584            created_at: "2024-01-01T00:00:00Z".to_string(),
585            root: root.display().to_string(),
586            env_keys: vec!["FOO".to_string()],
587            env_vars: vec![],
588            env_vars_runtime: vec![],
589            mask: vec![],
590            cwd: None,
591            notification: None,
592            tags: vec![],
593            inherit_env: true,
594            env_files: vec![],
595            timeout_ms: 0,
596            kill_after_ms: 0,
597            progress_every_ms: 0,
598            shell_wrapper: None,
599            stdin_file: None,
600        }
601    }
602
603    /// Verify that job directory creation writes meta.json and the directory exists.
604    #[test]
605    fn job_dir_create_writes_meta_json() {
606        let tmp = tempfile::tempdir().unwrap();
607        let root = tmp.path();
608        let meta = make_meta("test-job-01", root);
609        let job_dir = JobDir::create(root, "test-job-01", &meta).unwrap();
610
611        // Directory must exist.
612        assert!(job_dir.path.is_dir(), "job directory was not created");
613
614        // meta.json must exist and be parseable.
615        assert!(job_dir.meta_path().exists(), "meta.json not found");
616        let loaded_meta = job_dir.read_meta().unwrap();
617        assert_eq!(loaded_meta.job_id(), "test-job-01");
618        assert_eq!(loaded_meta.command, vec!["echo", "hello"]);
619
620        // env_keys must contain key names only (not values).
621        assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
622    }
623
624    /// Verify that meta.json does NOT contain env values (only keys).
625    #[test]
626    fn meta_json_env_keys_only_no_values() {
627        let tmp = tempfile::tempdir().unwrap();
628        let root = tmp.path();
629        let mut meta = make_meta("test-job-02", root);
630        // Simulate env_keys containing only key names (as would be extracted from KEY=VALUE pairs).
631        meta.env_keys = vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()];
632        let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();
633
634        // Read raw JSON to verify values are absent.
635        let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
636        assert!(
637            !raw.contains("secret_value"),
638            "env value must not be stored in meta.json"
639        );
640        assert!(raw.contains("SECRET_KEY"), "env key must be stored");
641        assert!(raw.contains("API_TOKEN"), "env key must be stored");
642    }
643
644    /// Verify that state.json contains updated_at after write_state.
645    #[test]
646    fn state_json_contains_updated_at() {
647        let tmp = tempfile::tempdir().unwrap();
648        let root = tmp.path();
649        let meta = make_meta("test-job-03", root);
650        let job_dir = JobDir::create(root, "test-job-03", &meta).unwrap();
651
652        let state = crate::schema::JobState {
653            job: crate::schema::JobStateJob {
654                id: "test-job-03".to_string(),
655                status: crate::schema::JobStatus::Running,
656                started_at: Some("2024-01-01T00:00:00Z".to_string()),
657            },
658            result: crate::schema::JobStateResult {
659                exit_code: None,
660                signal: None,
661                duration_ms: None,
662            },
663            pid: Some(12345),
664            finished_at: None,
665            updated_at: "2024-01-01T00:00:01Z".to_string(),
666            windows_job_name: None,
667        };
668        job_dir.write_state(&state).unwrap();
669
670        // Read back and verify.
671        assert!(job_dir.state_path().exists(), "state.json not found");
672        let loaded = job_dir.read_state().unwrap();
673        assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
674        assert_eq!(loaded.job_id(), "test-job-03");
675
676        // Also verify the raw JSON contains the updated_at field.
677        let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
678        assert!(
679            raw.contains("updated_at"),
680            "updated_at field missing from state.json"
681        );
682    }
683
684    /// Verify that write_state uses atomic write (temp file + rename).
685    /// We verify this indirectly: the file must not be corrupted even if we
686    /// call write_state multiple times rapidly.
687    #[test]
688    fn state_json_atomic_write_no_corruption() {
689        let tmp = tempfile::tempdir().unwrap();
690        let root = tmp.path();
691        let meta = make_meta("test-job-04", root);
692        let job_dir = JobDir::create(root, "test-job-04", &meta).unwrap();
693
694        for i in 0..10 {
695            let state = crate::schema::JobState {
696                job: crate::schema::JobStateJob {
697                    id: "test-job-04".to_string(),
698                    status: crate::schema::JobStatus::Running,
699                    started_at: Some("2024-01-01T00:00:00Z".to_string()),
700                },
701                result: crate::schema::JobStateResult {
702                    exit_code: None,
703                    signal: None,
704                    duration_ms: None,
705                },
706                pid: Some(100 + i),
707                finished_at: None,
708                updated_at: format!("2024-01-01T00:00:{:02}Z", i),
709                windows_job_name: None,
710            };
711            job_dir.write_state(&state).unwrap();
712
713            // Each read must produce valid JSON (no corruption).
714            let loaded = job_dir.read_state().unwrap();
715            assert_eq!(
716                loaded.pid,
717                Some(100 + i),
718                "state corrupted at iteration {i}"
719            );
720        }
721    }
722
723    /// Verify that meta.json atomic write works correctly.
724    #[test]
725    fn meta_json_atomic_write() {
726        let tmp = tempfile::tempdir().unwrap();
727        let root = tmp.path();
728        let meta = make_meta("test-job-05", root);
729        let job_dir = JobDir::create(root, "test-job-05", &meta).unwrap();
730
731        // Re-write meta atomically.
732        let updated_meta = crate::schema::JobMeta {
733            job: crate::schema::JobMetaJob {
734                id: "test-job-05".to_string(),
735            },
736            schema_version: "0.1".to_string(),
737            command: vec!["ls".to_string()],
738            created_at: "2024-06-01T12:00:00Z".to_string(),
739            root: root.display().to_string(),
740            env_keys: vec!["PATH".to_string()],
741            env_vars: vec![],
742            env_vars_runtime: vec![],
743            mask: vec![],
744            cwd: None,
745            notification: None,
746            tags: vec![],
747            inherit_env: true,
748            env_files: vec![],
749            timeout_ms: 0,
750            kill_after_ms: 0,
751            progress_every_ms: 0,
752            shell_wrapper: None,
753            stdin_file: None,
754        };
755        job_dir.write_meta_atomic(&updated_meta).unwrap();
756
757        let loaded = job_dir.read_meta().unwrap();
758        assert_eq!(loaded.command, vec!["ls"]);
759        assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
760    }
761
762    /// On non-Windows platforms, `init_state` must write `windows_job_name: None`
763    /// (the field is omitted from JSON via `skip_serializing_if`).
764    /// On Windows, `init_state` must write the deterministic Job Object name
765    /// `"AgentExec-{job_id}"` so that `state.json` always contains the identifier
766    /// immediately after `run` returns, without waiting for the supervisor update.
767    #[test]
768    fn init_state_writes_deterministic_job_name_on_windows() {
769        let tmp = tempfile::tempdir().unwrap();
770        let root = tmp.path();
771        let job_id = "01TESTJOBID0000000000000";
772        let meta = make_meta(job_id, root);
773        let job_dir = JobDir::create(root, job_id, &meta).unwrap();
774        let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();
775
776        // Verify in-memory state.
777        #[cfg(windows)]
778        assert_eq!(
779            state.windows_job_name.as_deref(),
780            Some("AgentExec-01TESTJOBID0000000000000"),
781            "Windows: init_state must set deterministic job name immediately"
782        );
783        #[cfg(not(windows))]
784        assert_eq!(
785            state.windows_job_name, None,
786            "non-Windows: init_state must not set windows_job_name"
787        );
788
789        // Verify persisted state on disk.
790        let persisted = job_dir.read_state().unwrap();
791        #[cfg(windows)]
792        assert_eq!(
793            persisted.windows_job_name.as_deref(),
794            Some("AgentExec-01TESTJOBID0000000000000"),
795            "Windows: persisted state.json must contain windows_job_name"
796        );
797        #[cfg(not(windows))]
798        assert_eq!(
799            persisted.windows_job_name, None,
800            "non-Windows: persisted state.json must not contain windows_job_name"
801        );
802    }
803
804    // ---------- Prefix-based job ID resolution tests ----------
805
806    #[test]
807    fn job_dir_open_exact_match() {
808        let tmp = tempfile::tempdir().unwrap();
809        let root = tmp.path();
810        let job_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
811        let meta = make_meta(job_id, root);
812        JobDir::create(root, job_id, &meta).unwrap();
813
814        let result = JobDir::open(root, job_id).unwrap();
815        assert_eq!(result.job_id, job_id);
816    }
817
818    #[test]
819    fn job_dir_open_unique_prefix_resolves() {
820        let tmp = tempfile::tempdir().unwrap();
821        let root = tmp.path();
822        let job_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
823        let meta = make_meta(job_id, root);
824        JobDir::create(root, job_id, &meta).unwrap();
825
826        // Use a unique prefix
827        let result = JobDir::open(root, "01JQXK3M").unwrap();
828        assert_eq!(result.job_id, job_id);
829    }
830
831    #[test]
832    fn job_dir_open_not_found_returns_job_not_found() {
833        let tmp = tempfile::tempdir().unwrap();
834        let root = tmp.path();
835
836        let err = JobDir::open(root, "ZZZZZ").unwrap_err();
837        assert!(
838            err.downcast_ref::<JobNotFound>().is_some(),
839            "expected JobNotFound, got: {err}"
840        );
841    }
842
843    #[test]
844    fn job_dir_open_ambiguous_prefix_returns_ambiguous() {
845        let tmp = tempfile::tempdir().unwrap();
846        let root = tmp.path();
847        let id_a = "01JQXK3M8EAAA00000000000AA";
848        let id_b = "01JQXK3M8EBBB00000000000BB";
849        let meta_a = make_meta(id_a, root);
850        let meta_b = make_meta(id_b, root);
851        JobDir::create(root, id_a, &meta_a).unwrap();
852        JobDir::create(root, id_b, &meta_b).unwrap();
853
854        let err = JobDir::open(root, "01JQXK3M8E").unwrap_err();
855        let ambiguous = err
856            .downcast_ref::<AmbiguousJobId>()
857            .expect("expected AmbiguousJobId");
858        assert_eq!(ambiguous.prefix, "01JQXK3M8E");
859        assert!(ambiguous.candidates.contains(&id_a.to_string()));
860        assert!(ambiguous.candidates.contains(&id_b.to_string()));
861    }
862
863    #[test]
864    fn ambiguous_job_id_display_up_to_20_candidates() {
865        let err = AmbiguousJobId {
866            prefix: "01J".to_string(),
867            candidates: vec![
868                "01JAAA".to_string(),
869                "01JBBB".to_string(),
870                "01JCCC".to_string(),
871            ],
872        };
873        let msg = err.to_string();
874        assert!(msg.contains("01J"), "must include prefix: {msg}");
875        assert!(msg.contains("01JAAA"), "must list candidates: {msg}");
876        assert!(
877            !msg.contains("more"),
878            "3 candidates should not truncate: {msg}"
879        );
880    }
881
882    #[test]
883    fn ambiguous_job_id_display_truncates_beyond_20() {
884        let candidates: Vec<String> = (1..=25)
885            .map(|i| format!("01JCANDIDATE{i:02}0000000000"))
886            .collect();
887        let err = AmbiguousJobId {
888            prefix: "01J".to_string(),
889            candidates,
890        };
891        let msg = err.to_string();
892        assert!(msg.contains("... and 5 more"), "must truncate: {msg}");
893    }
894
895    #[test]
896    fn generate_job_id_returns_fixed_length_hex() {
897        let tmp = tempfile::tempdir().unwrap();
898        let id = generate_job_id(tmp.path()).expect("generate job id");
899        assert_eq!(id.len(), JOB_ID_LENGTH, "unexpected job id length");
900        assert!(
901            id.chars()
902                .all(|c| c.is_ascii_hexdigit() && c.is_ascii_lowercase() || c.is_ascii_digit()),
903            "job id must be lowercase hex: {id}"
904        );
905    }
906
907    #[test]
908    fn generate_job_id_retries_when_collision_exists() {
909        let tmp = tempfile::tempdir().unwrap();
910        let root = tmp.path();
911
912        // Prime the root with many IDs to make collisions on first try highly likely
913        // only if generator does not check existence. This verifies returned ID is unique.
914        for _ in 0..64 {
915            let id = generate_job_id(root).expect("seed id");
916            std::fs::create_dir(root.join(id)).expect("create seeded dir");
917        }
918
919        let id = generate_job_id(root).expect("generate non-colliding id");
920        assert!(
921            !root.join(&id).exists(),
922            "generated id must not collide with existing directory"
923        );
924    }
925
926    /// A deterministic RNG that always produces the same bytes.
927    struct FixedRng([u8; JOB_ID_HEX_BYTES]);
928
929    impl rand::RngCore for FixedRng {
930        fn next_u32(&mut self) -> u32 {
931            unimplemented!()
932        }
933        fn next_u64(&mut self) -> u64 {
934            unimplemented!()
935        }
936        fn fill_bytes(&mut self, dest: &mut [u8]) {
937            dest.copy_from_slice(&self.0[..dest.len()]);
938        }
939        fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), rand::Error> {
940            self.fill_bytes(dest);
941            Ok(())
942        }
943    }
944
945    #[test]
946    fn generate_job_id_fails_after_16_collisions() {
947        let tmp = tempfile::tempdir().unwrap();
948        let root = tmp.path();
949
950        let fixed_bytes = [0xABu8; JOB_ID_HEX_BYTES];
951        let colliding_id: String = fixed_bytes.iter().map(|b| format!("{b:02x}")).collect();
952        std::fs::create_dir(root.join(&colliding_id)).unwrap();
953
954        let mut rng = FixedRng(fixed_bytes);
955        let err = generate_job_id_with_rng(root, &mut rng).unwrap_err();
956        let exhausted = err
957            .downcast_ref::<JobIdCollisionExhausted>()
958            .expect("expected JobIdCollisionExhausted");
959        assert_eq!(exhausted.attempts, MAX_JOB_ID_ATTEMPTS);
960    }
961
962    #[test]
963    fn job_dir_open_unique_prefix_with_mixed_legacy_and_hash_ids() {
964        let tmp = tempfile::tempdir().unwrap();
965        let root = tmp.path();
966        let legacy_ulid = "01JQXK3M8E5PQRSTVWYZ12ABCD";
967        let hash_id = "deadbeefcafebabe1234567890abcdef";
968
969        let meta_legacy = make_meta(legacy_ulid, root);
970        let meta_hash = make_meta(hash_id, root);
971        JobDir::create(root, legacy_ulid, &meta_legacy).unwrap();
972        JobDir::create(root, hash_id, &meta_hash).unwrap();
973
974        let resolved_legacy = JobDir::open(root, "01JQXK3M").unwrap();
975        assert_eq!(resolved_legacy.job_id, legacy_ulid);
976
977        let resolved_hash = JobDir::open(root, "deadbee").unwrap();
978        assert_eq!(resolved_hash.job_id, hash_id);
979    }
980}