// agent_exec/jobstore.rs

//! Job directory management for agent-exec v0.1.
//!
//! Resolution order for the jobs root:
//!   1. `--root` CLI flag
//!   2. `AGENT_EXEC_ROOT` environment variable
//!   3. `$XDG_DATA_HOME/agent-exec/jobs`
//!   4. `~/.local/share/agent-exec/jobs` (on Windows, the local data directory
//!      reported by the `directories` crate, joined with `agent-exec/jobs`)

9use anyhow::{Context, Result};
10use directories::BaseDirs;
11use rand::RngCore;
12use std::path::{Path, PathBuf};
13
14use crate::schema::{JobMeta, JobState, JobStatus};
15
/// Sentinel error raised when no job directory matches a requested ID or prefix.
///
/// Kept distinct from generic I/O failures so callers can report
/// `error.code = "job_not_found"` rather than `internal_error`. The wrapped
/// string is the ID (or prefix) that was looked up.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let JobNotFound(requested) = self;
        f.write_str("job not found: ")?;
        f.write_str(requested)
    }
}

impl std::error::Error for JobNotFound {}

/// Sentinel error raised when a job ID prefix matches more than one job directory.
///
/// Kept distinct so callers can report `error.code = "ambiguous_job_id"` rather
/// than `internal_error`. The rendered message lists at most 20 candidates and
/// summarises the rest as `... and N more`.
#[derive(Debug)]
pub struct AmbiguousJobId {
    pub prefix: String,
    pub candidates: Vec<String>,
}

impl std::fmt::Display for AmbiguousJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Upper bound on how many candidate IDs are spelled out in the message.
        const MAX_LISTED: usize = 20;

        write!(f, "ambiguous job ID prefix '{}': matches ", self.prefix)?;

        let total = self.candidates.len();
        let listed = &self.candidates[..total.min(MAX_LISTED)];
        f.write_str(&listed.join(", "))?;

        let hidden = total.saturating_sub(MAX_LISTED);
        if hidden > 0 {
            write!(f, ", ... and {hidden} more")?;
        }
        Ok(())
    }
}

impl std::error::Error for AmbiguousJobId {}

/// Sentinel error raised when every attempt to pick an unused job ID collided.
///
/// Kept distinct so callers can report `error.code = "io_error"` rather than
/// `internal_error`. `attempts` is the number of candidates that were tried.
#[derive(Debug)]
pub struct JobIdCollisionExhausted {
    pub attempts: usize,
}

impl std::fmt::Display for JobIdCollisionExhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        write!(f, "job ID generation failed: {attempts} consecutive collisions")
    }
}

impl std::error::Error for JobIdCollisionExhausted {}

/// Sentinel error raised for a job state transition that is not allowed.
///
/// Kept distinct so callers can report `error.code = "invalid_state"` rather
/// than `internal_error`. The wrapped string is a human-readable explanation.
#[derive(Debug)]
pub struct InvalidJobState(pub String);

impl std::fmt::Display for InvalidJobState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("invalid job state: {}", self.0))
    }
}

impl std::error::Error for InvalidJobState {}

/// Number of random bytes drawn for each job ID; each byte is rendered as two
/// lowercase hex digits.
const JOB_ID_HEX_BYTES: usize = 16;
/// Length, in characters, of a full job ID.
const JOB_ID_LENGTH: usize = JOB_ID_HEX_BYTES * 2;
/// Number of leading job-ID characters kept by [`short_job_id`].
pub const SHORT_JOB_ID_LENGTH: usize = 7;

/// Maximum number of candidate IDs [`generate_job_id`] tries before failing
/// with [`JobIdCollisionExhausted`].
const MAX_JOB_ID_ATTEMPTS: usize = 16;

