use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
#[cfg(not(target_family = "wasm"))]
use fs2::FileExt;
use crate::statements::{
ApprovalRevocation, ApprovalUse, JournalCheckpoint, ReplayCheck, ReplayCheckLevel,
TYPE_APPROVAL_REVOCATION, TYPE_APPROVAL_USE, TYPE_JOURNAL_CHECKPOINT,
approval_revocation_record_digest, approval_use_record_digest,
journal_checkpoint_record_digest,
};
#[derive(Debug)]
pub enum JournalError {
Io(std::io::Error),
Json(serde_json::Error),
BrokenChain {
index: u64,
expected: String,
actual: String,
},
RecordTampered {
index: u64,
expected: String,
actual: String,
},
MissingRecord {
index: u64,
},
LockBusy,
MaxUsesExceeded {
grant_id: String,
max_uses: u32,
current: u32,
},
}
impl std::fmt::Display for JournalError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(e) => write!(f, "journal io: {e}"),
Self::Json(e) => write!(f, "journal json: {e}"),
Self::BrokenChain { index, expected, actual } => write!(
f,
"journal broken at record {index}: previous_record_digest = {actual}, expected {expected}",
),
Self::RecordTampered { index, expected, actual } => write!(
f,
"journal record {index} tampered: stored digest {expected}, recomputed {actual}",
),
Self::MissingRecord { index } => write!(
f,
"journal record {index} referenced by head but missing on disk",
),
Self::LockBusy => write!(f, "journal append lock busy; another process holds it"),
Self::MaxUsesExceeded { grant_id, max_uses, current } => write!(
f,
"approval grant {grant_id} would exceed max_uses ({current}/{max_uses})",
),
}
}
}
impl std::error::Error for JournalError {}
impl From<std::io::Error> for JournalError { fn from(e: std::io::Error) -> Self { Self::Io(e) } }
impl From<serde_json::Error> for JournalError { fn from(e: serde_json::Error) -> Self { Self::Json(e) } }
pub struct Journal {
pub dir: PathBuf,
}
impl Journal {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self { dir: dir.into() }
}
pub fn records_dir(&self) -> PathBuf { self.dir.join("records") }
pub fn heads_dir(&self) -> PathBuf { self.dir.join("heads") }
pub fn indexes_dir(&self) -> PathBuf { self.dir.join("indexes") }
pub fn locks_dir(&self) -> PathBuf { self.dir.join("locks") }
pub fn current_head_path(&self) -> PathBuf { self.heads_dir().join("current.json") }
pub fn lock_path(&self) -> PathBuf { self.locks_dir().join("journal.lock") }
pub fn meta_path(&self) -> PathBuf { self.dir.join("journal.json") }
pub fn by_grant_path(&self, grant_id: &str) -> PathBuf {
self.indexes_dir().join("by-grant").join(format!("{}.txt", safe_name(grant_id)))
}
pub fn by_nonce_path(&self, nonce_digest: &str) -> PathBuf {
self.indexes_dir().join("by-nonce").join(format!("{}.txt", safe_name(nonce_digest)))
}
pub fn exists(&self) -> bool {
self.dir.is_dir()
}
}
fn safe_name(s: &str) -> String {
s.chars()
.map(|c| match c {
':' | '/' | '\\' | ' ' | '.' => '_',
c => c,
})
.collect()
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Head {
pub index: u64,
pub digest: String,
pub updated_at: String,
}
impl Default for Head {
fn default() -> Self {
Self {
index: 0,
digest: String::new(),
updated_at: String::new(),
}
}
}
fn read_head(j: &Journal) -> Result<Head, JournalError> {
let path = j.current_head_path();
if !path.exists() {
return Ok(Head::default());
}
let bytes = fs::read(&path)?;
Ok(serde_json::from_slice(&bytes)?)
}
fn write_head(j: &Journal, head: &Head) -> Result<(), JournalError> {
fs::create_dir_all(j.heads_dir())?;
let path = j.current_head_path();
let tmp = path.with_extension("json.tmp");
let json = serde_json::to_vec_pretty(head)?;
fs::write(&tmp, json)?;
fs::rename(&tmp, &path)?;
Ok(())
}
#[cfg(not(target_family = "wasm"))]
fn with_lock<F, T>(j: &Journal, body: F) -> Result<T, JournalError>
where
F: FnOnce() -> Result<T, JournalError>,
{
fs::create_dir_all(j.locks_dir())?;
let lock = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(j.lock_path())?;
if lock.try_lock_exclusive().is_err() {
return Err(JournalError::LockBusy);
}
let result = body();
let _ = fs2::FileExt::unlock(&lock);
result
}
#[cfg(target_family = "wasm")]
fn with_lock<F, T>(_j: &Journal, body: F) -> Result<T, JournalError>
where
F: FnOnce() -> Result<T, JournalError>,
{
body()
}
pub fn append_use(j: &Journal, mut rec: ApprovalUse) -> Result<Head, JournalError> {
rec.type_ = TYPE_APPROVAL_USE.into();
with_lock(j, || {
let head = read_head(j)?;
rec.previous_record_digest = head.digest.clone();
rec.record_digest = approval_use_record_digest(&rec);
let next_index = head.index + 1;
write_record_use(j, next_index, &rec)?;
update_indexes_for_use(j, next_index, &rec)?;
let new_head = Head {
index: next_index,
digest: rec.record_digest.clone(),
updated_at: rec.created_at.clone(),
};
write_head(j, &new_head)?;
ensure_meta(j)?;
Ok(new_head)
})
}
pub fn reserve_use(
j: &Journal,
mut rec: ApprovalUse,
max_uses: Option<u32>,
) -> Result<Head, JournalError> {
rec.type_ = TYPE_APPROVAL_USE.into();
with_lock(j, || {
let replay = check_replay(j, &rec.grant_id, &rec.nonce_digest, max_uses)?;
if let Some(false) = replay.passed {
let current = replay
.use_number
.map(|n| n.saturating_sub(1))
.unwrap_or(0);
return Err(JournalError::MaxUsesExceeded {
grant_id: rec.grant_id.clone(),
max_uses: replay.max_uses.unwrap_or(0),
current,
});
}
let prior_count = list_uses_for_grant(j, &rec.grant_id)?.len() as u32;
rec.use_number = prior_count.saturating_add(1);
let head = read_head(j)?;
rec.previous_record_digest = head.digest.clone();
rec.record_digest = approval_use_record_digest(&rec);
let next_index = head.index + 1;
write_record_use(j, next_index, &rec)?;
update_indexes_for_use(j, next_index, &rec)?;
let new_head = Head {
index: next_index,
digest: rec.record_digest.clone(),
updated_at: rec.created_at.clone(),
};
write_head(j, &new_head)?;
ensure_meta(j)?;
Ok(new_head)
})
}
pub fn append_revocation(j: &Journal, mut rec: ApprovalRevocation) -> Result<Head, JournalError> {
rec.type_ = TYPE_APPROVAL_REVOCATION.into();
with_lock(j, || {
let head = read_head(j)?;
rec.previous_record_digest = head.digest.clone();
rec.record_digest = approval_revocation_record_digest(&rec);
let next_index = head.index + 1;
write_record_revocation(j, next_index, &rec)?;
index_grant(j, next_index, &rec.grant_id)?;
let new_head = Head {
index: next_index,
digest: rec.record_digest.clone(),
updated_at: rec.created_at.clone(),
};
write_head(j, &new_head)?;
ensure_meta(j)?;
Ok(new_head)
})
}
pub fn append_checkpoint(j: &Journal, mut rec: JournalCheckpoint) -> Result<Head, JournalError> {
rec.type_ = TYPE_JOURNAL_CHECKPOINT.into();
with_lock(j, || {
let head = read_head(j)?;
rec.previous_record_digest = head.digest.clone();
rec.record_digest = journal_checkpoint_record_digest(&rec);
let next_index = head.index + 1;
write_record_checkpoint(j, next_index, &rec)?;
let new_head = Head {
index: next_index,
digest: rec.record_digest.clone(),
updated_at: rec.created_at.clone(),
};
write_head(j, &new_head)?;
ensure_meta(j)?;
Ok(new_head)
})
}
fn record_filename(index: u64, type_: &str, digest: &str) -> String {
let tail = digest.strip_prefix("sha256:").unwrap_or(digest);
let short = &tail[..tail.len().min(16)];
format!("{:010}.{type_}.{short}.json", index)
}
fn write_record_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
fs::create_dir_all(j.records_dir())?;
let name = record_filename(index, "approval-use", &rec.record_digest);
let path = j.records_dir().join(&name);
let tmp = path.with_extension("json.tmp");
let mut f = File::create(&tmp)?;
f.write_all(&serde_json::to_vec_pretty(rec)?)?;
f.sync_all()?;
fs::rename(&tmp, &path)?;
Ok(())
}
fn write_record_revocation(j: &Journal, index: u64, rec: &ApprovalRevocation) -> Result<(), JournalError> {
fs::create_dir_all(j.records_dir())?;
let name = record_filename(index, "approval-revocation", &rec.record_digest);
let path = j.records_dir().join(&name);
let tmp = path.with_extension("json.tmp");
let mut f = File::create(&tmp)?;
f.write_all(&serde_json::to_vec_pretty(rec)?)?;
f.sync_all()?;
fs::rename(&tmp, &path)?;
Ok(())
}
fn write_record_checkpoint(j: &Journal, index: u64, rec: &JournalCheckpoint) -> Result<(), JournalError> {
fs::create_dir_all(j.records_dir())?;
let name = record_filename(index, "journal-checkpoint", &rec.record_digest);
let path = j.records_dir().join(&name);
let tmp = path.with_extension("json.tmp");
let mut f = File::create(&tmp)?;
f.write_all(&serde_json::to_vec_pretty(rec)?)?;
f.sync_all()?;
fs::rename(&tmp, &path)?;
Ok(())
}
fn ensure_meta(j: &Journal) -> Result<(), JournalError> {
let path = j.meta_path();
if path.exists() {
return Ok(());
}
#[derive(serde::Serialize)]
struct Meta<'a> {
kind: &'a str,
version: &'a str,
format: &'a str,
}
let meta = Meta { kind: "approval-use-journal", version: "v1", format: "json-records" };
let bytes = serde_json::to_vec_pretty(&meta)?;
fs::write(&path, bytes)?;
Ok(())
}
fn append_index(path: &Path, line: &str) -> Result<(), JournalError> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let mut f = OpenOptions::new().append(true).create(true).open(path)?;
writeln!(f, "{line}")?;
Ok(())
}
fn index_grant(j: &Journal, index: u64, grant_id: &str) -> Result<(), JournalError> {
append_index(&j.by_grant_path(grant_id), &index.to_string())
}
fn index_nonce(j: &Journal, index: u64, nonce_digest: &str) -> Result<(), JournalError> {
append_index(&j.by_nonce_path(nonce_digest), &index.to_string())
}
fn update_indexes_for_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
index_grant(j, index, &rec.grant_id)?;
index_nonce(j, index, &rec.nonce_digest)?;
Ok(())
}
pub fn rebuild_indexes(j: &Journal) -> Result<u64, JournalError> {
let dir = j.indexes_dir();
if dir.is_dir() {
fs::remove_dir_all(&dir)?;
}
let mut rebuilt = 0u64;
for (idx, kind, bytes) in iter_records(j)? {
match kind.as_str() {
"approval-use" => {
let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
update_indexes_for_use(j, idx, &rec)?;
rebuilt += 1;
}
"approval-revocation" => {
let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
index_grant(j, idx, &rec.grant_id)?;
rebuilt += 1;
}
"journal-checkpoint" => {
rebuilt += 1; }
_ => {}
}
}
Ok(rebuilt)
}
fn iter_records(j: &Journal) -> Result<Vec<(u64, String, Vec<u8>)>, JournalError> {
let dir = j.records_dir();
if !dir.is_dir() {
return Ok(Vec::new());
}
let mut entries: Vec<(u64, String, PathBuf)> = Vec::new();
for entry in fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("json") {
continue;
}
let name = match path.file_name().and_then(|n| n.to_str()) {
Some(n) => n,
None => continue,
};
let mut parts = name.splitn(4, '.');
let idx_str = match parts.next() { Some(s) => s, None => continue };
let kind = match parts.next() { Some(s) => s, None => continue };
let idx = match idx_str.parse::<u64>() { Ok(n) => n, Err(_) => continue };
entries.push((idx, kind.to_string(), path));
}
entries.sort_by_key(|(idx, _, _)| *idx);
let mut out = Vec::with_capacity(entries.len());
for (idx, kind, path) in entries {
let bytes = fs::read(&path)?;
out.push((idx, kind, bytes));
}
Ok(out)
}
pub fn verify_integrity(j: &Journal) -> Result<u64, JournalError> {
let mut prior_digest = String::new();
let mut count = 0u64;
let head = read_head(j)?;
for (idx, kind, bytes) in iter_records(j)? {
match kind.as_str() {
"approval-use" => {
let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
if rec.previous_record_digest != prior_digest {
return Err(JournalError::BrokenChain {
index: idx,
expected: prior_digest,
actual: rec.previous_record_digest,
});
}
let recomputed = approval_use_record_digest(&rec);
if recomputed != rec.record_digest {
return Err(JournalError::RecordTampered {
index: idx,
expected: rec.record_digest,
actual: recomputed,
});
}
prior_digest = rec.record_digest;
}
"approval-revocation" => {
let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
if rec.previous_record_digest != prior_digest {
return Err(JournalError::BrokenChain {
index: idx,
expected: prior_digest,
actual: rec.previous_record_digest,
});
}
let recomputed = approval_revocation_record_digest(&rec);
if recomputed != rec.record_digest {
return Err(JournalError::RecordTampered {
index: idx,
expected: rec.record_digest,
actual: recomputed,
});
}
prior_digest = rec.record_digest;
}
"journal-checkpoint" => {
let rec: JournalCheckpoint = serde_json::from_slice(&bytes)?;
if rec.previous_record_digest != prior_digest {
return Err(JournalError::BrokenChain {
index: idx,
expected: prior_digest,
actual: rec.previous_record_digest,
});
}
let recomputed = journal_checkpoint_record_digest(&rec);
if recomputed != rec.record_digest {
return Err(JournalError::RecordTampered {
index: idx,
expected: rec.record_digest,
actual: recomputed,
});
}
prior_digest = rec.record_digest;
}
_ => {
continue;
}
}
count += 1;
}
if head.index != 0 && head.digest != prior_digest {
return Err(JournalError::MissingRecord { index: head.index });
}
Ok(count)
}
pub fn check_replay(
j: &Journal,
grant_id: &str,
nonce_digest: &str,
max_uses_hint: Option<u32>,
) -> Result<ReplayCheck, JournalError> {
if !j.exists() {
return Ok(ReplayCheck::not_performed());
}
let index_path = j.by_nonce_path(nonce_digest);
let mut current = 0u32;
let mut last_max: Option<u32> = None;
if index_path.exists() {
let raw = fs::read_to_string(&index_path)?;
for line in raw.lines() {
let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
if let Some(rec) = load_use_record(j, idx)? {
if rec.grant_id == grant_id {
current = current.saturating_add(1);
last_max = rec.max_uses.or(last_max);
}
}
}
}
let max_uses = max_uses_hint.or(last_max);
let passed = match max_uses {
Some(m) => current < m,
None => true, };
let details = match max_uses {
Some(m) => format!("local Approval Use Journal: use {current}/{m}"),
None => format!("local Approval Use Journal: {current} prior use(s); grant has no max_uses"),
};
Ok(ReplayCheck {
level: ReplayCheckLevel::LocalJournal,
use_number: Some(current.saturating_add(1)),
max_uses,
passed: Some(passed),
details: Some(details),
})
}
fn load_use_record(j: &Journal, index: u64) -> Result<Option<ApprovalUse>, JournalError> {
let dir = j.records_dir();
if !dir.is_dir() {
return Ok(None);
}
let prefix = format!("{:010}.approval-use.", index);
for entry in fs::read_dir(&dir)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().into_owned();
if name.starts_with(&prefix) {
let bytes = fs::read(entry.path())?;
let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
return Ok(Some(rec));
}
}
Ok(None)
}
pub fn find_use_for_action(
j: &Journal,
grant_id: &str,
nonce_digest: &str,
max_uses_hint: Option<u32>,
) -> Result<Option<(ApprovalUse, ReplayCheck)>, JournalError> {
if !j.exists() {
return Ok(None);
}
let index_path = j.by_nonce_path(nonce_digest);
if !index_path.exists() {
return Ok(None);
}
let raw = fs::read_to_string(&index_path)?;
let mut latest: Option<ApprovalUse> = None;
for line in raw.lines() {
let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
if let Some(rec) = load_use_record(j, idx)? {
if rec.grant_id == grant_id {
latest = Some(rec);
}
}
}
let Some(rec) = latest else { return Ok(None) };
let stored_max = rec.max_uses;
let max_uses = max_uses_hint.or(stored_max);
let passed = match max_uses {
Some(m) => rec.use_number <= m,
None => true,
};
let details = match max_uses {
Some(m) => format!("local Approval Use Journal passed, use {}/{}", rec.use_number, m),
None => format!("local Approval Use Journal: use {} of unbounded grant", rec.use_number),
};
Ok(Some((
rec.clone(),
ReplayCheck {
level: ReplayCheckLevel::LocalJournal,
use_number: Some(rec.use_number),
max_uses,
passed: Some(passed),
details: Some(details),
},
)))
}
pub fn list_uses_for_grant(j: &Journal, grant_id: &str) -> Result<Vec<ApprovalUse>, JournalError> {
if !j.exists() {
return Ok(Vec::new());
}
let index_path = j.by_grant_path(grant_id);
if !index_path.exists() {
return Ok(Vec::new());
}
let raw = fs::read_to_string(&index_path)?;
let mut out = Vec::new();
for line in raw.lines() {
let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
if let Some(rec) = load_use_record(j, idx)? {
out.push(rec);
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn sample_use(use_id: &str, grant_id: &str, nonce_digest: &str, n: u32) -> ApprovalUse {
ApprovalUse {
type_: TYPE_APPROVAL_USE.into(),
use_id: use_id.into(),
grant_id: grant_id.into(),
grant_digest: "sha256:00".into(),
nonce_digest: nonce_digest.into(),
actor: "agent://deployer".into(),
action: "deploy.production".into(),
subject: "env://production".into(),
session_id: None,
action_artifact_id: None,
receipt_digest: None,
use_number: n,
max_uses: Some(2),
idempotency_key: None,
created_at: "2026-04-30T07:00:00Z".into(),
expires_at: None,
previous_record_digest: String::new(), record_digest: String::new(), signature: None,
signature_alg: None,
signing_key_id: None,
}
}
#[test]
fn first_append_creates_layout_and_head() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
let head = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
assert_eq!(head.index, 1);
assert!(j.records_dir().is_dir());
assert!(j.heads_dir().is_dir());
assert!(j.current_head_path().is_file());
assert!(j.meta_path().is_file());
assert!(j.by_grant_path("g1").is_file());
assert!(j.by_nonce_path("sha256:nn1").is_file());
}
#[test]
fn second_append_links_previous_record_digest() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
let h1 = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
let h2 = append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
assert_eq!(h2.index, 2);
let recs = iter_records(&j).unwrap();
assert_eq!(recs.len(), 2);
let (_, _, bytes) = &recs[1];
let r2: ApprovalUse = serde_json::from_slice(bytes).unwrap();
assert_eq!(r2.previous_record_digest, h1.digest);
}
#[test]
fn verify_integrity_passes_on_intact_chain() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
for i in 1..=5 {
let nd = format!("sha256:nn{i}");
append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
}
assert_eq!(verify_integrity(&j).unwrap(), 5);
}
#[test]
fn editing_a_record_breaks_integrity() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
let entries: Vec<_> = fs::read_dir(j.records_dir()).unwrap().collect();
let entry = entries.into_iter().next().unwrap().unwrap();
let mut json: serde_json::Value =
serde_json::from_slice(&fs::read(entry.path()).unwrap()).unwrap();
json["actor"] = "agent://attacker".into();
fs::write(entry.path(), serde_json::to_vec_pretty(&json).unwrap()).unwrap();
let err = verify_integrity(&j).unwrap_err();
assert!(
matches!(err, JournalError::RecordTampered { .. }),
"expected RecordTampered, got {err:?}"
);
}
#[test]
fn deleting_a_record_breaks_integrity_or_head_continuity() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
let entries: Vec<_> = fs::read_dir(j.records_dir())
.unwrap()
.map(|e| e.unwrap().path())
.collect();
let trailing = entries.iter().max().unwrap();
fs::remove_file(trailing).unwrap();
let err = verify_integrity(&j).unwrap_err();
assert!(
matches!(err, JournalError::MissingRecord { .. }),
"expected MissingRecord, got {err:?}"
);
}
#[test]
fn indexes_can_be_rebuilt_from_records() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
for i in 1..=3 {
let nd = format!("sha256:nn{i}");
append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
}
fs::remove_dir_all(j.indexes_dir()).unwrap();
let rebuilt = rebuild_indexes(&j).unwrap();
assert_eq!(rebuilt, 3);
assert!(j.by_grant_path("g1").is_file());
assert!(j.by_nonce_path("sha256:nn1").is_file());
}
#[test]
fn check_replay_reports_use_count_and_max() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
append_use(&j, sample_use("use_2", "g1", "sha256:nn1", 2)).unwrap();
let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
assert_eq!(r.level, ReplayCheckLevel::LocalJournal);
assert_eq!(r.use_number, Some(3));
assert_eq!(r.max_uses, Some(2));
assert_eq!(r.passed, Some(false));
}
#[test]
fn check_replay_passes_when_under_max() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
assert_eq!(r.use_number, Some(2));
assert_eq!(r.passed, Some(true));
}
#[test]
fn check_replay_no_journal_returns_not_performed() {
let dir = tempdir().unwrap();
let absent = dir.path().join("nope");
let j = Journal::new(&absent);
let r = check_replay(&j, "g1", "sha256:nn1", Some(1)).unwrap();
assert_eq!(r.level, ReplayCheckLevel::NotPerformed);
assert!(r.use_number.is_none());
}
#[test]
fn check_replay_unbounded_grant_passes_with_count() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
let mut u = sample_use("use_2", "g2", "sha256:other", 1);
u.max_uses = None;
append_use(&j, u).unwrap();
let r = check_replay(&j, "g2", "sha256:other", None).unwrap();
assert!(r.passed.unwrap());
assert!(r.max_uses.is_none());
}
#[test]
fn list_uses_for_grant_returns_records_in_order() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
append_use(&j, sample_use("use_2", "g2", "sha256:nn2", 1)).unwrap();
append_use(&j, sample_use("use_3", "g1", "sha256:nn3", 2)).unwrap();
let g1 = list_uses_for_grant(&j, "g1").unwrap();
assert_eq!(g1.len(), 2);
assert_eq!(g1[0].use_id, "use_1");
assert_eq!(g1[1].use_id, "use_3");
}
#[test]
fn lock_keeps_two_appends_serial() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
fs::create_dir_all(j.locks_dir()).unwrap();
let held = OpenOptions::new()
.read(true).write(true).create(true).truncate(false)
.open(j.lock_path()).unwrap();
held.try_lock_exclusive().unwrap();
let err = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap_err();
assert!(matches!(err, JournalError::LockBusy));
let _ = fs2::FileExt::unlock(&held);
}
#[test]
fn revocation_appends_into_chain() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
let rev = ApprovalRevocation {
type_: TYPE_APPROVAL_REVOCATION.into(),
revocation_id: "rev_1".into(),
grant_id: "g1".into(),
grant_digest: "sha256:00".into(),
revoker: "human://alice".into(),
reason: Some("rotated key".into()),
created_at: "2026-04-30T07:01:00Z".into(),
previous_record_digest: String::new(),
record_digest: String::new(),
signature: None,
signature_alg: None,
signing_key_id: None,
};
let h = append_revocation(&j, rev).unwrap();
assert_eq!(h.index, 2);
assert_eq!(verify_integrity(&j).unwrap(), 2);
}
#[test]
fn record_files_contain_no_raw_nonce_or_signature_secrets() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
let entries: Vec<_> = fs::read_dir(j.records_dir())
.unwrap()
.map(|e| e.unwrap().path())
.collect();
let bytes = fs::read(&entries[0]).unwrap();
let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
let obj = json.as_object().unwrap();
for forbidden in ["nonce", "command", "prompt", "file_content", "bearer_token", "api_key"] {
assert!(
!obj.contains_key(forbidden),
"journal record must not contain `{forbidden}`",
);
}
assert!(obj.contains_key("nonce_digest"));
}
#[test]
fn reserve_use_first_call_succeeds_and_stamps_use_number() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
let mut rec = sample_use("use_1", "g1", "sha256:nn1", 0);
rec.use_number = 0;
let head = reserve_use(&j, rec, Some(1)).unwrap();
assert_eq!(head.index, 1);
let stored = list_uses_for_grant(&j, "g1").unwrap();
assert_eq!(stored.len(), 1);
assert_eq!(stored[0].use_number, 1, "reserve_use must stamp use_number=1 for the first use");
}
#[test]
fn reserve_use_max_uses_1_serial_second_call_rejects() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
reserve_use(&j, sample_use("use_1", "g1", "sha256:nn_a", 0), Some(1)).unwrap();
let err = reserve_use(&j, sample_use("use_2", "g1", "sha256:nn_a", 0), Some(1))
.expect_err("second consume of max_uses=1 grant must fail");
match err {
JournalError::MaxUsesExceeded { grant_id, max_uses, current } => {
assert_eq!(grant_id, "g1");
assert_eq!(max_uses, 1);
assert_eq!(current, 1);
}
other => panic!("expected MaxUsesExceeded, got {other:?}"),
}
let stored = list_uses_for_grant(&j, "g1").unwrap();
assert_eq!(stored.len(), 1, "rejected reserve must not append");
}
#[test]
fn reserve_use_max_uses_2_two_uses_pass_third_rejects() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
let mut a = sample_use("use_1", "g1", "sha256:nn_a", 0); a.max_uses = Some(2);
let mut b = sample_use("use_2", "g1", "sha256:nn_b", 0); b.max_uses = Some(2);
reserve_use(&j, a, Some(2)).unwrap();
reserve_use(&j, b, Some(2)).unwrap();
let mut c = sample_use("use_3", "g1", "sha256:nn_c", 0); c.max_uses = Some(2);
reserve_use(&j, c, Some(2)).unwrap();
let mut a2 = sample_use("use_1b", "g1", "sha256:nn_a", 0); a2.max_uses = Some(2);
reserve_use(&j, a2, Some(2)).unwrap();
let mut a3 = sample_use("use_1c", "g1", "sha256:nn_a", 0); a3.max_uses = Some(2);
let err = reserve_use(&j, a3, Some(2)).expect_err("third use of same nonce must fail");
assert!(matches!(err, JournalError::MaxUsesExceeded { .. }));
}
#[test]
fn reserve_use_retry_after_lock_busy_does_not_bypass_max_uses() {
let dir = tempdir().unwrap();
let j = Journal::new(dir.path());
reserve_use(&j, sample_use("use_1", "g1", "sha256:nn_retry", 0), Some(1)).unwrap();
for i in 0..5 {
let err = reserve_use(
&j,
sample_use(&format!("use_retry_{i}"), "g1", "sha256:nn_retry", 0),
Some(1),
).expect_err("retry must fail");
assert!(matches!(err, JournalError::MaxUsesExceeded { .. }));
}
let stored = list_uses_for_grant(&j, "g1").unwrap();
assert_eq!(stored.len(), 1, "exactly one record on disk despite 5 retries");
}
#[test]
fn reserve_use_concurrent_max_uses_1_only_one_succeeds() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let dir = tempdir().unwrap();
let dir_path = Arc::new(dir.path().to_path_buf());
let success = Arc::new(AtomicUsize::new(0));
let lock_busy = Arc::new(AtomicUsize::new(0));
let max_exceeded = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for i in 0..8 {
let dir_path = Arc::clone(&dir_path);
let success = Arc::clone(&success);
let lock_busy = Arc::clone(&lock_busy);
let max_exceeded = Arc::clone(&max_exceeded);
handles.push(thread::spawn(move || {
let j = Journal::new(dir_path.as_path());
let rec = sample_use(
&format!("use_{i}"),
"g1",
"sha256:race_nonce",
0,
);
match reserve_use(&j, rec, Some(1)) {
Ok(_) => { success.fetch_add(1, Ordering::SeqCst); }
Err(JournalError::LockBusy) => { lock_busy.fetch_add(1, Ordering::SeqCst); }
Err(JournalError::MaxUsesExceeded { .. }) => { max_exceeded.fetch_add(1, Ordering::SeqCst); }
Err(other) => panic!("unexpected error: {other:?}"),
}
}));
}
for h in handles { h.join().unwrap(); }
let s = success.load(Ordering::SeqCst);
let lb = lock_busy.load(Ordering::SeqCst);
let me = max_exceeded.load(Ordering::SeqCst);
assert_eq!(s, 1, "exactly one of 8 concurrent reserves must succeed; got {s} (lock_busy={lb}, max_exceeded={me})");
assert_eq!(s + lb + me, 8, "every thread accounted for");
let stored = list_uses_for_grant(&Journal::new(dir.path()), "g1").unwrap();
let same_nonce = stored.iter().filter(|u| u.nonce_digest == "sha256:race_nonce").count();
assert_eq!(same_nonce, 1, "exactly one record on disk for the contested nonce");
}
}