Skip to main content

treeship_core/journal/
mod.rs

1//! Local Approval Use Journal -- v0.9.9 PR 2.
2//!
3//! Per-workspace append-only memory of consumed Approval Grants. The
4//! journal turns the v0.9.6 "package-local only" replay finding into a
5//! local-journal replay finding: with this module wired through, verify
6//! can say "use 1/1 -- local Approval Use Journal passed" instead of
7//! "no global ledger consulted."
8//!
9//! Scope of THIS PR:
10//!   * journal storage (records/, heads/, indexes/, locks/)
11//!   * append-only writes with file lock + atomic temp+rename
12//!   * hash chain via `previous_record_digest`
13//!   * read-only `check_replay` lookup
14//!   * `verify_integrity` chain walk
15//!   * `rebuild_indexes` from records (records are truth)
16//!
17//! Out of scope (later PRs):
18//!   * consume-before-action wiring inside `treeship attest action` (PR 3)
19//!   * package export of journal records (PR 4)
20//!   * Hub checkpoint signing (PR 6 scaffold)
21//!
22//! Privacy rules baked into the layout:
23//!   * `nonce_digest`, never raw nonce
24//!   * no commands, prompts, file contents, bearer tokens, or API keys
25//!     are stored. The journal answers the single question "has this
26//!     (grant_id, nonce_digest) been consumed before, and if so how
27//!     many times?" -- everything else stays in the signed grant +
28//!     receipt where it already is.
29
30use std::fs::{self, File, OpenOptions};
31use std::io::Write;
32use std::path::{Path, PathBuf};
33
34// fs2 is gated to non-wasm targets at the workspace Cargo.toml; the WASM
35// build has no concurrent writers and no real filesystem, so journal
36// operations fall back to a deterministic "no-op write" mode that still
37// keeps the public API building. Same pattern session::event_log uses.
38#[cfg(not(target_family = "wasm"))]
39use fs2::FileExt;
40
41use crate::statements::{
42    ApprovalRevocation, ApprovalUse, JournalCheckpoint, ReplayCheck, ReplayCheckLevel,
43    TYPE_APPROVAL_REVOCATION, TYPE_APPROVAL_USE, TYPE_JOURNAL_CHECKPOINT,
44    approval_revocation_record_digest, approval_use_record_digest,
45    journal_checkpoint_record_digest,
46};
47
48// ---------------------------------------------------------------------------
49// Errors
50// ---------------------------------------------------------------------------
51
52#[derive(Debug)]
53pub enum JournalError {
54    Io(std::io::Error),
55    Json(serde_json::Error),
56    /// `previous_record_digest` on a record didn't match the prior
57    /// record's `record_digest`. The chain is broken.
58    BrokenChain {
59        index:    u64,
60        expected: String,
61        actual:   String,
62    },
63    /// A record's stored `record_digest` didn't match the recomputed
64    /// digest. The record was tampered after write.
65    RecordTampered {
66        index:    u64,
67        expected: String,
68        actual:   String,
69    },
70    /// A record file referenced by the head no longer exists.
71    MissingRecord {
72        index: u64,
73    },
74    /// The journal's append lock could not be acquired.
75    LockBusy,
76    /// The append exceeds `max_uses` recorded on prior uses for this
77    /// grant. Surfaced as an error so callers (PR 3) refuse to sign
78    /// the action; PR 2 itself only writes uses passed in by callers,
79    /// so this only fires from `append_use` when the caller didn't
80    /// preflight via `check_replay`.
81    MaxUsesExceeded {
82        grant_id:   String,
83        max_uses:   u32,
84        current:    u32,
85    },
86}
87
88impl std::fmt::Display for JournalError {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        match self {
91            Self::Io(e)            => write!(f, "journal io: {e}"),
92            Self::Json(e)          => write!(f, "journal json: {e}"),
93            Self::BrokenChain { index, expected, actual } => write!(
94                f,
95                "journal broken at record {index}: previous_record_digest = {actual}, expected {expected}",
96            ),
97            Self::RecordTampered { index, expected, actual } => write!(
98                f,
99                "journal record {index} tampered: stored digest {expected}, recomputed {actual}",
100            ),
101            Self::MissingRecord { index } => write!(
102                f,
103                "journal record {index} referenced by head but missing on disk",
104            ),
105            Self::LockBusy => write!(f, "journal append lock busy; another process holds it"),
106            Self::MaxUsesExceeded { grant_id, max_uses, current } => write!(
107                f,
108                "approval grant {grant_id} would exceed max_uses ({current}/{max_uses})",
109            ),
110        }
111    }
112}
113
114impl std::error::Error for JournalError {}
115impl From<std::io::Error>    for JournalError { fn from(e: std::io::Error)    -> Self { Self::Io(e) } }
116impl From<serde_json::Error> for JournalError { fn from(e: serde_json::Error) -> Self { Self::Json(e) } }
117
118// ---------------------------------------------------------------------------
119// Layout
120// ---------------------------------------------------------------------------
121
122/// Directory layout under `.treeship/journals/approval-use/`.
123pub struct Journal {
124    /// Root directory.
125    pub dir: PathBuf,
126}
127
128impl Journal {
129    pub fn new(dir: impl Into<PathBuf>) -> Self {
130        Self { dir: dir.into() }
131    }
132
133    pub fn records_dir(&self) -> PathBuf  { self.dir.join("records") }
134    pub fn heads_dir(&self)   -> PathBuf  { self.dir.join("heads") }
135    pub fn indexes_dir(&self) -> PathBuf  { self.dir.join("indexes") }
136    pub fn locks_dir(&self)   -> PathBuf  { self.dir.join("locks") }
137    pub fn current_head_path(&self) -> PathBuf { self.heads_dir().join("current.json") }
138    pub fn lock_path(&self)         -> PathBuf { self.locks_dir().join("journal.lock") }
139    pub fn meta_path(&self)         -> PathBuf { self.dir.join("journal.json") }
140
141    /// Index file for a given grant. Each line is one `record_index`.
142    pub fn by_grant_path(&self, grant_id: &str) -> PathBuf {
143        self.indexes_dir().join("by-grant").join(format!("{}.txt", safe_name(grant_id)))
144    }
145
146    /// Index file for a nonce_digest.
147    pub fn by_nonce_path(&self, nonce_digest: &str) -> PathBuf {
148        self.indexes_dir().join("by-nonce").join(format!("{}.txt", safe_name(nonce_digest)))
149    }
150
151    /// Returns true iff the journal directory exists.
152    pub fn exists(&self) -> bool {
153        self.dir.is_dir()
154    }
155}
156
157/// Make a filesystem-safe name by replacing path-unsafe chars. Used for
158/// index file names; not a security boundary -- the journal's actual
159/// integrity check is the hash chain.
160fn safe_name(s: &str) -> String {
161    s.chars()
162        .map(|c| match c {
163            ':' | '/' | '\\' | ' ' | '.' => '_',
164            c => c,
165        })
166        .collect()
167}
168
169// ---------------------------------------------------------------------------
170// Head file
171// ---------------------------------------------------------------------------
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174pub struct Head {
175    /// 1-indexed; 0 means "no records yet."
176    pub index: u64,
177    /// `record_digest` of the most recent record. Empty when index=0.
178    pub digest: String,
179    /// Updated on every append.
180    pub updated_at: String,
181}
182
183impl Default for Head {
184    fn default() -> Self {
185        Self {
186            index:      0,
187            digest:     String::new(),
188            updated_at: String::new(),
189        }
190    }
191}
192
193fn read_head(j: &Journal) -> Result<Head, JournalError> {
194    let path = j.current_head_path();
195    if !path.exists() {
196        return Ok(Head::default());
197    }
198    let bytes = fs::read(&path)?;
199    Ok(serde_json::from_slice(&bytes)?)
200}
201
202fn write_head(j: &Journal, head: &Head) -> Result<(), JournalError> {
203    fs::create_dir_all(j.heads_dir())?;
204    let path = j.current_head_path();
205    let tmp = path.with_extension("json.tmp");
206    let json = serde_json::to_vec_pretty(head)?;
207    fs::write(&tmp, json)?;
208    fs::rename(&tmp, &path)?;
209    Ok(())
210}
211
212// ---------------------------------------------------------------------------
213// Append
214// ---------------------------------------------------------------------------
215
216/// Acquire the journal append lock for the duration of the closure. Uses
217/// fs2::FileExt::try_lock_exclusive (the same primitive `session::event_log`
218/// uses) so behavior matches what the rest of the codebase already
219/// trusts.
220#[cfg(not(target_family = "wasm"))]
221fn with_lock<F, T>(j: &Journal, body: F) -> Result<T, JournalError>
222where
223    F: FnOnce() -> Result<T, JournalError>,
224{
225    fs::create_dir_all(j.locks_dir())?;
226    let lock = OpenOptions::new()
227        .read(true)
228        .write(true)
229        .create(true)
230        .truncate(false)
231        .open(j.lock_path())?;
232    if lock.try_lock_exclusive().is_err() {
233        return Err(JournalError::LockBusy);
234    }
235    let result = body();
236    let _ = fs2::FileExt::unlock(&lock);
237    result
238}
239
240/// WASM build: no concurrent writers, no advisory locks. Run the body
241/// directly. Matches `session::event_log`'s wasm fallback.
242#[cfg(target_family = "wasm")]
243fn with_lock<F, T>(_j: &Journal, body: F) -> Result<T, JournalError>
244where
245    F: FnOnce() -> Result<T, JournalError>,
246{
247    body()
248}
249
250/// Append an ApprovalUse to the journal. The caller MUST set
251/// `previous_record_digest` to the current head's digest on the
252/// incoming record; we re-validate before write. `record_digest` is
253/// computed from the canonical form and stamped on the stored record.
254///
255/// Returns the new head's index and digest.
256pub fn append_use(j: &Journal, mut rec: ApprovalUse) -> Result<Head, JournalError> {
257    rec.type_ = TYPE_APPROVAL_USE.into();
258    with_lock(j, || {
259        let head = read_head(j)?;
260        rec.previous_record_digest = head.digest.clone();
261        rec.record_digest = approval_use_record_digest(&rec);
262        let next_index = head.index + 1;
263        write_record_use(j, next_index, &rec)?;
264        update_indexes_for_use(j, next_index, &rec)?;
265        let new_head = Head {
266            index:      next_index,
267            digest:     rec.record_digest.clone(),
268            updated_at: rec.created_at.clone(),
269        };
270        write_head(j, &new_head)?;
271        ensure_meta(j)?;
272        Ok(new_head)
273    })
274}
275
276/// Append an ApprovalRevocation. Sibling of `append_use`.
277pub fn append_revocation(j: &Journal, mut rec: ApprovalRevocation) -> Result<Head, JournalError> {
278    rec.type_ = TYPE_APPROVAL_REVOCATION.into();
279    with_lock(j, || {
280        let head = read_head(j)?;
281        rec.previous_record_digest = head.digest.clone();
282        rec.record_digest = approval_revocation_record_digest(&rec);
283        let next_index = head.index + 1;
284        write_record_revocation(j, next_index, &rec)?;
285        index_grant(j, next_index, &rec.grant_id)?;
286        let new_head = Head {
287            index:      next_index,
288            digest:     rec.record_digest.clone(),
289            updated_at: rec.created_at.clone(),
290        };
291        write_head(j, &new_head)?;
292        ensure_meta(j)?;
293        Ok(new_head)
294    })
295}
296
297/// Append a JournalCheckpoint over a contiguous range of prior records.
298pub fn append_checkpoint(j: &Journal, mut rec: JournalCheckpoint) -> Result<Head, JournalError> {
299    rec.type_ = TYPE_JOURNAL_CHECKPOINT.into();
300    with_lock(j, || {
301        let head = read_head(j)?;
302        rec.previous_record_digest = head.digest.clone();
303        rec.record_digest = journal_checkpoint_record_digest(&rec);
304        let next_index = head.index + 1;
305        write_record_checkpoint(j, next_index, &rec)?;
306        let new_head = Head {
307            index:      next_index,
308            digest:     rec.record_digest.clone(),
309            updated_at: rec.created_at.clone(),
310        };
311        write_head(j, &new_head)?;
312        ensure_meta(j)?;
313        Ok(new_head)
314    })
315}
316
317fn record_filename(index: u64, type_: &str, digest: &str) -> String {
318    // Use the digest's hex tail (after "sha256:") so the filename is
319    // bounded length and contains no separators.
320    let tail = digest.strip_prefix("sha256:").unwrap_or(digest);
321    let short = &tail[..tail.len().min(16)];
322    format!("{:010}.{type_}.{short}.json", index)
323}
324
325fn write_record_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
326    fs::create_dir_all(j.records_dir())?;
327    let name = record_filename(index, "approval-use", &rec.record_digest);
328    let path = j.records_dir().join(&name);
329    let tmp = path.with_extension("json.tmp");
330    let mut f = File::create(&tmp)?;
331    f.write_all(&serde_json::to_vec_pretty(rec)?)?;
332    f.sync_all()?;
333    fs::rename(&tmp, &path)?;
334    Ok(())
335}
336
337fn write_record_revocation(j: &Journal, index: u64, rec: &ApprovalRevocation) -> Result<(), JournalError> {
338    fs::create_dir_all(j.records_dir())?;
339    let name = record_filename(index, "approval-revocation", &rec.record_digest);
340    let path = j.records_dir().join(&name);
341    let tmp = path.with_extension("json.tmp");
342    let mut f = File::create(&tmp)?;
343    f.write_all(&serde_json::to_vec_pretty(rec)?)?;
344    f.sync_all()?;
345    fs::rename(&tmp, &path)?;
346    Ok(())
347}
348
349fn write_record_checkpoint(j: &Journal, index: u64, rec: &JournalCheckpoint) -> Result<(), JournalError> {
350    fs::create_dir_all(j.records_dir())?;
351    let name = record_filename(index, "journal-checkpoint", &rec.record_digest);
352    let path = j.records_dir().join(&name);
353    let tmp = path.with_extension("json.tmp");
354    let mut f = File::create(&tmp)?;
355    f.write_all(&serde_json::to_vec_pretty(rec)?)?;
356    f.sync_all()?;
357    fs::rename(&tmp, &path)?;
358    Ok(())
359}
360
361fn ensure_meta(j: &Journal) -> Result<(), JournalError> {
362    let path = j.meta_path();
363    if path.exists() {
364        return Ok(());
365    }
366    #[derive(serde::Serialize)]
367    struct Meta<'a> {
368        kind:    &'a str,
369        version: &'a str,
370        format:  &'a str,
371    }
372    let meta = Meta { kind: "approval-use-journal", version: "v1", format: "json-records" };
373    let bytes = serde_json::to_vec_pretty(&meta)?;
374    fs::write(&path, bytes)?;
375    Ok(())
376}
377
378// ---------------------------------------------------------------------------
379// Indexes (rebuildable cache)
380// ---------------------------------------------------------------------------
381
382fn append_index(path: &Path, line: &str) -> Result<(), JournalError> {
383    if let Some(parent) = path.parent() {
384        fs::create_dir_all(parent)?;
385    }
386    let mut f = OpenOptions::new().append(true).create(true).open(path)?;
387    writeln!(f, "{line}")?;
388    Ok(())
389}
390
391fn index_grant(j: &Journal, index: u64, grant_id: &str) -> Result<(), JournalError> {
392    append_index(&j.by_grant_path(grant_id), &index.to_string())
393}
394
395fn index_nonce(j: &Journal, index: u64, nonce_digest: &str) -> Result<(), JournalError> {
396    append_index(&j.by_nonce_path(nonce_digest), &index.to_string())
397}
398
399fn update_indexes_for_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
400    index_grant(j, index, &rec.grant_id)?;
401    index_nonce(j, index, &rec.nonce_digest)?;
402    Ok(())
403}
404
405/// Delete and rebuild every index from the records directory. Records are
406/// truth; indexes are cache. Useful as a recovery tool when an index file
407/// is corrupt or out of sync.
408pub fn rebuild_indexes(j: &Journal) -> Result<u64, JournalError> {
409    let dir = j.indexes_dir();
410    if dir.is_dir() {
411        // Wipe by recursive remove. Atomic enough; the worst-case is a
412        // partially-rebuilt index, which the next call to this function
413        // also recovers from.
414        fs::remove_dir_all(&dir)?;
415    }
416    let mut rebuilt = 0u64;
417    for (idx, kind, bytes) in iter_records(j)? {
418        match kind.as_str() {
419            "approval-use" => {
420                let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
421                update_indexes_for_use(j, idx, &rec)?;
422                rebuilt += 1;
423            }
424            "approval-revocation" => {
425                let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
426                index_grant(j, idx, &rec.grant_id)?;
427                rebuilt += 1;
428            }
429            "journal-checkpoint" => {
430                rebuilt += 1; // checkpoints aren't indexed by grant/nonce
431            }
432            _ => {}
433        }
434    }
435    Ok(rebuilt)
436}
437
438// ---------------------------------------------------------------------------
439// Iteration + integrity
440// ---------------------------------------------------------------------------
441
442/// Walk records/ in index order. Returns `(index, kind, bytes)`. Kind is
443/// derived from the filename ("approval-use" / "approval-revocation" /
444/// "journal-checkpoint"). Filenames Treeship doesn't recognize are
445/// skipped silently rather than failing the whole walk -- a future record
446/// type added by a newer version shouldn't break older readers.
447fn iter_records(j: &Journal) -> Result<Vec<(u64, String, Vec<u8>)>, JournalError> {
448    let dir = j.records_dir();
449    if !dir.is_dir() {
450        return Ok(Vec::new());
451    }
452    let mut entries: Vec<(u64, String, PathBuf)> = Vec::new();
453    for entry in fs::read_dir(&dir)? {
454        let entry = entry?;
455        let path = entry.path();
456        if path.extension().and_then(|s| s.to_str()) != Some("json") {
457            continue;
458        }
459        let name = match path.file_name().and_then(|n| n.to_str()) {
460            Some(n) => n,
461            None    => continue,
462        };
463        // Filename shape: "<10-digit-index>.<kind>.<short-digest>.json"
464        let mut parts = name.splitn(4, '.');
465        let idx_str = match parts.next() { Some(s) => s, None => continue };
466        let kind    = match parts.next() { Some(s) => s, None => continue };
467        // index parses as u64
468        let idx = match idx_str.parse::<u64>() { Ok(n) => n, Err(_) => continue };
469        entries.push((idx, kind.to_string(), path));
470    }
471    entries.sort_by_key(|(idx, _, _)| *idx);
472    let mut out = Vec::with_capacity(entries.len());
473    for (idx, kind, path) in entries {
474        let bytes = fs::read(&path)?;
475        out.push((idx, kind, bytes));
476    }
477    Ok(out)
478}
479
480/// Walk every record in order, recompute each `record_digest`, and check
481/// that each record's `previous_record_digest` matches the prior
482/// record's stored `record_digest`. Returns the number of records walked
483/// or an error pinpointing the first integrity failure.
484pub fn verify_integrity(j: &Journal) -> Result<u64, JournalError> {
485    let mut prior_digest = String::new();
486    let mut count = 0u64;
487    let head = read_head(j)?;
488    for (idx, kind, bytes) in iter_records(j)? {
489        match kind.as_str() {
490            "approval-use" => {
491                let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
492                if rec.previous_record_digest != prior_digest {
493                    return Err(JournalError::BrokenChain {
494                        index:    idx,
495                        expected: prior_digest,
496                        actual:   rec.previous_record_digest,
497                    });
498                }
499                let recomputed = approval_use_record_digest(&rec);
500                if recomputed != rec.record_digest {
501                    return Err(JournalError::RecordTampered {
502                        index:    idx,
503                        expected: rec.record_digest,
504                        actual:   recomputed,
505                    });
506                }
507                prior_digest = rec.record_digest;
508            }
509            "approval-revocation" => {
510                let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
511                if rec.previous_record_digest != prior_digest {
512                    return Err(JournalError::BrokenChain {
513                        index:    idx,
514                        expected: prior_digest,
515                        actual:   rec.previous_record_digest,
516                    });
517                }
518                let recomputed = approval_revocation_record_digest(&rec);
519                if recomputed != rec.record_digest {
520                    return Err(JournalError::RecordTampered {
521                        index:    idx,
522                        expected: rec.record_digest,
523                        actual:   recomputed,
524                    });
525                }
526                prior_digest = rec.record_digest;
527            }
528            "journal-checkpoint" => {
529                let rec: JournalCheckpoint = serde_json::from_slice(&bytes)?;
530                if rec.previous_record_digest != prior_digest {
531                    return Err(JournalError::BrokenChain {
532                        index:    idx,
533                        expected: prior_digest,
534                        actual:   rec.previous_record_digest,
535                    });
536                }
537                let recomputed = journal_checkpoint_record_digest(&rec);
538                if recomputed != rec.record_digest {
539                    return Err(JournalError::RecordTampered {
540                        index:    idx,
541                        expected: rec.record_digest,
542                        actual:   recomputed,
543                    });
544                }
545                prior_digest = rec.record_digest;
546            }
547            _ => {
548                // Unknown record kind. Stop the chain check rather than
549                // skip silently -- a newer record type would still need
550                // to participate in the chain.
551                continue;
552            }
553        }
554        count += 1;
555    }
556    // Tail must match the head if records exist; if records were
557    // deleted off the end the head will be stale.
558    if head.index != 0 && head.digest != prior_digest {
559        return Err(JournalError::MissingRecord { index: head.index });
560    }
561    Ok(count)
562}
563
564// ---------------------------------------------------------------------------
565// check_replay
566// ---------------------------------------------------------------------------
567
568/// Check whether (`grant_id`, `nonce_digest`) has already been consumed,
569/// and how many times. Returns a `ReplayCheck` carrying the strongest
570/// level the journal can speak to:
571///
572///   - `NotPerformed` when the journal directory does not exist on disk.
573///     The caller (verify) should fall back to its package-local check.
574///   - `LocalJournal` otherwise. `passed: true` means the use count is
575///     within `max_uses_hint`; `false` means it would exceed.
576///
577/// `max_uses_hint` is what the caller knows from the signed grant's
578/// `ApprovalScope.max_actions`. We accept it as a hint rather than
579/// reading it back from a stored record because the stored uses already
580/// carry their own `max_uses` snapshot, and disagreement between the
581/// hint and the stored value should be visible in `details`.
582pub fn check_replay(
583    j: &Journal,
584    grant_id: &str,
585    nonce_digest: &str,
586    max_uses_hint: Option<u32>,
587) -> Result<ReplayCheck, JournalError> {
588    if !j.exists() {
589        return Ok(ReplayCheck::not_performed());
590    }
591    // Use the by-nonce index: every prior use of the same approval
592    // shares the same nonce_digest, so the index gives us the exact
593    // record list.
594    let index_path = j.by_nonce_path(nonce_digest);
595    let mut current = 0u32;
596    let mut last_max: Option<u32> = None;
597    if index_path.exists() {
598        let raw = fs::read_to_string(&index_path)?;
599        for line in raw.lines() {
600            let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
601            if let Some(rec) = load_use_record(j, idx)? {
602                // Only count uses that bind to the same grant_id; the
603                // by-nonce index can in theory share a digest across
604                // grants, though in practice nonces are random.
605                if rec.grant_id == grant_id {
606                    current = current.saturating_add(1);
607                    last_max = rec.max_uses.or(last_max);
608                }
609            }
610        }
611    }
612    let max_uses = max_uses_hint.or(last_max);
613    let passed = match max_uses {
614        Some(m) => current < m,
615        None    => true, // unbounded grant; PR 5 reports this honestly
616    };
617    let details = match max_uses {
618        Some(m) => format!("local Approval Use Journal: use {current}/{m}"),
619        None    => format!("local Approval Use Journal: {current} prior use(s); grant has no max_uses"),
620    };
621    Ok(ReplayCheck {
622        level:      ReplayCheckLevel::LocalJournal,
623        use_number: Some(current.saturating_add(1)),
624        max_uses,
625        passed:     Some(passed),
626        details:    Some(details),
627    })
628}
629
630fn load_use_record(j: &Journal, index: u64) -> Result<Option<ApprovalUse>, JournalError> {
631    let dir = j.records_dir();
632    if !dir.is_dir() {
633        return Ok(None);
634    }
635    let prefix = format!("{:010}.approval-use.", index);
636    for entry in fs::read_dir(&dir)? {
637        let entry = entry?;
638        let name = entry.file_name().to_string_lossy().into_owned();
639        if name.starts_with(&prefix) {
640            let bytes = fs::read(entry.path())?;
641            let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
642            return Ok(Some(rec));
643        }
644    }
645    Ok(None)
646}
647
648// ---------------------------------------------------------------------------
649// Public read helpers (CLI)
650// ---------------------------------------------------------------------------
651
652/// Find the recorded ApprovalUse for an already-signed action.
653/// Returns the matching use record plus a `ReplayCheck` that answers
654/// the *verify-time* question -- "is the recorded use within max_uses?"
655/// -- as opposed to `check_replay`'s consume-time question -- "would
656/// the next use exceed?". The two questions look the same but have
657/// different boundary semantics:
658///
659///   consume-time: passed = use_number_that_would_be_allocated <= max_uses
660///                 (i.e. current_count < max_uses, since next = current + 1)
661///   verify-time:  passed = recorded_use_number <= max_uses
662///
663/// Verify should call THIS, not check_replay, when reporting on an
664/// action that already has a journal record.
665pub fn find_use_for_action(
666    j: &Journal,
667    grant_id: &str,
668    nonce_digest: &str,
669    max_uses_hint: Option<u32>,
670) -> Result<Option<(ApprovalUse, ReplayCheck)>, JournalError> {
671    if !j.exists() {
672        return Ok(None);
673    }
674    let index_path = j.by_nonce_path(nonce_digest);
675    if !index_path.exists() {
676        return Ok(None);
677    }
678    let raw = fs::read_to_string(&index_path)?;
679    // The action under verification corresponds to the most recent use
680    // record sharing the same (grant_id, nonce_digest) -- callers can
681    // also disambiguate by `approval_use_id` from action.meta, which
682    // PR 4 wires in. For PR 3, returning the most recent matching use
683    // is sufficient and matches what verify can derive without that
684    // metadata link.
685    let mut latest: Option<ApprovalUse> = None;
686    for line in raw.lines() {
687        let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
688        if let Some(rec) = load_use_record(j, idx)? {
689            if rec.grant_id == grant_id {
690                latest = Some(rec);
691            }
692        }
693    }
694    let Some(rec) = latest else { return Ok(None) };
695
696    let stored_max = rec.max_uses;
697    let max_uses = max_uses_hint.or(stored_max);
698    let passed = match max_uses {
699        Some(m) => rec.use_number <= m,
700        None    => true,
701    };
702    let details = match max_uses {
703        Some(m) => format!("local Approval Use Journal passed, use {}/{}", rec.use_number, m),
704        None    => format!("local Approval Use Journal: use {} of unbounded grant", rec.use_number),
705    };
706    Ok(Some((
707        rec.clone(),
708        ReplayCheck {
709            level:      ReplayCheckLevel::LocalJournal,
710            use_number: Some(rec.use_number),
711            max_uses,
712            passed:     Some(passed),
713            details:    Some(details),
714        },
715    )))
716}
717
718/// Every ApprovalUse for `grant_id`. Reads the by-grant index, then
719/// loads each record. Quiet on missing journal.
720pub fn list_uses_for_grant(j: &Journal, grant_id: &str) -> Result<Vec<ApprovalUse>, JournalError> {
721    if !j.exists() {
722        return Ok(Vec::new());
723    }
724    let index_path = j.by_grant_path(grant_id);
725    if !index_path.exists() {
726        return Ok(Vec::new());
727    }
728    let raw = fs::read_to_string(&index_path)?;
729    let mut out = Vec::new();
730    for line in raw.lines() {
731        let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
732        if let Some(rec) = load_use_record(j, idx)? {
733            out.push(rec);
734        }
735    }
736    Ok(out)
737}
738
739// ---------------------------------------------------------------------------
740// Tests
741// ---------------------------------------------------------------------------
742
743#[cfg(test)]
744mod tests {
745    use super::*;
746    use tempfile::tempdir;
747
748    fn sample_use(use_id: &str, grant_id: &str, nonce_digest: &str, n: u32) -> ApprovalUse {
749        ApprovalUse {
750            type_:                  TYPE_APPROVAL_USE.into(),
751            use_id:                 use_id.into(),
752            grant_id:               grant_id.into(),
753            grant_digest:           "sha256:00".into(),
754            nonce_digest:           nonce_digest.into(),
755            actor:                  "agent://deployer".into(),
756            action:                 "deploy.production".into(),
757            subject:                "env://production".into(),
758            session_id:             None,
759            action_artifact_id:     None,
760            receipt_digest:         None,
761            use_number:             n,
762            max_uses:               Some(2),
763            idempotency_key:        None,
764            created_at:             "2026-04-30T07:00:00Z".into(),
765            expires_at:             None,
766            previous_record_digest: String::new(), // append_use rewrites this
767            record_digest:          String::new(), // append_use rewrites this
768            signature:              None,
769            signature_alg:          None,
770            signing_key_id:         None,
771        }
772    }
773
774    #[test]
775    fn first_append_creates_layout_and_head() {
776        let dir = tempdir().unwrap();
777        let j = Journal::new(dir.path());
778        let head = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
779        assert_eq!(head.index, 1);
780        assert!(j.records_dir().is_dir());
781        assert!(j.heads_dir().is_dir());
782        assert!(j.current_head_path().is_file());
783        assert!(j.meta_path().is_file());
784        // by-grant + by-nonce indexes populated
785        assert!(j.by_grant_path("g1").is_file());
786        assert!(j.by_nonce_path("sha256:nn1").is_file());
787    }
788
789    #[test]
790    fn second_append_links_previous_record_digest() {
791        let dir = tempdir().unwrap();
792        let j = Journal::new(dir.path());
793        let h1 = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
794        let h2 = append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
795        assert_eq!(h2.index, 2);
796        // Reading record 2 should show previous_record_digest == h1.digest
797        let recs = iter_records(&j).unwrap();
798        assert_eq!(recs.len(), 2);
799        let (_, _, bytes) = &recs[1];
800        let r2: ApprovalUse = serde_json::from_slice(bytes).unwrap();
801        assert_eq!(r2.previous_record_digest, h1.digest);
802    }
803
804    #[test]
805    fn verify_integrity_passes_on_intact_chain() {
806        let dir = tempdir().unwrap();
807        let j = Journal::new(dir.path());
808        for i in 1..=5 {
809            let nd = format!("sha256:nn{i}");
810            append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
811        }
812        assert_eq!(verify_integrity(&j).unwrap(), 5);
813    }
814
815    #[test]
816    fn editing_a_record_breaks_integrity() {
817        let dir = tempdir().unwrap();
818        let j = Journal::new(dir.path());
819        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
820        // Find the on-disk record file and corrupt it.
821        let entries: Vec<_> = fs::read_dir(j.records_dir()).unwrap().collect();
822        let entry = entries.into_iter().next().unwrap().unwrap();
823        let mut json: serde_json::Value =
824            serde_json::from_slice(&fs::read(entry.path()).unwrap()).unwrap();
825        json["actor"] = "agent://attacker".into();
826        fs::write(entry.path(), serde_json::to_vec_pretty(&json).unwrap()).unwrap();
827
828        let err = verify_integrity(&j).unwrap_err();
829        assert!(
830            matches!(err, JournalError::RecordTampered { .. }),
831            "expected RecordTampered, got {err:?}"
832        );
833    }
834
835    #[test]
836    fn deleting_a_record_breaks_integrity_or_head_continuity() {
837        let dir = tempdir().unwrap();
838        let j = Journal::new(dir.path());
839        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
840        append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
841        // Remove the trailing record. Head still points at index 2.
842        let entries: Vec<_> = fs::read_dir(j.records_dir())
843            .unwrap()
844            .map(|e| e.unwrap().path())
845            .collect();
846        let trailing = entries.iter().max().unwrap();
847        fs::remove_file(trailing).unwrap();
848
849        let err = verify_integrity(&j).unwrap_err();
850        assert!(
851            matches!(err, JournalError::MissingRecord { .. }),
852            "expected MissingRecord, got {err:?}"
853        );
854    }
855
856    #[test]
857    fn indexes_can_be_rebuilt_from_records() {
858        let dir = tempdir().unwrap();
859        let j = Journal::new(dir.path());
860        for i in 1..=3 {
861            let nd = format!("sha256:nn{i}");
862            append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
863        }
864        // Wipe indexes; check_replay (or rebuild_indexes) should still work.
865        fs::remove_dir_all(j.indexes_dir()).unwrap();
866
867        let rebuilt = rebuild_indexes(&j).unwrap();
868        assert_eq!(rebuilt, 3);
869        assert!(j.by_grant_path("g1").is_file());
870        assert!(j.by_nonce_path("sha256:nn1").is_file());
871    }
872
873    #[test]
874    fn check_replay_reports_use_count_and_max() {
875        let dir = tempdir().unwrap();
876        let j = Journal::new(dir.path());
877        // Two prior uses of grant g1 with the same nonce_digest.
878        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
879        append_use(&j, sample_use("use_2", "g1", "sha256:nn1", 2)).unwrap();
880
881        // max_uses_hint = 2: the next use would be 3/2 -> not passed.
882        let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
883        assert_eq!(r.level, ReplayCheckLevel::LocalJournal);
884        assert_eq!(r.use_number, Some(3));
885        assert_eq!(r.max_uses,   Some(2));
886        assert_eq!(r.passed,     Some(false));
887    }
888
889    #[test]
890    fn check_replay_passes_when_under_max() {
891        let dir = tempdir().unwrap();
892        let j = Journal::new(dir.path());
893        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
894        let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
895        assert_eq!(r.use_number, Some(2));
896        assert_eq!(r.passed,     Some(true));
897    }
898
899    #[test]
900    fn check_replay_no_journal_returns_not_performed() {
901        let dir = tempdir().unwrap();
902        let absent = dir.path().join("nope");
903        let j = Journal::new(&absent);
904        let r = check_replay(&j, "g1", "sha256:nn1", Some(1)).unwrap();
905        assert_eq!(r.level, ReplayCheckLevel::NotPerformed);
906        assert!(r.use_number.is_none());
907    }
908
909    #[test]
910    fn check_replay_unbounded_grant_passes_with_count() {
911        let dir = tempdir().unwrap();
912        let j = Journal::new(dir.path());
913        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
914        // No max_uses_hint and stored record's max_uses is Some(2) too,
915        // so we explicitly set None on a fresh record to test the
916        // unbounded path.
917        let mut u = sample_use("use_2", "g2", "sha256:other", 1);
918        u.max_uses = None;
919        append_use(&j, u).unwrap();
920
921        let r = check_replay(&j, "g2", "sha256:other", None).unwrap();
922        assert!(r.passed.unwrap());
923        assert!(r.max_uses.is_none());
924    }
925
926    #[test]
927    fn list_uses_for_grant_returns_records_in_order() {
928        let dir = tempdir().unwrap();
929        let j = Journal::new(dir.path());
930        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
931        append_use(&j, sample_use("use_2", "g2", "sha256:nn2", 1)).unwrap();
932        append_use(&j, sample_use("use_3", "g1", "sha256:nn3", 2)).unwrap();
933        let g1 = list_uses_for_grant(&j, "g1").unwrap();
934        assert_eq!(g1.len(), 2);
935        assert_eq!(g1[0].use_id, "use_1");
936        assert_eq!(g1[1].use_id, "use_3");
937    }
938
939    #[test]
940    fn lock_keeps_two_appends_serial() {
941        // Hold the lock externally; an append should fail with LockBusy
942        // rather than racing or silently overwriting.
943        let dir = tempdir().unwrap();
944        let j = Journal::new(dir.path());
945        fs::create_dir_all(j.locks_dir()).unwrap();
946        let held = OpenOptions::new()
947            .read(true).write(true).create(true).truncate(false)
948            .open(j.lock_path()).unwrap();
949        held.try_lock_exclusive().unwrap();
950
951        let err = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap_err();
952        assert!(matches!(err, JournalError::LockBusy));
953
954        let _ = fs2::FileExt::unlock(&held);
955    }
956
957    #[test]
958    fn revocation_appends_into_chain() {
959        let dir = tempdir().unwrap();
960        let j = Journal::new(dir.path());
961        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
962        let rev = ApprovalRevocation {
963            type_:                  TYPE_APPROVAL_REVOCATION.into(),
964            revocation_id:          "rev_1".into(),
965            grant_id:               "g1".into(),
966            grant_digest:           "sha256:00".into(),
967            revoker:                "human://alice".into(),
968            reason:                 Some("rotated key".into()),
969            created_at:             "2026-04-30T07:01:00Z".into(),
970            previous_record_digest: String::new(),
971            record_digest:          String::new(),
972            signature:              None,
973            signature_alg:          None,
974            signing_key_id:         None,
975        };
976        let h = append_revocation(&j, rev).unwrap();
977        assert_eq!(h.index, 2);
978        assert_eq!(verify_integrity(&j).unwrap(), 2);
979    }
980
981    #[test]
982    fn record_files_contain_no_raw_nonce_or_signature_secrets() {
983        // Privacy invariant: ApprovalUse has no `nonce` field on the
984        // struct, so by construction the stored JSON only contains
985        // `nonce_digest`. This test pins the on-disk shape so a future
986        // schema change can't sneak in a raw-nonce field.
987        let dir = tempdir().unwrap();
988        let j = Journal::new(dir.path());
989        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
990        let entries: Vec<_> = fs::read_dir(j.records_dir())
991            .unwrap()
992            .map(|e| e.unwrap().path())
993            .collect();
994        let bytes = fs::read(&entries[0]).unwrap();
995        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
996        let obj = json.as_object().unwrap();
997        for forbidden in ["nonce", "command", "prompt", "file_content", "bearer_token", "api_key"] {
998            assert!(
999                !obj.contains_key(forbidden),
1000                "journal record must not contain `{forbidden}`",
1001            );
1002        }
1003        // The digest IS allowed.
1004        assert!(obj.contains_key("nonce_digest"));
1005    }
1006}