94/// Generate a new hash-like job ID (`[0-9a-f]`, fixed-length) that is unique under `root`.
95///
96/// The generator retries on directory-name collisions up to 16 times.
97pub fn generate_job_id(root: &Path) -> Result<String> {
98    generate_job_id_with_rng(root, &mut rand::thread_rng())
99}
100
101fn generate_job_id_with_rng(root: &Path, rng: &mut impl RngCore) -> Result<String> {
102    for _ in 0..MAX_JOB_ID_ATTEMPTS {
103        let mut bytes = [0u8; JOB_ID_HEX_BYTES];
104        rng.fill_bytes(&mut bytes);
105        let candidate = bytes.iter().map(|b| format!("{b:02x}")).collect::<String>();
106        debug_assert_eq!(candidate.len(), JOB_ID_LENGTH);
107
108        if !root.join(&candidate).exists() {
109            return Ok(candidate);
110        }
111    }
112    Err(anyhow::Error::new(JobIdCollisionExhausted {
113        attempts: MAX_JOB_ID_ATTEMPTS,
114    }))
115}
116
117/// Human-facing short job ID for list-like displays.
118pub fn short_job_id(job_id: &str) -> String {
119    job_id.chars().take(SHORT_JOB_ID_LENGTH).collect()
120}
121
122pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
123    // 1. CLI flag
124    if let Some(root) = cli_root {
125        return PathBuf::from(root);
126    }
127
128    // 2. Environment variable
129    if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
130        && !root.is_empty()
131    {
132        return PathBuf::from(root);
133    }
134
135    // 3. XDG_DATA_HOME
136    if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
137        && !xdg.is_empty()
138    {
139        return PathBuf::from(xdg).join("agent-exec").join("jobs");
140    }
141
142    // 4. Default: ~/.local/share/agent-exec/jobs
143    //    (On Windows use data_local_dir() as base)
144    if let Some(base_dirs) = BaseDirs::new() {
145        #[cfg(windows)]
146        let base = base_dirs.data_local_dir().to_path_buf();
147        #[cfg(not(windows))]
148        let base = base_dirs.home_dir().join(".local").join("share");
149        return base.join("agent-exec").join("jobs");
150    }
151
152    // Fallback if directories crate returns None
153    PathBuf::from("~/.local/share/agent-exec/jobs")
154}
155
/// Metrics returned by [`JobDir::read_tail_metrics`].
///
/// Bundles the tail content together with the raw byte ranges used in the
/// `tail` JSON responses, so that callers share the same calculation logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailMetrics {
    /// The tail text (lossy UTF-8, last N lines within the last `max_bytes`).
    pub tail: String,
    /// Total file size in bytes (0 if the file could not be read, e.g. it does not exist).
    pub observed_bytes: u64,
    /// Raw byte range `[begin, end)` represented by the returned text.
    pub range: [u64; 2],
}

/// Metrics for the head slice of a log file, as returned by
/// [`JobDir::read_head_metrics`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadMetrics {
    /// The head text (lossy UTF-8, first `max_bytes` bytes).
    pub head: String,
    /// Total file size in bytes (0 if the file could not be read, e.g. it does not exist).
    pub observed_bytes: u64,
    /// Number of raw bytes included in `head` (before lossy decoding).
    pub included_bytes: u64,
    /// Raw byte range `[begin, end)` represented by the returned text.
    pub range: [u64; 2],
}

/// Handle to a specific job's directory.
///
/// `path` is the job directory itself (`<root>/<job_id>`) and `job_id` is the
/// resolved, full directory name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDir {
    pub path: PathBuf,
    pub job_id: String,
}

