Skip to main content

treeship_core/journal/
mod.rs

1//! Local Approval Use Journal -- v0.9.9 PR 2.
2//!
3//! Per-workspace append-only memory of consumed Approval Grants. The
4//! journal turns the v0.9.6 "package-local only" replay finding into a
5//! local-journal replay finding: with this module wired through, verify
6//! can say "use 1/1 -- local Approval Use Journal passed" instead of
7//! "no global ledger consulted."
8//!
9//! Scope of THIS PR:
10//!   * journal storage (records/, heads/, indexes/, locks/)
11//!   * append-only writes with file lock + atomic temp+rename
12//!   * hash chain via `previous_record_digest`
13//!   * read-only `check_replay` lookup
14//!   * `verify_integrity` chain walk
15//!   * `rebuild_indexes` from records (records are truth)
16//!
17//! Out of scope (later PRs):
18//!   * consume-before-action wiring inside `treeship attest action` (PR 3)
19//!   * package export of journal records (PR 4)
20//!   * Hub checkpoint signing (PR 6 scaffold)
21//!
22//! Privacy rules baked into the layout:
23//!   * `nonce_digest`, never raw nonce
24//!   * no commands, prompts, file contents, bearer tokens, or API keys
25//!     are stored. The journal answers the single question "has this
26//!     (grant_id, nonce_digest) been consumed before, and if so how
27//!     many times?" -- everything else stays in the signed grant +
28//!     receipt where it already is.
29
30use std::fs::{self, File, OpenOptions};
31use std::io::Write;
32use std::path::{Path, PathBuf};
33
34// fs2 is gated to non-wasm targets at the workspace Cargo.toml; the WASM
35// build has no concurrent writers and no real filesystem, so journal
36// operations fall back to a deterministic "no-op write" mode that still
37// keeps the public API building. Same pattern session::event_log uses.
38#[cfg(not(target_family = "wasm"))]
39use fs2::FileExt;
40
41use crate::statements::{
42    ApprovalRevocation, ApprovalUse, JournalCheckpoint, ReplayCheck, ReplayCheckLevel,
43    TYPE_APPROVAL_REVOCATION, TYPE_APPROVAL_USE, TYPE_JOURNAL_CHECKPOINT,
44    approval_revocation_record_digest, approval_use_record_digest,
45    journal_checkpoint_record_digest,
46};
47
48// ---------------------------------------------------------------------------
49// Errors
50// ---------------------------------------------------------------------------
51
52#[derive(Debug)]
53pub enum JournalError {
54    Io(std::io::Error),
55    Json(serde_json::Error),
56    /// `previous_record_digest` on a record didn't match the prior
57    /// record's `record_digest`. The chain is broken.
58    BrokenChain {
59        index:    u64,
60        expected: String,
61        actual:   String,
62    },
63    /// A record's stored `record_digest` didn't match the recomputed
64    /// digest. The record was tampered after write.
65    RecordTampered {
66        index:    u64,
67        expected: String,
68        actual:   String,
69    },
70    /// A record file referenced by the head no longer exists.
71    MissingRecord {
72        index: u64,
73    },
74    /// The journal's append lock could not be acquired.
75    LockBusy,
76    /// The append exceeds `max_uses` recorded on prior uses for this
77    /// grant. Surfaced as an error so callers (PR 3) refuse to sign
78    /// the action; PR 2 itself only writes uses passed in by callers,
79    /// so this only fires from `append_use` when the caller didn't
80    /// preflight via `check_replay`.
81    MaxUsesExceeded {
82        grant_id:   String,
83        max_uses:   u32,
84        current:    u32,
85    },
86}
87
88impl std::fmt::Display for JournalError {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        match self {
91            Self::Io(e)            => write!(f, "journal io: {e}"),
92            Self::Json(e)          => write!(f, "journal json: {e}"),
93            Self::BrokenChain { index, expected, actual } => write!(
94                f,
95                "journal broken at record {index}: previous_record_digest = {actual}, expected {expected}",
96            ),
97            Self::RecordTampered { index, expected, actual } => write!(
98                f,
99                "journal record {index} tampered: stored digest {expected}, recomputed {actual}",
100            ),
101            Self::MissingRecord { index } => write!(
102                f,
103                "journal record {index} referenced by head but missing on disk",
104            ),
105            Self::LockBusy => write!(f, "journal append lock busy; another process holds it"),
106            Self::MaxUsesExceeded { grant_id, max_uses, current } => write!(
107                f,
108                "approval grant {grant_id} would exceed max_uses ({current}/{max_uses})",
109            ),
110        }
111    }
112}
113
114impl std::error::Error for JournalError {}
115impl From<std::io::Error>    for JournalError { fn from(e: std::io::Error)    -> Self { Self::Io(e) } }
116impl From<serde_json::Error> for JournalError { fn from(e: serde_json::Error) -> Self { Self::Json(e) } }
117
118// ---------------------------------------------------------------------------
119// Layout
120// ---------------------------------------------------------------------------
121
122/// Directory layout under `.treeship/journals/approval-use/`.
123pub struct Journal {
124    /// Root directory.
125    pub dir: PathBuf,
126}
127
128impl Journal {
129    pub fn new(dir: impl Into<PathBuf>) -> Self {
130        Self { dir: dir.into() }
131    }
132
133    pub fn records_dir(&self) -> PathBuf  { self.dir.join("records") }
134    pub fn heads_dir(&self)   -> PathBuf  { self.dir.join("heads") }
135    pub fn indexes_dir(&self) -> PathBuf  { self.dir.join("indexes") }
136    pub fn locks_dir(&self)   -> PathBuf  { self.dir.join("locks") }
137    pub fn current_head_path(&self) -> PathBuf { self.heads_dir().join("current.json") }
138    pub fn lock_path(&self)         -> PathBuf { self.locks_dir().join("journal.lock") }
139    pub fn meta_path(&self)         -> PathBuf { self.dir.join("journal.json") }
140
141    /// Index file for a given grant. Each line is one `record_index`.
142    pub fn by_grant_path(&self, grant_id: &str) -> PathBuf {
143        self.indexes_dir().join("by-grant").join(format!("{}.txt", safe_name(grant_id)))
144    }
145
146    /// Index file for a nonce_digest.
147    pub fn by_nonce_path(&self, nonce_digest: &str) -> PathBuf {
148        self.indexes_dir().join("by-nonce").join(format!("{}.txt", safe_name(nonce_digest)))
149    }
150
151    /// Returns true iff the journal directory exists.
152    pub fn exists(&self) -> bool {
153        self.dir.is_dir()
154    }
155}
156
157/// Make a filesystem-safe name by replacing path-unsafe chars. Used for
158/// index file names; not a security boundary -- the journal's actual
159/// integrity check is the hash chain.
160fn safe_name(s: &str) -> String {
161    s.chars()
162        .map(|c| match c {
163            ':' | '/' | '\\' | ' ' | '.' => '_',
164            c => c,
165        })
166        .collect()
167}
168
169// ---------------------------------------------------------------------------
170// Head file
171// ---------------------------------------------------------------------------
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174pub struct Head {
175    /// 1-indexed; 0 means "no records yet."
176    pub index: u64,
177    /// `record_digest` of the most recent record. Empty when index=0.
178    pub digest: String,
179    /// Updated on every append.
180    pub updated_at: String,
181}
182
183impl Default for Head {
184    fn default() -> Self {
185        Self {
186            index:      0,
187            digest:     String::new(),
188            updated_at: String::new(),
189        }
190    }
191}
192
193fn read_head(j: &Journal) -> Result<Head, JournalError> {
194    let path = j.current_head_path();
195    if !path.exists() {
196        return Ok(Head::default());
197    }
198    let bytes = fs::read(&path)?;
199    Ok(serde_json::from_slice(&bytes)?)
200}
201
202fn write_head(j: &Journal, head: &Head) -> Result<(), JournalError> {
203    fs::create_dir_all(j.heads_dir())?;
204    let path = j.current_head_path();
205    let tmp = path.with_extension("json.tmp");
206    let json = serde_json::to_vec_pretty(head)?;
207    fs::write(&tmp, json)?;
208    fs::rename(&tmp, &path)?;
209    Ok(())
210}
211
212// ---------------------------------------------------------------------------
213// Append
214// ---------------------------------------------------------------------------
215
216/// Acquire the journal append lock for the duration of the closure. Uses
217/// fs2::FileExt::try_lock_exclusive (the same primitive `session::event_log`
218/// uses) so behavior matches what the rest of the codebase already
219/// trusts.
220#[cfg(not(target_family = "wasm"))]
221fn with_lock<F, T>(j: &Journal, body: F) -> Result<T, JournalError>
222where
223    F: FnOnce() -> Result<T, JournalError>,
224{
225    fs::create_dir_all(j.locks_dir())?;
226    let lock = OpenOptions::new()
227        .read(true)
228        .write(true)
229        .create(true)
230        .truncate(false)
231        .open(j.lock_path())?;
232    if lock.try_lock_exclusive().is_err() {
233        return Err(JournalError::LockBusy);
234    }
235    let result = body();
236    let _ = fs2::FileExt::unlock(&lock);
237    result
238}
239
240/// WASM build: no concurrent writers, no advisory locks. Run the body
241/// directly. Matches `session::event_log`'s wasm fallback.
242#[cfg(target_family = "wasm")]
243fn with_lock<F, T>(_j: &Journal, body: F) -> Result<T, JournalError>
244where
245    F: FnOnce() -> Result<T, JournalError>,
246{
247    body()
248}
249
250/// Append an ApprovalUse to the journal. The caller MUST set
251/// `previous_record_digest` to the current head's digest on the
252/// incoming record; we re-validate before write. `record_digest` is
253/// computed from the canonical form and stamped on the stored record.
254///
255/// Returns the new head's index and digest.
256pub fn append_use(j: &Journal, mut rec: ApprovalUse) -> Result<Head, JournalError> {
257    rec.type_ = TYPE_APPROVAL_USE.into();
258    with_lock(j, || {
259        let head = read_head(j)?;
260        rec.previous_record_digest = head.digest.clone();
261        rec.record_digest = approval_use_record_digest(&rec);
262        let next_index = head.index + 1;
263        write_record_use(j, next_index, &rec)?;
264        update_indexes_for_use(j, next_index, &rec)?;
265        let new_head = Head {
266            index:      next_index,
267            digest:     rec.record_digest.clone(),
268            updated_at: rec.created_at.clone(),
269        };
270        write_head(j, &new_head)?;
271        ensure_meta(j)?;
272        Ok(new_head)
273    })
274}
275
276/// Atomic check-and-append for the consume path. Combines `check_replay` +
277/// append under a single journal lock so concurrent consume paths cannot
278/// bypass `max_uses` via TOCTOU race.
279///
280/// v0.9.9 PR 3 (`reserve_in_journal` in attest.rs) ran `check_replay` and
281/// derived `use_number` *outside* `with_lock`, then called `append_use`
282/// (which takes the lock only for the write). Two parallel attests could
283/// both pass the pre-lock replay check, then queue serially through the
284/// lock, and both would write — exceeding `max_uses=1`. v0.9.10 closes
285/// that race by doing the check *inside* the lock.
286///
287/// The function also stamps `use_number` from the grant-wide count
288/// observed at lock-acquire time. Callers should pass the record with
289/// `use_number = 0` (or any value); it will be overwritten.
290///
291/// Returns the new head on success. On replay violation, returns
292/// `JournalError::MaxUsesExceeded` and writes nothing — the lock is
293/// released without state change.
294pub fn reserve_use(
295    j: &Journal,
296    mut rec: ApprovalUse,
297    max_uses: Option<u32>,
298) -> Result<Head, JournalError> {
299    rec.type_ = TYPE_APPROVAL_USE.into();
300    with_lock(j, || {
301        // Replay check inside the lock. `check_replay` reads the
302        // by-nonce index; while we hold the exclusive lock, no other
303        // writer can mutate that index, so the count is correct.
304        let replay = check_replay(j, &rec.grant_id, &rec.nonce_digest, max_uses)?;
305        if let Some(false) = replay.passed {
306            let current = replay
307                .use_number
308                .map(|n| n.saturating_sub(1))
309                .unwrap_or(0);
310            return Err(JournalError::MaxUsesExceeded {
311                grant_id: rec.grant_id.clone(),
312                max_uses: replay.max_uses.unwrap_or(0),
313                current,
314            });
315        }
316        // Stamp use_number from grant-wide count, also inside the lock,
317        // so two parallel reservations on the same grant cannot both
318        // claim the same use_number.
319        let prior_count = list_uses_for_grant(j, &rec.grant_id)?.len() as u32;
320        rec.use_number = prior_count.saturating_add(1);
321        // Append.
322        let head = read_head(j)?;
323        rec.previous_record_digest = head.digest.clone();
324        rec.record_digest = approval_use_record_digest(&rec);
325        let next_index = head.index + 1;
326        write_record_use(j, next_index, &rec)?;
327        update_indexes_for_use(j, next_index, &rec)?;
328        let new_head = Head {
329            index:      next_index,
330            digest:     rec.record_digest.clone(),
331            updated_at: rec.created_at.clone(),
332        };
333        write_head(j, &new_head)?;
334        ensure_meta(j)?;
335        Ok(new_head)
336    })
337}
338
339/// Append an ApprovalRevocation. Sibling of `append_use`.
340pub fn append_revocation(j: &Journal, mut rec: ApprovalRevocation) -> Result<Head, JournalError> {
341    rec.type_ = TYPE_APPROVAL_REVOCATION.into();
342    with_lock(j, || {
343        let head = read_head(j)?;
344        rec.previous_record_digest = head.digest.clone();
345        rec.record_digest = approval_revocation_record_digest(&rec);
346        let next_index = head.index + 1;
347        write_record_revocation(j, next_index, &rec)?;
348        index_grant(j, next_index, &rec.grant_id)?;
349        let new_head = Head {
350            index:      next_index,
351            digest:     rec.record_digest.clone(),
352            updated_at: rec.created_at.clone(),
353        };
354        write_head(j, &new_head)?;
355        ensure_meta(j)?;
356        Ok(new_head)
357    })
358}
359
360/// Append a JournalCheckpoint over a contiguous range of prior records.
361pub fn append_checkpoint(j: &Journal, mut rec: JournalCheckpoint) -> Result<Head, JournalError> {
362    rec.type_ = TYPE_JOURNAL_CHECKPOINT.into();
363    with_lock(j, || {
364        let head = read_head(j)?;
365        rec.previous_record_digest = head.digest.clone();
366        rec.record_digest = journal_checkpoint_record_digest(&rec);
367        let next_index = head.index + 1;
368        write_record_checkpoint(j, next_index, &rec)?;
369        let new_head = Head {
370            index:      next_index,
371            digest:     rec.record_digest.clone(),
372            updated_at: rec.created_at.clone(),
373        };
374        write_head(j, &new_head)?;
375        ensure_meta(j)?;
376        Ok(new_head)
377    })
378}
379
380fn record_filename(index: u64, type_: &str, digest: &str) -> String {
381    // Use the digest's hex tail (after "sha256:") so the filename is
382    // bounded length and contains no separators.
383    let tail = digest.strip_prefix("sha256:").unwrap_or(digest);
384    let short = &tail[..tail.len().min(16)];
385    format!("{:010}.{type_}.{short}.json", index)
386}
387
388fn write_record_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
389    fs::create_dir_all(j.records_dir())?;
390    let name = record_filename(index, "approval-use", &rec.record_digest);
391    let path = j.records_dir().join(&name);
392    let tmp = path.with_extension("json.tmp");
393    let mut f = File::create(&tmp)?;
394    f.write_all(&serde_json::to_vec_pretty(rec)?)?;
395    f.sync_all()?;
396    fs::rename(&tmp, &path)?;
397    Ok(())
398}
399
400fn write_record_revocation(j: &Journal, index: u64, rec: &ApprovalRevocation) -> Result<(), JournalError> {
401    fs::create_dir_all(j.records_dir())?;
402    let name = record_filename(index, "approval-revocation", &rec.record_digest);
403    let path = j.records_dir().join(&name);
404    let tmp = path.with_extension("json.tmp");
405    let mut f = File::create(&tmp)?;
406    f.write_all(&serde_json::to_vec_pretty(rec)?)?;
407    f.sync_all()?;
408    fs::rename(&tmp, &path)?;
409    Ok(())
410}
411
412fn write_record_checkpoint(j: &Journal, index: u64, rec: &JournalCheckpoint) -> Result<(), JournalError> {
413    fs::create_dir_all(j.records_dir())?;
414    let name = record_filename(index, "journal-checkpoint", &rec.record_digest);
415    let path = j.records_dir().join(&name);
416    let tmp = path.with_extension("json.tmp");
417    let mut f = File::create(&tmp)?;
418    f.write_all(&serde_json::to_vec_pretty(rec)?)?;
419    f.sync_all()?;
420    fs::rename(&tmp, &path)?;
421    Ok(())
422}
423
424fn ensure_meta(j: &Journal) -> Result<(), JournalError> {
425    let path = j.meta_path();
426    if path.exists() {
427        return Ok(());
428    }
429    #[derive(serde::Serialize)]
430    struct Meta<'a> {
431        kind:    &'a str,
432        version: &'a str,
433        format:  &'a str,
434    }
435    let meta = Meta { kind: "approval-use-journal", version: "v1", format: "json-records" };
436    let bytes = serde_json::to_vec_pretty(&meta)?;
437    fs::write(&path, bytes)?;
438    Ok(())
439}
440
441// ---------------------------------------------------------------------------
442// Indexes (rebuildable cache)
443// ---------------------------------------------------------------------------
444
445fn append_index(path: &Path, line: &str) -> Result<(), JournalError> {
446    if let Some(parent) = path.parent() {
447        fs::create_dir_all(parent)?;
448    }
449    let mut f = OpenOptions::new().append(true).create(true).open(path)?;
450    writeln!(f, "{line}")?;
451    Ok(())
452}
453
454fn index_grant(j: &Journal, index: u64, grant_id: &str) -> Result<(), JournalError> {
455    append_index(&j.by_grant_path(grant_id), &index.to_string())
456}
457
458fn index_nonce(j: &Journal, index: u64, nonce_digest: &str) -> Result<(), JournalError> {
459    append_index(&j.by_nonce_path(nonce_digest), &index.to_string())
460}
461
462fn update_indexes_for_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
463    index_grant(j, index, &rec.grant_id)?;
464    index_nonce(j, index, &rec.nonce_digest)?;
465    Ok(())
466}
467
468/// Delete and rebuild every index from the records directory. Records are
469/// truth; indexes are cache. Useful as a recovery tool when an index file
470/// is corrupt or out of sync.
471pub fn rebuild_indexes(j: &Journal) -> Result<u64, JournalError> {
472    let dir = j.indexes_dir();
473    if dir.is_dir() {
474        // Wipe by recursive remove. Atomic enough; the worst-case is a
475        // partially-rebuilt index, which the next call to this function
476        // also recovers from.
477        fs::remove_dir_all(&dir)?;
478    }
479    let mut rebuilt = 0u64;
480    for (idx, kind, bytes) in iter_records(j)? {
481        match kind.as_str() {
482            "approval-use" => {
483                let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
484                update_indexes_for_use(j, idx, &rec)?;
485                rebuilt += 1;
486            }
487            "approval-revocation" => {
488                let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
489                index_grant(j, idx, &rec.grant_id)?;
490                rebuilt += 1;
491            }
492            "journal-checkpoint" => {
493                rebuilt += 1; // checkpoints aren't indexed by grant/nonce
494            }
495            _ => {}
496        }
497    }
498    Ok(rebuilt)
499}
500
501// ---------------------------------------------------------------------------
502// Iteration + integrity
503// ---------------------------------------------------------------------------
504
505/// Walk records/ in index order. Returns `(index, kind, bytes)`. Kind is
506/// derived from the filename ("approval-use" / "approval-revocation" /
507/// "journal-checkpoint"). Filenames Treeship doesn't recognize are
508/// skipped silently rather than failing the whole walk -- a future record
509/// type added by a newer version shouldn't break older readers.
510fn iter_records(j: &Journal) -> Result<Vec<(u64, String, Vec<u8>)>, JournalError> {
511    let dir = j.records_dir();
512    if !dir.is_dir() {
513        return Ok(Vec::new());
514    }
515    let mut entries: Vec<(u64, String, PathBuf)> = Vec::new();
516    for entry in fs::read_dir(&dir)? {
517        let entry = entry?;
518        let path = entry.path();
519        if path.extension().and_then(|s| s.to_str()) != Some("json") {
520            continue;
521        }
522        let name = match path.file_name().and_then(|n| n.to_str()) {
523            Some(n) => n,
524            None    => continue,
525        };
526        // Filename shape: "<10-digit-index>.<kind>.<short-digest>.json"
527        let mut parts = name.splitn(4, '.');
528        let idx_str = match parts.next() { Some(s) => s, None => continue };
529        let kind    = match parts.next() { Some(s) => s, None => continue };
530        // index parses as u64
531        let idx = match idx_str.parse::<u64>() { Ok(n) => n, Err(_) => continue };
532        entries.push((idx, kind.to_string(), path));
533    }
534    entries.sort_by_key(|(idx, _, _)| *idx);
535    let mut out = Vec::with_capacity(entries.len());
536    for (idx, kind, path) in entries {
537        let bytes = fs::read(&path)?;
538        out.push((idx, kind, bytes));
539    }
540    Ok(out)
541}
542
543/// Walk every record in order, recompute each `record_digest`, and check
544/// that each record's `previous_record_digest` matches the prior
545/// record's stored `record_digest`. Returns the number of records walked
546/// or an error pinpointing the first integrity failure.
547pub fn verify_integrity(j: &Journal) -> Result<u64, JournalError> {
548    let mut prior_digest = String::new();
549    let mut count = 0u64;
550    let head = read_head(j)?;
551    for (idx, kind, bytes) in iter_records(j)? {
552        match kind.as_str() {
553            "approval-use" => {
554                let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
555                if rec.previous_record_digest != prior_digest {
556                    return Err(JournalError::BrokenChain {
557                        index:    idx,
558                        expected: prior_digest,
559                        actual:   rec.previous_record_digest,
560                    });
561                }
562                let recomputed = approval_use_record_digest(&rec);
563                if recomputed != rec.record_digest {
564                    return Err(JournalError::RecordTampered {
565                        index:    idx,
566                        expected: rec.record_digest,
567                        actual:   recomputed,
568                    });
569                }
570                prior_digest = rec.record_digest;
571            }
572            "approval-revocation" => {
573                let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
574                if rec.previous_record_digest != prior_digest {
575                    return Err(JournalError::BrokenChain {
576                        index:    idx,
577                        expected: prior_digest,
578                        actual:   rec.previous_record_digest,
579                    });
580                }
581                let recomputed = approval_revocation_record_digest(&rec);
582                if recomputed != rec.record_digest {
583                    return Err(JournalError::RecordTampered {
584                        index:    idx,
585                        expected: rec.record_digest,
586                        actual:   recomputed,
587                    });
588                }
589                prior_digest = rec.record_digest;
590            }
591            "journal-checkpoint" => {
592                let rec: JournalCheckpoint = serde_json::from_slice(&bytes)?;
593                if rec.previous_record_digest != prior_digest {
594                    return Err(JournalError::BrokenChain {
595                        index:    idx,
596                        expected: prior_digest,
597                        actual:   rec.previous_record_digest,
598                    });
599                }
600                let recomputed = journal_checkpoint_record_digest(&rec);
601                if recomputed != rec.record_digest {
602                    return Err(JournalError::RecordTampered {
603                        index:    idx,
604                        expected: rec.record_digest,
605                        actual:   recomputed,
606                    });
607                }
608                prior_digest = rec.record_digest;
609            }
610            _ => {
611                // Unknown record kind. Stop the chain check rather than
612                // skip silently -- a newer record type would still need
613                // to participate in the chain.
614                continue;
615            }
616        }
617        count += 1;
618    }
619    // Tail must match the head if records exist; if records were
620    // deleted off the end the head will be stale.
621    if head.index != 0 && head.digest != prior_digest {
622        return Err(JournalError::MissingRecord { index: head.index });
623    }
624    Ok(count)
625}
626
627// ---------------------------------------------------------------------------
628// check_replay
629// ---------------------------------------------------------------------------
630
631/// Check whether (`grant_id`, `nonce_digest`) has already been consumed,
632/// and how many times. Returns a `ReplayCheck` carrying the strongest
633/// level the journal can speak to:
634///
635///   - `NotPerformed` when the journal directory does not exist on disk.
636///     The caller (verify) should fall back to its package-local check.
637///   - `LocalJournal` otherwise. `passed: true` means the use count is
638///     within `max_uses_hint`; `false` means it would exceed.
639///
640/// `max_uses_hint` is what the caller knows from the signed grant's
641/// `ApprovalScope.max_actions`. We accept it as a hint rather than
642/// reading it back from a stored record because the stored uses already
643/// carry their own `max_uses` snapshot, and disagreement between the
644/// hint and the stored value should be visible in `details`.
645pub fn check_replay(
646    j: &Journal,
647    grant_id: &str,
648    nonce_digest: &str,
649    max_uses_hint: Option<u32>,
650) -> Result<ReplayCheck, JournalError> {
651    if !j.exists() {
652        return Ok(ReplayCheck::not_performed());
653    }
654    // Use the by-nonce index: every prior use of the same approval
655    // shares the same nonce_digest, so the index gives us the exact
656    // record list.
657    let index_path = j.by_nonce_path(nonce_digest);
658    let mut current = 0u32;
659    let mut last_max: Option<u32> = None;
660    if index_path.exists() {
661        let raw = fs::read_to_string(&index_path)?;
662        for line in raw.lines() {
663            let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
664            if let Some(rec) = load_use_record(j, idx)? {
665                // Only count uses that bind to the same grant_id; the
666                // by-nonce index can in theory share a digest across
667                // grants, though in practice nonces are random.
668                if rec.grant_id == grant_id {
669                    current = current.saturating_add(1);
670                    last_max = rec.max_uses.or(last_max);
671                }
672            }
673        }
674    }
675    let max_uses = max_uses_hint.or(last_max);
676    let passed = match max_uses {
677        Some(m) => current < m,
678        None    => true, // unbounded grant; PR 5 reports this honestly
679    };
680    let details = match max_uses {
681        Some(m) => format!("local Approval Use Journal: use {current}/{m}"),
682        None    => format!("local Approval Use Journal: {current} prior use(s); grant has no max_uses"),
683    };
684    Ok(ReplayCheck {
685        level:      ReplayCheckLevel::LocalJournal,
686        use_number: Some(current.saturating_add(1)),
687        max_uses,
688        passed:     Some(passed),
689        details:    Some(details),
690    })
691}
692
693fn load_use_record(j: &Journal, index: u64) -> Result<Option<ApprovalUse>, JournalError> {
694    let dir = j.records_dir();
695    if !dir.is_dir() {
696        return Ok(None);
697    }
698    let prefix = format!("{:010}.approval-use.", index);
699    for entry in fs::read_dir(&dir)? {
700        let entry = entry?;
701        let name = entry.file_name().to_string_lossy().into_owned();
702        if name.starts_with(&prefix) {
703            let bytes = fs::read(entry.path())?;
704            let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
705            return Ok(Some(rec));
706        }
707    }
708    Ok(None)
709}
710
711// ---------------------------------------------------------------------------
712// Public read helpers (CLI)
713// ---------------------------------------------------------------------------
714
715/// Find the recorded ApprovalUse for an already-signed action.
716/// Returns the matching use record plus a `ReplayCheck` that answers
717/// the *verify-time* question -- "is the recorded use within max_uses?"
718/// -- as opposed to `check_replay`'s consume-time question -- "would
719/// the next use exceed?". The two questions look the same but have
720/// different boundary semantics:
721///
722///   consume-time: passed = use_number_that_would_be_allocated <= max_uses
723///                 (i.e. current_count < max_uses, since next = current + 1)
724///   verify-time:  passed = recorded_use_number <= max_uses
725///
726/// Verify should call THIS, not check_replay, when reporting on an
727/// action that already has a journal record.
728pub fn find_use_for_action(
729    j: &Journal,
730    grant_id: &str,
731    nonce_digest: &str,
732    max_uses_hint: Option<u32>,
733) -> Result<Option<(ApprovalUse, ReplayCheck)>, JournalError> {
734    if !j.exists() {
735        return Ok(None);
736    }
737    let index_path = j.by_nonce_path(nonce_digest);
738    if !index_path.exists() {
739        return Ok(None);
740    }
741    let raw = fs::read_to_string(&index_path)?;
742    // The action under verification corresponds to the most recent use
743    // record sharing the same (grant_id, nonce_digest) -- callers can
744    // also disambiguate by `approval_use_id` from action.meta, which
745    // PR 4 wires in. For PR 3, returning the most recent matching use
746    // is sufficient and matches what verify can derive without that
747    // metadata link.
748    let mut latest: Option<ApprovalUse> = None;
749    for line in raw.lines() {
750        let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
751        if let Some(rec) = load_use_record(j, idx)? {
752            if rec.grant_id == grant_id {
753                latest = Some(rec);
754            }
755        }
756    }
757    let Some(rec) = latest else { return Ok(None) };
758
759    let stored_max = rec.max_uses;
760    let max_uses = max_uses_hint.or(stored_max);
761    let passed = match max_uses {
762        Some(m) => rec.use_number <= m,
763        None    => true,
764    };
765    let details = match max_uses {
766        Some(m) => format!("local Approval Use Journal passed, use {}/{}", rec.use_number, m),
767        None    => format!("local Approval Use Journal: use {} of unbounded grant", rec.use_number),
768    };
769    Ok(Some((
770        rec.clone(),
771        ReplayCheck {
772            level:      ReplayCheckLevel::LocalJournal,
773            use_number: Some(rec.use_number),
774            max_uses,
775            passed:     Some(passed),
776            details:    Some(details),
777        },
778    )))
779}
780
781/// Every ApprovalUse for `grant_id`. Reads the by-grant index, then
782/// loads each record. Quiet on missing journal.
783pub fn list_uses_for_grant(j: &Journal, grant_id: &str) -> Result<Vec<ApprovalUse>, JournalError> {
784    if !j.exists() {
785        return Ok(Vec::new());
786    }
787    let index_path = j.by_grant_path(grant_id);
788    if !index_path.exists() {
789        return Ok(Vec::new());
790    }
791    let raw = fs::read_to_string(&index_path)?;
792    let mut out = Vec::new();
793    for line in raw.lines() {
794        let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
795        if let Some(rec) = load_use_record(j, idx)? {
796            out.push(rec);
797        }
798    }
799    Ok(out)
800}
801
802// ---------------------------------------------------------------------------
803// Tests
804// ---------------------------------------------------------------------------
805
806#[cfg(test)]
807mod tests {
808    use super::*;
809    use tempfile::tempdir;
810
811    fn sample_use(use_id: &str, grant_id: &str, nonce_digest: &str, n: u32) -> ApprovalUse {
812        ApprovalUse {
813            type_:                  TYPE_APPROVAL_USE.into(),
814            use_id:                 use_id.into(),
815            grant_id:               grant_id.into(),
816            grant_digest:           "sha256:00".into(),
817            nonce_digest:           nonce_digest.into(),
818            actor:                  "agent://deployer".into(),
819            action:                 "deploy.production".into(),
820            subject:                "env://production".into(),
821            session_id:             None,
822            action_artifact_id:     None,
823            receipt_digest:         None,
824            use_number:             n,
825            max_uses:               Some(2),
826            idempotency_key:        None,
827            created_at:             "2026-04-30T07:00:00Z".into(),
828            expires_at:             None,
829            previous_record_digest: String::new(), // append_use rewrites this
830            record_digest:          String::new(), // append_use rewrites this
831            signature:              None,
832            signature_alg:          None,
833            signing_key_id:         None,
834        }
835    }
836
837    #[test]
838    fn first_append_creates_layout_and_head() {
839        let dir = tempdir().unwrap();
840        let j = Journal::new(dir.path());
841        let head = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
842        assert_eq!(head.index, 1);
843        assert!(j.records_dir().is_dir());
844        assert!(j.heads_dir().is_dir());
845        assert!(j.current_head_path().is_file());
846        assert!(j.meta_path().is_file());
847        // by-grant + by-nonce indexes populated
848        assert!(j.by_grant_path("g1").is_file());
849        assert!(j.by_nonce_path("sha256:nn1").is_file());
850    }
851
852    #[test]
853    fn second_append_links_previous_record_digest() {
854        let dir = tempdir().unwrap();
855        let j = Journal::new(dir.path());
856        let h1 = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
857        let h2 = append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
858        assert_eq!(h2.index, 2);
859        // Reading record 2 should show previous_record_digest == h1.digest
860        let recs = iter_records(&j).unwrap();
861        assert_eq!(recs.len(), 2);
862        let (_, _, bytes) = &recs[1];
863        let r2: ApprovalUse = serde_json::from_slice(bytes).unwrap();
864        assert_eq!(r2.previous_record_digest, h1.digest);
865    }
866
867    #[test]
868    fn verify_integrity_passes_on_intact_chain() {
869        let dir = tempdir().unwrap();
870        let j = Journal::new(dir.path());
871        for i in 1..=5 {
872            let nd = format!("sha256:nn{i}");
873            append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
874        }
875        assert_eq!(verify_integrity(&j).unwrap(), 5);
876    }
877
878    #[test]
879    fn editing_a_record_breaks_integrity() {
880        let dir = tempdir().unwrap();
881        let j = Journal::new(dir.path());
882        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
883        // Find the on-disk record file and corrupt it.
884        let entries: Vec<_> = fs::read_dir(j.records_dir()).unwrap().collect();
885        let entry = entries.into_iter().next().unwrap().unwrap();
886        let mut json: serde_json::Value =
887            serde_json::from_slice(&fs::read(entry.path()).unwrap()).unwrap();
888        json["actor"] = "agent://attacker".into();
889        fs::write(entry.path(), serde_json::to_vec_pretty(&json).unwrap()).unwrap();
890
891        let err = verify_integrity(&j).unwrap_err();
892        assert!(
893            matches!(err, JournalError::RecordTampered { .. }),
894            "expected RecordTampered, got {err:?}"
895        );
896    }
897
898    #[test]
899    fn deleting_a_record_breaks_integrity_or_head_continuity() {
900        let dir = tempdir().unwrap();
901        let j = Journal::new(dir.path());
902        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
903        append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
904        // Remove the trailing record. Head still points at index 2.
905        let entries: Vec<_> = fs::read_dir(j.records_dir())
906            .unwrap()
907            .map(|e| e.unwrap().path())
908            .collect();
909        let trailing = entries.iter().max().unwrap();
910        fs::remove_file(trailing).unwrap();
911
912        let err = verify_integrity(&j).unwrap_err();
913        assert!(
914            matches!(err, JournalError::MissingRecord { .. }),
915            "expected MissingRecord, got {err:?}"
916        );
917    }
918
919    #[test]
920    fn indexes_can_be_rebuilt_from_records() {
921        let dir = tempdir().unwrap();
922        let j = Journal::new(dir.path());
923        for i in 1..=3 {
924            let nd = format!("sha256:nn{i}");
925            append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
926        }
927        // Wipe indexes; check_replay (or rebuild_indexes) should still work.
928        fs::remove_dir_all(j.indexes_dir()).unwrap();
929
930        let rebuilt = rebuild_indexes(&j).unwrap();
931        assert_eq!(rebuilt, 3);
932        assert!(j.by_grant_path("g1").is_file());
933        assert!(j.by_nonce_path("sha256:nn1").is_file());
934    }
935
936    #[test]
937    fn check_replay_reports_use_count_and_max() {
938        let dir = tempdir().unwrap();
939        let j = Journal::new(dir.path());
940        // Two prior uses of grant g1 with the same nonce_digest.
941        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
942        append_use(&j, sample_use("use_2", "g1", "sha256:nn1", 2)).unwrap();
943
944        // max_uses_hint = 2: the next use would be 3/2 -> not passed.
945        let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
946        assert_eq!(r.level, ReplayCheckLevel::LocalJournal);
947        assert_eq!(r.use_number, Some(3));
948        assert_eq!(r.max_uses,   Some(2));
949        assert_eq!(r.passed,     Some(false));
950    }
951
952    #[test]
953    fn check_replay_passes_when_under_max() {
954        let dir = tempdir().unwrap();
955        let j = Journal::new(dir.path());
956        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
957        let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
958        assert_eq!(r.use_number, Some(2));
959        assert_eq!(r.passed,     Some(true));
960    }
961
962    #[test]
963    fn check_replay_no_journal_returns_not_performed() {
964        let dir = tempdir().unwrap();
965        let absent = dir.path().join("nope");
966        let j = Journal::new(&absent);
967        let r = check_replay(&j, "g1", "sha256:nn1", Some(1)).unwrap();
968        assert_eq!(r.level, ReplayCheckLevel::NotPerformed);
969        assert!(r.use_number.is_none());
970    }
971
972    #[test]
973    fn check_replay_unbounded_grant_passes_with_count() {
974        let dir = tempdir().unwrap();
975        let j = Journal::new(dir.path());
976        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
977        // No max_uses_hint and stored record's max_uses is Some(2) too,
978        // so we explicitly set None on a fresh record to test the
979        // unbounded path.
980        let mut u = sample_use("use_2", "g2", "sha256:other", 1);
981        u.max_uses = None;
982        append_use(&j, u).unwrap();
983
984        let r = check_replay(&j, "g2", "sha256:other", None).unwrap();
985        assert!(r.passed.unwrap());
986        assert!(r.max_uses.is_none());
987    }
988
989    #[test]
990    fn list_uses_for_grant_returns_records_in_order() {
991        let dir = tempdir().unwrap();
992        let j = Journal::new(dir.path());
993        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
994        append_use(&j, sample_use("use_2", "g2", "sha256:nn2", 1)).unwrap();
995        append_use(&j, sample_use("use_3", "g1", "sha256:nn3", 2)).unwrap();
996        let g1 = list_uses_for_grant(&j, "g1").unwrap();
997        assert_eq!(g1.len(), 2);
998        assert_eq!(g1[0].use_id, "use_1");
999        assert_eq!(g1[1].use_id, "use_3");
1000    }
1001
1002    #[test]
1003    fn lock_keeps_two_appends_serial() {
1004        // Hold the lock externally; an append should fail with LockBusy
1005        // rather than racing or silently overwriting.
1006        let dir = tempdir().unwrap();
1007        let j = Journal::new(dir.path());
1008        fs::create_dir_all(j.locks_dir()).unwrap();
1009        let held = OpenOptions::new()
1010            .read(true).write(true).create(true).truncate(false)
1011            .open(j.lock_path()).unwrap();
1012        held.try_lock_exclusive().unwrap();
1013
1014        let err = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap_err();
1015        assert!(matches!(err, JournalError::LockBusy));
1016
1017        let _ = fs2::FileExt::unlock(&held);
1018    }
1019
1020    #[test]
1021    fn revocation_appends_into_chain() {
1022        let dir = tempdir().unwrap();
1023        let j = Journal::new(dir.path());
1024        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
1025        let rev = ApprovalRevocation {
1026            type_:                  TYPE_APPROVAL_REVOCATION.into(),
1027            revocation_id:          "rev_1".into(),
1028            grant_id:               "g1".into(),
1029            grant_digest:           "sha256:00".into(),
1030            revoker:                "human://alice".into(),
1031            reason:                 Some("rotated key".into()),
1032            created_at:             "2026-04-30T07:01:00Z".into(),
1033            previous_record_digest: String::new(),
1034            record_digest:          String::new(),
1035            signature:              None,
1036            signature_alg:          None,
1037            signing_key_id:         None,
1038        };
1039        let h = append_revocation(&j, rev).unwrap();
1040        assert_eq!(h.index, 2);
1041        assert_eq!(verify_integrity(&j).unwrap(), 2);
1042    }
1043
1044    #[test]
1045    fn record_files_contain_no_raw_nonce_or_signature_secrets() {
1046        // Privacy invariant: ApprovalUse has no `nonce` field on the
1047        // struct, so by construction the stored JSON only contains
1048        // `nonce_digest`. This test pins the on-disk shape so a future
1049        // schema change can't sneak in a raw-nonce field.
1050        let dir = tempdir().unwrap();
1051        let j = Journal::new(dir.path());
1052        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
1053        let entries: Vec<_> = fs::read_dir(j.records_dir())
1054            .unwrap()
1055            .map(|e| e.unwrap().path())
1056            .collect();
1057        let bytes = fs::read(&entries[0]).unwrap();
1058        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1059        let obj = json.as_object().unwrap();
1060        for forbidden in ["nonce", "command", "prompt", "file_content", "bearer_token", "api_key"] {
1061            assert!(
1062                !obj.contains_key(forbidden),
1063                "journal record must not contain `{forbidden}`",
1064            );
1065        }
1066        // The digest IS allowed.
1067        assert!(obj.contains_key("nonce_digest"));
1068    }
1069
1070    // -- v0.9.10 PR A: reserve_use atomic check+append regression tests --
1071
1072    #[test]
1073    fn reserve_use_first_call_succeeds_and_stamps_use_number() {
1074        // Caller passes use_number=0; reserve_use stamps it from the
1075        // grant-wide count observed inside the lock.
1076        let dir = tempdir().unwrap();
1077        let j = Journal::new(dir.path());
1078        let mut rec = sample_use("use_1", "g1", "sha256:nn1", 0);
1079        rec.use_number = 0;
1080        let head = reserve_use(&j, rec, Some(1)).unwrap();
1081        assert_eq!(head.index, 1);
1082        let stored = list_uses_for_grant(&j, "g1").unwrap();
1083        assert_eq!(stored.len(), 1);
1084        assert_eq!(stored[0].use_number, 1, "reserve_use must stamp use_number=1 for the first use");
1085    }
1086
1087    #[test]
1088    fn reserve_use_max_uses_1_serial_second_call_rejects() {
1089        // Sequential second call with the same nonce against
1090        // max_uses=1 must error with MaxUsesExceeded BEFORE writing.
1091        let dir = tempdir().unwrap();
1092        let j = Journal::new(dir.path());
1093        reserve_use(&j, sample_use("use_1", "g1", "sha256:nn_a", 0), Some(1)).unwrap();
1094
1095        let err = reserve_use(&j, sample_use("use_2", "g1", "sha256:nn_a", 0), Some(1))
1096            .expect_err("second consume of max_uses=1 grant must fail");
1097        match err {
1098            JournalError::MaxUsesExceeded { grant_id, max_uses, current } => {
1099                assert_eq!(grant_id, "g1");
1100                assert_eq!(max_uses, 1);
1101                assert_eq!(current, 1);
1102            }
1103            other => panic!("expected MaxUsesExceeded, got {other:?}"),
1104        }
1105        // Crucially, the second record is NOT written.
1106        let stored = list_uses_for_grant(&j, "g1").unwrap();
1107        assert_eq!(stored.len(), 1, "rejected reserve must not append");
1108    }
1109
1110    #[test]
1111    fn reserve_use_max_uses_2_two_uses_pass_third_rejects() {
1112        // Legitimate multi-use grant: two distinct nonces, same grant.
1113        let dir = tempdir().unwrap();
1114        let j = Journal::new(dir.path());
1115        let mut a = sample_use("use_1", "g1", "sha256:nn_a", 0); a.max_uses = Some(2);
1116        let mut b = sample_use("use_2", "g1", "sha256:nn_b", 0); b.max_uses = Some(2);
1117        reserve_use(&j, a, Some(2)).unwrap();
1118        reserve_use(&j, b, Some(2)).unwrap();
1119        // A third nonce with max_uses=2 is fine (per-nonce check, not
1120        // per-grant); the journal's invariant is single-use-per-nonce.
1121        let mut c = sample_use("use_3", "g1", "sha256:nn_c", 0); c.max_uses = Some(2);
1122        reserve_use(&j, c, Some(2)).unwrap();
1123        // But a SECOND consume of nn_a violates max_uses=2 because
1124        // that nonce already has 1 use; 1+1 = 2 is within bound, so
1125        // this second use of nn_a is actually allowed — pin that.
1126        let mut a2 = sample_use("use_1b", "g1", "sha256:nn_a", 0); a2.max_uses = Some(2);
1127        reserve_use(&j, a2, Some(2)).unwrap();
1128        // A THIRD consume of nn_a exceeds max_uses=2.
1129        let mut a3 = sample_use("use_1c", "g1", "sha256:nn_a", 0); a3.max_uses = Some(2);
1130        let err = reserve_use(&j, a3, Some(2)).expect_err("third use of same nonce must fail");
1131        assert!(matches!(err, JournalError::MaxUsesExceeded { .. }));
1132    }
1133
1134    /// Round-2 hardening: idempotency-key retries through the
1135    /// CLI's `reserve_in_journal` should NOT bypass `reserve_use`'s
1136    /// max_uses gate. The CLI checks the idempotency key against
1137    /// existing uses *before* calling `reserve_use`; this test
1138    /// confirms that path doesn't sneak a second reservation past
1139    /// max_uses=1 just because a flaky retry uses the same key.
1140    ///
1141    /// The journal-level invariant we pin here: even if a caller
1142    /// repeatedly invokes `reserve_use` with the same record after a
1143    /// `LockBusy`, the second call sees the first record (now
1144    /// committed) and rejects with `MaxUsesExceeded`. There is no
1145    /// "free retry" loophole.
1146    #[test]
1147    fn reserve_use_retry_after_lock_busy_does_not_bypass_max_uses() {
1148        let dir = tempdir().unwrap();
1149        let j = Journal::new(dir.path());
1150        // First reserve commits use_1.
1151        reserve_use(&j, sample_use("use_1", "g1", "sha256:nn_retry", 0), Some(1)).unwrap();
1152        // Subsequent reserves with the SAME nonce all fail with
1153        // MaxUsesExceeded -- no retry-bypass window.
1154        for i in 0..5 {
1155            let err = reserve_use(
1156                &j,
1157                sample_use(&format!("use_retry_{i}"), "g1", "sha256:nn_retry", 0),
1158                Some(1),
1159            ).expect_err("retry must fail");
1160            assert!(matches!(err, JournalError::MaxUsesExceeded { .. }));
1161        }
1162        let stored = list_uses_for_grant(&j, "g1").unwrap();
1163        assert_eq!(stored.len(), 1, "exactly one record on disk despite 5 retries");
1164    }
1165
1166    #[test]
1167    fn reserve_use_concurrent_max_uses_1_only_one_succeeds() {
1168        // The headline regression test for the v0.9.9 TOCTOU race.
1169        //
1170        // Eight threads race to reserve the same (grant_id, nonce_digest)
1171        // against max_uses=1. With v0.9.9's split check_replay/append_use
1172        // pattern, two threads could both pass the pre-lock replay check
1173        // and write — exceeding max_uses. With v0.9.10's reserve_use,
1174        // the check happens INSIDE the lock; exactly one thread wins.
1175        //
1176        // Outcomes for the 7 losers are a mix of:
1177        //   - LockBusy: the lock was held when they tried try_lock
1178        //   - MaxUsesExceeded: they got the lock after the winner
1179        //     released, saw the winner's record, declined to write
1180        // Both are correct — neither is a bypass.
1181        use std::sync::Arc;
1182        use std::sync::atomic::{AtomicUsize, Ordering};
1183        use std::thread;
1184
1185        let dir = tempdir().unwrap();
1186        let dir_path = Arc::new(dir.path().to_path_buf());
1187        let success = Arc::new(AtomicUsize::new(0));
1188        let lock_busy = Arc::new(AtomicUsize::new(0));
1189        let max_exceeded = Arc::new(AtomicUsize::new(0));
1190
1191        let mut handles = Vec::new();
1192        for i in 0..8 {
1193            let dir_path = Arc::clone(&dir_path);
1194            let success = Arc::clone(&success);
1195            let lock_busy = Arc::clone(&lock_busy);
1196            let max_exceeded = Arc::clone(&max_exceeded);
1197            handles.push(thread::spawn(move || {
1198                let j = Journal::new(dir_path.as_path());
1199                let rec = sample_use(
1200                    &format!("use_{i}"),
1201                    "g1",
1202                    "sha256:race_nonce",
1203                    0,
1204                );
1205                match reserve_use(&j, rec, Some(1)) {
1206                    Ok(_)                                            => { success.fetch_add(1, Ordering::SeqCst); }
1207                    Err(JournalError::LockBusy)                      => { lock_busy.fetch_add(1, Ordering::SeqCst); }
1208                    Err(JournalError::MaxUsesExceeded { .. })        => { max_exceeded.fetch_add(1, Ordering::SeqCst); }
1209                    Err(other) => panic!("unexpected error: {other:?}"),
1210                }
1211            }));
1212        }
1213        for h in handles { h.join().unwrap(); }
1214
1215        let s = success.load(Ordering::SeqCst);
1216        let lb = lock_busy.load(Ordering::SeqCst);
1217        let me = max_exceeded.load(Ordering::SeqCst);
1218        assert_eq!(s, 1, "exactly one of 8 concurrent reserves must succeed; got {s} (lock_busy={lb}, max_exceeded={me})");
1219        assert_eq!(s + lb + me, 8, "every thread accounted for");
1220
1221        // Belt-and-braces: only one record actually on disk for this nonce.
1222        let stored = list_uses_for_grant(&Journal::new(dir.path()), "g1").unwrap();
1223        let same_nonce = stored.iter().filter(|u| u.nonce_digest == "sha256:race_nonce").count();
1224        assert_eq!(same_nonce, 1, "exactly one record on disk for the contested nonce");
1225    }
1226}