188impl JobDir {
189    /// Open an existing job directory by ID or unambiguous prefix.
190    ///
191    /// Resolution order:
192    /// 1. Exact match: if `root/<job_id>` exists, return it immediately (no scan).
193    /// 2. Prefix scan: scan `root/` for directories whose name starts with `job_id`.
194    ///    - 0 matches → `Err(JobNotFound)`
195    ///    - 1 match   → resolve to that job
196    ///    - 2+ matches → `Err(AmbiguousJobId)`
197    pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
198        // Exact-match fast path: no directory scan needed.
199        let path = root.join(job_id);
200        if path.is_dir() {
201            return Ok(JobDir {
202                path,
203                job_id: job_id.to_string(),
204            });
205        }
206
207        // Prefix scan: collect all directories whose name starts with `job_id`.
208        let mut candidates: Vec<String> = std::fs::read_dir(root)
209            .into_iter()
210            .flatten()
211            .flatten()
212            .filter_map(|entry| {
213                let name = entry.file_name().to_string_lossy().into_owned();
214                if name.starts_with(job_id) && entry.path().is_dir() {
215                    Some(name)
216                } else {
217                    None
218                }
219            })
220            .collect();
221
222        match candidates.len() {
223            0 => Err(anyhow::Error::new(JobNotFound(job_id.to_string()))),
224            1 => {
225                let resolved = candidates.remove(0);
226                let path = root.join(&resolved);
227                Ok(JobDir {
228                    path,
229                    job_id: resolved,
230                })
231            }
232            _ => {
233                candidates.sort();
234                Err(anyhow::Error::new(AmbiguousJobId {
235                    prefix: job_id.to_string(),
236                    candidates,
237                }))
238            }
239        }
240    }
241
242    /// Create a new job directory and write `meta.json` atomically.
243    pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
244        let path = root.join(job_id);
245        std::fs::create_dir_all(&path)
246            .with_context(|| format!("create job dir {}", path.display()))?;
247
248        let job_dir = JobDir {
249            path,
250            job_id: job_id.to_string(),
251        };
252
253        job_dir.write_meta_atomic(meta)?;
254
255        Ok(job_dir)
256    }
257
258    pub fn meta_path(&self) -> PathBuf {
259        self.path.join("meta.json")
260    }
261    pub fn state_path(&self) -> PathBuf {
262        self.path.join("state.json")
263    }
264    pub fn stdout_path(&self) -> PathBuf {
265        self.path.join("stdout.log")
266    }
267    pub fn stderr_path(&self) -> PathBuf {
268        self.path.join("stderr.log")
269    }
270    pub fn full_log_path(&self) -> PathBuf {
271        self.path.join("full.log")
272    }
273    pub fn completion_event_path(&self) -> PathBuf {
274        self.path.join("completion_event.json")
275    }
276    pub fn notification_events_path(&self) -> PathBuf {
277        self.path.join("notification_events.ndjson")
278    }
279
280    /// Write `completion_event.json` atomically.
281    pub fn write_completion_event_atomic(
282        &self,
283        record: &crate::schema::CompletionEventRecord,
284    ) -> Result<()> {
285        let target = self.completion_event_path();
286        let contents = serde_json::to_string_pretty(record)?;
287        write_atomic(&self.path, &target, contents.as_bytes())?;
288        Ok(())
289    }
290
291    pub fn read_meta(&self) -> Result<JobMeta> {
292        let raw = std::fs::read(self.meta_path())?;
293        Ok(serde_json::from_slice(&raw)?)
294    }
295
296    pub fn read_state(&self) -> Result<JobState> {
297        let raw = std::fs::read(self.state_path())?;
298        Ok(serde_json::from_slice(&raw)?)
299    }
300
301    /// Write `meta.json` atomically: write to a temp file then rename.
302    pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
303        let target = self.meta_path();
304        let contents = serde_json::to_string_pretty(meta)?;
305        write_atomic(&self.path, &target, contents.as_bytes())?;
306        Ok(())
307    }
308
309    /// Write `state.json` atomically: write to a temp file then rename.
310    pub fn write_state(&self, state: &JobState) -> Result<()> {
311        let target = self.state_path();
312        let contents = serde_json::to_string_pretty(state)?;
313        write_atomic(&self.path, &target, contents.as_bytes())?;
314        Ok(())
315    }
316
317    /// Read tail content and raw byte range metrics for a single log file.
318    pub fn read_tail_metrics(
319        &self,
320        filename: &str,
321        tail_lines: u64,
322        max_bytes: u64,
323    ) -> TailMetrics {
324        let path = self.path.join(filename);
325        let Ok(data) = std::fs::read(&path) else {
326            return TailMetrics {
327                tail: String::new(),
328                observed_bytes: 0,
329                range: [0, 0],
330            };
331        };
332
333        let observed_bytes = data.len() as u64;
334        let window_start = observed_bytes.saturating_sub(max_bytes) as usize;
335        let window = &data[window_start..];
336
337        let line_start_in_window = if tail_lines == 0 {
338            0
339        } else {
340            let mut chunks: Vec<&[u8]> = window.split(|b| *b == b'\n').collect();
341            if window.ends_with(b"\n") {
342                let _ = chunks.pop();
343            }
344            let keep_from = chunks.len().saturating_sub(tail_lines as usize);
345            chunks[..keep_from]
346                .iter()
347                .map(|c| c.len() + 1)
348                .sum::<usize>()
349        };
350
351        let selected = &window[line_start_in_window..];
352        let tail = String::from_utf8_lossy(selected).into_owned();
353        let begin = (window_start + line_start_in_window) as u64;
354
355        TailMetrics {
356            tail,
357            observed_bytes,
358            range: [begin, observed_bytes],
359        }
360    }
361
362    /// Read head content and byte metrics for a single log file.
363    ///
364    /// Returns the first `max_bytes` bytes (decoded as UTF-8 lossy) with
365    /// canonical raw byte range metadata.
366    pub fn read_head_metrics(&self, filename: &str, max_bytes: u64) -> HeadMetrics {
367        let path = self.path.join(filename);
368        let Ok(data) = std::fs::read(&path) else {
369            return HeadMetrics {
370                head: String::new(),
371                observed_bytes: 0,
372                included_bytes: 0,
373                range: [0, 0],
374            };
375        };
376
377        let observed_bytes = data.len() as u64;
378        let included_len = observed_bytes.min(max_bytes) as usize;
379        let head = String::from_utf8_lossy(&data[..included_len]).into_owned();
380        let included_bytes = included_len as u64;
381
382        HeadMetrics {
383            head,
384            observed_bytes,
385            included_bytes,
386            range: [0, included_bytes],
387        }
388    }
389
390    /// Write the initial JobState for a `created` (not-yet-started) job.
391    ///
392    /// The state is `created`, no process has been spawned, and `started_at` is absent.
393    pub fn init_state_created(&self) -> Result<JobState> {
394        let state = JobState {
395            job: crate::schema::JobStateJob {
396                id: self.job_id.clone(),
397                status: JobStatus::Created,
398                started_at: None,
399            },
400            result: crate::schema::JobStateResult {
401                exit_code: None,
402                signal: None,
403                duration_ms: None,
404            },
405            pid: None,
406            finished_at: None,
407            updated_at: crate::run::now_rfc3339_pub(),
408            windows_job_name: None,
409        };
410        self.write_state(&state)?;
411        Ok(state)
412    }
413
414    /// Write the initial JobState (running, supervisor PID) to disk.
415    ///
416    /// This is called by the `run` command immediately after the supervisor
417    /// process is spawned, so `pid` is the supervisor's PID. The child process
418    /// PID and, on Windows, the Job Object name are not yet known at this point.
419    ///
420    /// On Windows, the Job Object name is derived deterministically from the
421    /// job_id as `"AgentExec-{job_id}"`. This name is written immediately to
422    /// `state.json` so that callers reading state after `run` returns can
423    /// always find the Job Object identifier, without waiting for the supervisor
424    /// to perform its first `write_state` call. The supervisor will confirm the
425    /// same name (or update to `failed`) after it successfully assigns the child
426    /// process to the named Job Object.
427    pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
428        #[cfg(windows)]
429        let windows_job_name = Some(format!("AgentExec-{}", self.job_id));
430        #[cfg(not(windows))]
431        let windows_job_name: Option<String> = None;
432
433        let state = JobState {
434            job: crate::schema::JobStateJob {
435                id: self.job_id.clone(),
436                status: JobStatus::Running,
437                started_at: Some(started_at.to_string()),
438            },
439            result: crate::schema::JobStateResult {
440                exit_code: None,
441                signal: None,
442                duration_ms: None,
443            },
444            pid: Some(pid),
445            finished_at: None,
446            updated_at: crate::run::now_rfc3339_pub(),
447            windows_job_name,
448        };
449        self.write_state(&state)?;
450        Ok(state)
451    }
452}
453
454/// Write `contents` to `target` atomically by writing to a temp file in the
455/// same directory and then renaming. This prevents readers from observing a
456/// partially-written file.
457fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
458    use std::io::Write;
459
460    // Create a named temporary file in the same directory so that rename is
461    // always on the same filesystem (required for atomic rename on POSIX).
462    let mut tmp = tempfile::Builder::new()
463        .prefix(".tmp-")
464        .tempfile_in(dir)
465        .with_context(|| format!("create temp file in {}", dir.display()))?;
466
467    tmp.write_all(contents)
468        .with_context(|| format!("write temp file for {}", target.display()))?;
469
470    // Persist moves the temp file to the target path atomically.
471    tmp.persist(target)
472        .map_err(|e| e.error)
473        .with_context(|| format!("rename temp file to {}", target.display()))?;
474
475    Ok(())
476}
477
478// ---------- Unit tests ----------
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483
484    /// Global mutex to serialize tests that mutate process-wide environment variables.
485    ///
486    /// Rust runs tests in parallel by default; any test that calls `set_var` /
487    /// `remove_var` must hold this lock for the duration of the test so that
488    /// other env-reading tests do not observe a half-mutated environment.
489    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
490
491    #[test]
492    fn resolve_root_cli_flag_wins() {
493        // CLI flag does not depend on environment variables; no lock needed.
494        let root = resolve_root(Some("/tmp/my-root"));
495        assert_eq!(root, PathBuf::from("/tmp/my-root"));
496    }
497
498    #[test]
499    fn resolve_root_env_var() {
500        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
501        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
502        unsafe {
503            std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
504            // Also clear XDG to avoid interference.
505            std::env::remove_var("XDG_DATA_HOME");
506        }
507        // CLI flag is None, so env var should win.
508        let root = resolve_root(None);
509        // Restore.
510        unsafe {
511            std::env::remove_var("AGENT_EXEC_ROOT");
512        }
513        assert_eq!(root, PathBuf::from("/tmp/env-root"));
514    }
515
516    #[test]
517    fn resolve_root_xdg() {
518        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
519        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
520        unsafe {
521            std::env::remove_var("AGENT_EXEC_ROOT");
522            std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
523        }
524        let root = resolve_root(None);
525        unsafe {
526            std::env::remove_var("XDG_DATA_HOME");
527        }
528        assert_eq!(root, PathBuf::from("/tmp/xdg/agent-exec/jobs"));
529    }
530
531    #[test]
532    fn resolve_root_default_contains_agent_exec() {
533        let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
534        // SAFETY: guarded by ENV_LOCK; no other env-mutating test runs concurrently.
535        unsafe {
536            std::env::remove_var("AGENT_EXEC_ROOT");
537            std::env::remove_var("XDG_DATA_HOME");
538        }
539        let root = resolve_root(None);
540        let root_str = root.to_string_lossy();
541        assert!(
542            root_str.contains("agent-exec"),
543            "expected agent-exec in path, got {root_str}"
544        );
545    }
546
547    // ---------- Job directory structure tests ----------
548
549    fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
550        crate::schema::JobMeta {
551            job: crate::schema::JobMetaJob {
552                id: job_id.to_string(),
553            },
554            schema_version: "0.1".to_string(),
555            command: vec!["echo".to_string(), "hello".to_string()],
556            created_at: "2024-01-01T00:00:00Z".to_string(),
557            root: root.display().to_string(),
558            env_keys: vec!["FOO".to_string()],
559            env_vars: vec![],
560            env_vars_runtime: vec![],
561            mask: vec![],
562            cwd: None,
563            notification: None,
564            tags: vec![],
565            inherit_env: true,
566            env_files: vec![],
567            timeout_ms: 0,
568            kill_after_ms: 0,
569            progress_every_ms: 0,
570            shell_wrapper: None,
571            stdin_file: None,
572        }
573    }
574
575    /// Verify that job directory creation writes meta.json and the directory exists.
576    #[test]
577    fn job_dir_create_writes_meta_json() {
578        let tmp = tempfile::tempdir().unwrap();
579        let root = tmp.path();
580        let meta = make_meta("test-job-01", root);
581        let job_dir = JobDir::create(root, "test-job-01", &meta).unwrap();
582
583        // Directory must exist.
584        assert!(job_dir.path.is_dir(), "job directory was not created");
585
586        // meta.json must exist and be parseable.
587        assert!(job_dir.meta_path().exists(), "meta.json not found");
588        let loaded_meta = job_dir.read_meta().unwrap();
589        assert_eq!(loaded_meta.job_id(), "test-job-01");
590        assert_eq!(loaded_meta.command, vec!["echo", "hello"]);
591
592        // env_keys must contain key names only (not values).
593        assert_eq!(loaded_meta.env_keys, vec!["FOO"]);
594    }
595
596    /// Verify that meta.json does NOT contain env values (only keys).
597    #[test]
598    fn meta_json_env_keys_only_no_values() {
599        let tmp = tempfile::tempdir().unwrap();
600        let root = tmp.path();
601        let mut meta = make_meta("test-job-02", root);
602        // Simulate env_keys containing only key names (as would be extracted from KEY=VALUE pairs).
603        meta.env_keys = vec!["SECRET_KEY".to_string(), "API_TOKEN".to_string()];
604        let job_dir = JobDir::create(root, "test-job-02", &meta).unwrap();
605
606        // Read raw JSON to verify values are absent.
607        let raw = std::fs::read_to_string(job_dir.meta_path()).unwrap();
608        assert!(
609            !raw.contains("secret_value"),
610            "env value must not be stored in meta.json"
611        );
612        assert!(raw.contains("SECRET_KEY"), "env key must be stored");
613        assert!(raw.contains("API_TOKEN"), "env key must be stored");
614    }
615
616    /// Verify that state.json contains updated_at after write_state.
617    #[test]
618    fn state_json_contains_updated_at() {
619        let tmp = tempfile::tempdir().unwrap();
620        let root = tmp.path();
621        let meta = make_meta("test-job-03", root);
622        let job_dir = JobDir::create(root, "test-job-03", &meta).unwrap();
623
624        let state = crate::schema::JobState {
625            job: crate::schema::JobStateJob {
626                id: "test-job-03".to_string(),
627                status: crate::schema::JobStatus::Running,
628                started_at: Some("2024-01-01T00:00:00Z".to_string()),
629            },
630            result: crate::schema::JobStateResult {
631                exit_code: None,
632                signal: None,
633                duration_ms: None,
634            },
635            pid: Some(12345),
636            finished_at: None,
637            updated_at: "2024-01-01T00:00:01Z".to_string(),
638            windows_job_name: None,
639        };
640        job_dir.write_state(&state).unwrap();
641
642        // Read back and verify.
643        assert!(job_dir.state_path().exists(), "state.json not found");
644        let loaded = job_dir.read_state().unwrap();
645        assert_eq!(loaded.updated_at, "2024-01-01T00:00:01Z");
646        assert_eq!(loaded.job_id(), "test-job-03");
647
648        // Also verify the raw JSON contains the updated_at field.
649        let raw = std::fs::read_to_string(job_dir.state_path()).unwrap();
650        assert!(
651            raw.contains("updated_at"),
652            "updated_at field missing from state.json"
653        );
654    }
655
656    /// Verify that write_state uses atomic write (temp file + rename).
657    /// We verify this indirectly: the file must not be corrupted even if we
658    /// call write_state multiple times rapidly.
659    #[test]
660    fn state_json_atomic_write_no_corruption() {
661        let tmp = tempfile::tempdir().unwrap();
662        let root = tmp.path();
663        let meta = make_meta("test-job-04", root);
664        let job_dir = JobDir::create(root, "test-job-04", &meta).unwrap();
665
666        for i in 0..10 {
667            let state = crate::schema::JobState {
668                job: crate::schema::JobStateJob {
669                    id: "test-job-04".to_string(),
670                    status: crate::schema::JobStatus::Running,
671                    started_at: Some("2024-01-01T00:00:00Z".to_string()),
672                },
673                result: crate::schema::JobStateResult {
674                    exit_code: None,
675                    signal: None,
676                    duration_ms: None,
677                },
678                pid: Some(100 + i),
679                finished_at: None,
680                updated_at: format!("2024-01-01T00:00:{:02}Z", i),
681                windows_job_name: None,
682            };
683            job_dir.write_state(&state).unwrap();
684
685            // Each read must produce valid JSON (no corruption).
686            let loaded = job_dir.read_state().unwrap();
687            assert_eq!(
688                loaded.pid,
689                Some(100 + i),
690                "state corrupted at iteration {i}"
691            );
692        }
693    }
694
695    /// Verify that meta.json atomic write works correctly.
696    #[test]
697    fn meta_json_atomic_write() {
698        let tmp = tempfile::tempdir().unwrap();
699        let root = tmp.path();
700        let meta = make_meta("test-job-05", root);
701        let job_dir = JobDir::create(root, "test-job-05", &meta).unwrap();
702
703        // Re-write meta atomically.
704        let updated_meta = crate::schema::JobMeta {
705            job: crate::schema::JobMetaJob {
706                id: "test-job-05".to_string(),
707            },
708            schema_version: "0.1".to_string(),
709            command: vec!["ls".to_string()],
710            created_at: "2024-06-01T12:00:00Z".to_string(),
711            root: root.display().to_string(),
712            env_keys: vec!["PATH".to_string()],
713            env_vars: vec![],
714            env_vars_runtime: vec![],
715            mask: vec![],
716            cwd: None,
717            notification: None,
718            tags: vec![],
719            inherit_env: true,
720            env_files: vec![],
721            timeout_ms: 0,
722            kill_after_ms: 0,
723            progress_every_ms: 0,
724            shell_wrapper: None,
725            stdin_file: None,
726        };
727        job_dir.write_meta_atomic(&updated_meta).unwrap();
728
729        let loaded = job_dir.read_meta().unwrap();
730        assert_eq!(loaded.command, vec!["ls"]);
731        assert_eq!(loaded.created_at, "2024-06-01T12:00:00Z");
732    }
733
734    /// On non-Windows platforms, `init_state` must write `windows_job_name: None`
735    /// (the field is omitted from JSON via `skip_serializing_if`).
736    /// On Windows, `init_state` must write the deterministic Job Object name
737    /// `"AgentExec-{job_id}"` so that `state.json` always contains the identifier
738    /// immediately after `run` returns, without waiting for the supervisor update.
739    #[test]
740    fn init_state_writes_deterministic_job_name_on_windows() {
741        let tmp = tempfile::tempdir().unwrap();
742        let root = tmp.path();
743        let job_id = "01TESTJOBID0000000000000";
744        let meta = make_meta(job_id, root);
745        let job_dir = JobDir::create(root, job_id, &meta).unwrap();
746        let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();
747
748        // Verify in-memory state.
749        #[cfg(windows)]
750        assert_eq!(
751            state.windows_job_name.as_deref(),
752            Some("AgentExec-01TESTJOBID0000000000000"),
753            "Windows: init_state must set deterministic job name immediately"
754        );
755        #[cfg(not(windows))]
756        assert_eq!(
757            state.windows_job_name, None,
758            "non-Windows: init_state must not set windows_job_name"
759        );
760
761        // Verify persisted state on disk.
762        let persisted = job_dir.read_state().unwrap();
763        #[cfg(windows)]
764        assert_eq!(
765            persisted.windows_job_name.as_deref(),
766            Some("AgentExec-01TESTJOBID0000000000000"),
767            "Windows: persisted state.json must contain windows_job_name"
768        );
769        #[cfg(not(windows))]
770        assert_eq!(
771            persisted.windows_job_name, None,
772            "non-Windows: persisted state.json must not contain windows_job_name"
773        );
774    }
775
776    // ---------- Prefix-based job ID resolution tests ----------
777
778    #[test]
779    fn job_dir_open_exact_match() {
780        let tmp = tempfile::tempdir().unwrap();
781        let root = tmp.path();
782        let job_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
783        let meta = make_meta(job_id, root);
784        JobDir::create(root, job_id, &meta).unwrap();
785
786        let result = JobDir::open(root, job_id).unwrap();
787        assert_eq!(result.job_id, job_id);
788    }
789
790    #[test]
791    fn job_dir_open_unique_prefix_resolves() {
792        let tmp = tempfile::tempdir().unwrap();
793        let root = tmp.path();
794        let job_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
795        let meta = make_meta(job_id, root);
796        JobDir::create(root, job_id, &meta).unwrap();
797
798        // Use a unique prefix
799        let result = JobDir::open(root, "01JQXK3M").unwrap();
800        assert_eq!(result.job_id, job_id);
801    }
802
803    #[test]
804    fn job_dir_open_not_found_returns_job_not_found() {
805        let tmp = tempfile::tempdir().unwrap();
806        let root = tmp.path();
807
808        let err = JobDir::open(root, "ZZZZZ").unwrap_err();
809        assert!(
810            err.downcast_ref::<JobNotFound>().is_some(),
811            "expected JobNotFound, got: {err}"
812        );
813    }
814
815    #[test]
816    fn job_dir_open_ambiguous_prefix_returns_ambiguous() {
817        let tmp = tempfile::tempdir().unwrap();
818        let root = tmp.path();
819        let id_a = "01JQXK3M8EAAA00000000000AA";
820        let id_b = "01JQXK3M8EBBB00000000000BB";
821        let meta_a = make_meta(id_a, root);
822        let meta_b = make_meta(id_b, root);
823        JobDir::create(root, id_a, &meta_a).unwrap();
824        JobDir::create(root, id_b, &meta_b).unwrap();
825
826        let err = JobDir::open(root, "01JQXK3M8E").unwrap_err();
827        let ambiguous = err
828            .downcast_ref::<AmbiguousJobId>()
829            .expect("expected AmbiguousJobId");
830        assert_eq!(ambiguous.prefix, "01JQXK3M8E");
831        assert!(ambiguous.candidates.contains(&id_a.to_string()));
832        assert!(ambiguous.candidates.contains(&id_b.to_string()));
833    }
834
835    #[test]
836    fn ambiguous_job_id_display_up_to_20_candidates() {
837        let err = AmbiguousJobId {
838            prefix: "01J".to_string(),
839            candidates: vec![
840                "01JAAA".to_string(),
841                "01JBBB".to_string(),
842                "01JCCC".to_string(),
843            ],
844        };
845        let msg = err.to_string();
846        assert!(msg.contains("01J"), "must include prefix: {msg}");
847        assert!(msg.contains("01JAAA"), "must list candidates: {msg}");
848        assert!(
849            !msg.contains("more"),
850            "3 candidates should not truncate: {msg}"
851        );
852    }
853
854    #[test]
855    fn ambiguous_job_id_display_truncates_beyond_20() {
856        let candidates: Vec<String> = (1..=25)
857            .map(|i| format!("01JCANDIDATE{i:02}0000000000"))
858            .collect();
859        let err = AmbiguousJobId {
860            prefix: "01J".to_string(),
861            candidates,
862        };
863        let msg = err.to_string();
864        assert!(msg.contains("... and 5 more"), "must truncate: {msg}");
865    }
866
867    #[test]
868    fn generate_job_id_returns_fixed_length_hex() {
869        let tmp = tempfile::tempdir().unwrap();
870        let id = generate_job_id(tmp.path()).expect("generate job id");
871        assert_eq!(id.len(), JOB_ID_LENGTH, "unexpected job id length");
872        assert!(
873            id.chars()
874                .all(|c| c.is_ascii_hexdigit() && c.is_ascii_lowercase() || c.is_ascii_digit()),
875            "job id must be lowercase hex: {id}"
876        );
877    }
878
879    #[test]
880    fn generate_job_id_retries_when_collision_exists() {
881        let tmp = tempfile::tempdir().unwrap();
882        let root = tmp.path();
883
884        // Prime the root with many IDs to make collisions on first try highly likely
885        // only if generator does not check existence. This verifies returned ID is unique.
886        for _ in 0..64 {
887            let id = generate_job_id(root).expect("seed id");
888            std::fs::create_dir(root.join(id)).expect("create seeded dir");
889        }
890
891        let id = generate_job_id(root).expect("generate non-colliding id");
892        assert!(
893            !root.join(&id).exists(),
894            "generated id must not collide with existing directory"
895        );
896    }
897
898    /// A deterministic RNG that always produces the same bytes.
899    struct FixedRng([u8; JOB_ID_HEX_BYTES]);
900
901    impl rand::RngCore for FixedRng {
902        fn next_u32(&mut self) -> u32 {
903            unimplemented!()
904        }
905        fn next_u64(&mut self) -> u64 {
906            unimplemented!()
907        }
908        fn fill_bytes(&mut self, dest: &mut [u8]) {
909            dest.copy_from_slice(&self.0[..dest.len()]);
910        }
911        fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), rand::Error> {
912            self.fill_bytes(dest);
913            Ok(())
914        }
915    }
916
917    #[test]
918    fn generate_job_id_fails_after_16_collisions() {
919        let tmp = tempfile::tempdir().unwrap();
920        let root = tmp.path();
921
922        let fixed_bytes = [0xABu8; JOB_ID_HEX_BYTES];
923        let colliding_id: String = fixed_bytes.iter().map(|b| format!("{b:02x}")).collect();
924        std::fs::create_dir(root.join(&colliding_id)).unwrap();
925
926        let mut rng = FixedRng(fixed_bytes);
927        let err = generate_job_id_with_rng(root, &mut rng).unwrap_err();
928        let exhausted = err
929            .downcast_ref::<JobIdCollisionExhausted>()
930            .expect("expected JobIdCollisionExhausted");
931        assert_eq!(exhausted.attempts, MAX_JOB_ID_ATTEMPTS);
932    }
933
934    #[test]
935    fn job_dir_open_unique_prefix_with_mixed_legacy_and_hash_ids() {
936        let tmp = tempfile::tempdir().unwrap();
937        let root = tmp.path();
938        let legacy_ulid = "01JQXK3M8E5PQRSTVWYZ12ABCD";
939        let hash_id = "deadbeefcafebabe1234567890abcdef";
940
941        let meta_legacy = make_meta(legacy_ulid, root);
942        let meta_hash = make_meta(hash_id, root);
943        JobDir::create(root, legacy_ulid, &meta_legacy).unwrap();
944        JobDir::create(root, hash_id, &meta_hash).unwrap();
945
946        let resolved_legacy = JobDir::open(root, "01JQXK3M").unwrap();
947        assert_eq!(resolved_legacy.job_id, legacy_ulid);
948
949        let resolved_hash = JobDir::open(root, "deadbee").unwrap();
950        assert_eq!(resolved_hash.job_id, hash_id);
951    }
952}