1use std::fs::{self, File, OpenOptions};
31use std::io::Write;
32use std::path::{Path, PathBuf};
33
34#[cfg(not(target_family = "wasm"))]
39use fs2::FileExt;
40
41use crate::statements::{
42 ApprovalRevocation, ApprovalUse, JournalCheckpoint, ReplayCheck, ReplayCheckLevel,
43 TYPE_APPROVAL_REVOCATION, TYPE_APPROVAL_USE, TYPE_JOURNAL_CHECKPOINT,
44 approval_revocation_record_digest, approval_use_record_digest,
45 journal_checkpoint_record_digest,
46};
47
/// Errors returned by the journal operations in this file.
#[derive(Debug)]
pub enum JournalError {
    /// A filesystem operation failed.
    Io(std::io::Error),
    /// JSON serialization or deserialization failed.
    Json(serde_json::Error),
    /// A record's `previous_record_digest` does not match the digest of the
    /// record that precedes it in the chain.
    BrokenChain {
        /// Index of the record whose back-link is wrong.
        index: u64,
        /// Digest of the preceding record (what the back-link should be).
        expected: String,
        /// The `previous_record_digest` actually stored in the record.
        actual: String,
    },
    /// A record's stored `record_digest` differs from the digest recomputed
    /// from its contents.
    RecordTampered {
        /// Index of the offending record.
        index: u64,
        /// The digest stored inside the record.
        expected: String,
        /// The digest recomputed from the record.
        actual: String,
    },
    /// The head's digest does not match the last record found on disk.
    MissingRecord {
        /// The index recorded in the head.
        index: u64,
    },
    /// The journal lock could not be acquired.
    LockBusy,
    /// A reservation was rejected because the replay check did not pass.
    MaxUsesExceeded {
        /// Grant the rejected use belongs to.
        grant_id: String,
        /// The limit that was applied (0 when the replay check carried none).
        max_uses: u32,
        /// Number of prior uses counted by the replay check.
        current: u32,
    },
}
87
88impl std::fmt::Display for JournalError {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 Self::Io(e) => write!(f, "journal io: {e}"),
92 Self::Json(e) => write!(f, "journal json: {e}"),
93 Self::BrokenChain { index, expected, actual } => write!(
94 f,
95 "journal broken at record {index}: previous_record_digest = {actual}, expected {expected}",
96 ),
97 Self::RecordTampered { index, expected, actual } => write!(
98 f,
99 "journal record {index} tampered: stored digest {expected}, recomputed {actual}",
100 ),
101 Self::MissingRecord { index } => write!(
102 f,
103 "journal record {index} referenced by head but missing on disk",
104 ),
105 Self::LockBusy => write!(f, "journal append lock busy; another process holds it"),
106 Self::MaxUsesExceeded { grant_id, max_uses, current } => write!(
107 f,
108 "approval grant {grant_id} would exceed max_uses ({current}/{max_uses})",
109 ),
110 }
111 }
112}
113
114impl std::error::Error for JournalError {}
115impl From<std::io::Error> for JournalError { fn from(e: std::io::Error) -> Self { Self::Io(e) } }
116impl From<serde_json::Error> for JournalError { fn from(e: serde_json::Error) -> Self { Self::Json(e) } }
117
/// Handle to an on-disk journal rooted at a single directory.
///
/// The `records`, `heads`, `indexes` and `locks` subdirectories and the
/// `journal.json` metadata file are all resolved relative to `dir`.
pub struct Journal {
    /// Root directory of the journal.
    pub dir: PathBuf,
}
127
128impl Journal {
129 pub fn new(dir: impl Into<PathBuf>) -> Self {
130 Self { dir: dir.into() }
131 }
132
133 pub fn records_dir(&self) -> PathBuf { self.dir.join("records") }
134 pub fn heads_dir(&self) -> PathBuf { self.dir.join("heads") }
135 pub fn indexes_dir(&self) -> PathBuf { self.dir.join("indexes") }
136 pub fn locks_dir(&self) -> PathBuf { self.dir.join("locks") }
137 pub fn current_head_path(&self) -> PathBuf { self.heads_dir().join("current.json") }
138 pub fn lock_path(&self) -> PathBuf { self.locks_dir().join("journal.lock") }
139 pub fn meta_path(&self) -> PathBuf { self.dir.join("journal.json") }
140
141 pub fn by_grant_path(&self, grant_id: &str) -> PathBuf {
143 self.indexes_dir().join("by-grant").join(format!("{}.txt", safe_name(grant_id)))
144 }
145
146 pub fn by_nonce_path(&self, nonce_digest: &str) -> PathBuf {
148 self.indexes_dir().join("by-nonce").join(format!("{}.txt", safe_name(nonce_digest)))
149 }
150
151 pub fn exists(&self) -> bool {
153 self.dir.is_dir()
154 }
155}
156
/// Turns an identifier into a string usable as a file name by replacing
/// `:`, `/`, `\`, space and `.` with `_`. All other characters are kept.
fn safe_name(s: &str) -> String {
    let mut sanitized = String::with_capacity(s.len());
    for ch in s.chars() {
        let replacement = if matches!(ch, ':' | '/' | '\\' | ' ' | '.') {
            '_'
        } else {
            ch
        };
        sanitized.push(replacement);
    }
    sanitized
}
168
169#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174pub struct Head {
175 pub index: u64,
177 pub digest: String,
179 pub updated_at: String,
181}
182
183impl Default for Head {
184 fn default() -> Self {
185 Self {
186 index: 0,
187 digest: String::new(),
188 updated_at: String::new(),
189 }
190 }
191}
192
193fn read_head(j: &Journal) -> Result<Head, JournalError> {
194 let path = j.current_head_path();
195 if !path.exists() {
196 return Ok(Head::default());
197 }
198 let bytes = fs::read(&path)?;
199 Ok(serde_json::from_slice(&bytes)?)
200}
201
202fn write_head(j: &Journal, head: &Head) -> Result<(), JournalError> {
203 fs::create_dir_all(j.heads_dir())?;
204 let path = j.current_head_path();
205 let tmp = path.with_extension("json.tmp");
206 let json = serde_json::to_vec_pretty(head)?;
207 fs::write(&tmp, json)?;
208 fs::rename(&tmp, &path)?;
209 Ok(())
210}
211
212#[cfg(not(target_family = "wasm"))]
221fn with_lock<F, T>(j: &Journal, body: F) -> Result<T, JournalError>
222where
223 F: FnOnce() -> Result<T, JournalError>,
224{
225 fs::create_dir_all(j.locks_dir())?;
226 let lock = OpenOptions::new()
227 .read(true)
228 .write(true)
229 .create(true)
230 .truncate(false)
231 .open(j.lock_path())?;
232 if lock.try_lock_exclusive().is_err() {
233 return Err(JournalError::LockBusy);
234 }
235 let result = body();
236 let _ = fs2::FileExt::unlock(&lock);
237 result
238}
239
240#[cfg(target_family = "wasm")]
243fn with_lock<F, T>(_j: &Journal, body: F) -> Result<T, JournalError>
244where
245 F: FnOnce() -> Result<T, JournalError>,
246{
247 body()
248}
249
250pub fn append_use(j: &Journal, mut rec: ApprovalUse) -> Result<Head, JournalError> {
257 rec.type_ = TYPE_APPROVAL_USE.into();
258 with_lock(j, || {
259 let head = read_head(j)?;
260 rec.previous_record_digest = head.digest.clone();
261 rec.record_digest = approval_use_record_digest(&rec);
262 let next_index = head.index + 1;
263 write_record_use(j, next_index, &rec)?;
264 update_indexes_for_use(j, next_index, &rec)?;
265 let new_head = Head {
266 index: next_index,
267 digest: rec.record_digest.clone(),
268 updated_at: rec.created_at.clone(),
269 };
270 write_head(j, &new_head)?;
271 ensure_meta(j)?;
272 Ok(new_head)
273 })
274}
275
276pub fn reserve_use(
295 j: &Journal,
296 mut rec: ApprovalUse,
297 max_uses: Option<u32>,
298) -> Result<Head, JournalError> {
299 rec.type_ = TYPE_APPROVAL_USE.into();
300 with_lock(j, || {
301 let replay = check_replay(j, &rec.grant_id, &rec.nonce_digest, max_uses)?;
305 if let Some(false) = replay.passed {
306 let current = replay
307 .use_number
308 .map(|n| n.saturating_sub(1))
309 .unwrap_or(0);
310 return Err(JournalError::MaxUsesExceeded {
311 grant_id: rec.grant_id.clone(),
312 max_uses: replay.max_uses.unwrap_or(0),
313 current,
314 });
315 }
316 let prior_count = list_uses_for_grant(j, &rec.grant_id)?.len() as u32;
320 rec.use_number = prior_count.saturating_add(1);
321 let head = read_head(j)?;
323 rec.previous_record_digest = head.digest.clone();
324 rec.record_digest = approval_use_record_digest(&rec);
325 let next_index = head.index + 1;
326 write_record_use(j, next_index, &rec)?;
327 update_indexes_for_use(j, next_index, &rec)?;
328 let new_head = Head {
329 index: next_index,
330 digest: rec.record_digest.clone(),
331 updated_at: rec.created_at.clone(),
332 };
333 write_head(j, &new_head)?;
334 ensure_meta(j)?;
335 Ok(new_head)
336 })
337}
338
339pub fn append_revocation(j: &Journal, mut rec: ApprovalRevocation) -> Result<Head, JournalError> {
341 rec.type_ = TYPE_APPROVAL_REVOCATION.into();
342 with_lock(j, || {
343 let head = read_head(j)?;
344 rec.previous_record_digest = head.digest.clone();
345 rec.record_digest = approval_revocation_record_digest(&rec);
346 let next_index = head.index + 1;
347 write_record_revocation(j, next_index, &rec)?;
348 index_grant(j, next_index, &rec.grant_id)?;
349 let new_head = Head {
350 index: next_index,
351 digest: rec.record_digest.clone(),
352 updated_at: rec.created_at.clone(),
353 };
354 write_head(j, &new_head)?;
355 ensure_meta(j)?;
356 Ok(new_head)
357 })
358}
359
360pub fn append_checkpoint(j: &Journal, mut rec: JournalCheckpoint) -> Result<Head, JournalError> {
362 rec.type_ = TYPE_JOURNAL_CHECKPOINT.into();
363 with_lock(j, || {
364 let head = read_head(j)?;
365 rec.previous_record_digest = head.digest.clone();
366 rec.record_digest = journal_checkpoint_record_digest(&rec);
367 let next_index = head.index + 1;
368 write_record_checkpoint(j, next_index, &rec)?;
369 let new_head = Head {
370 index: next_index,
371 digest: rec.record_digest.clone(),
372 updated_at: rec.created_at.clone(),
373 };
374 write_head(j, &new_head)?;
375 ensure_meta(j)?;
376 Ok(new_head)
377 })
378}
379
/// Builds a record file name: `<index, zero-padded to 10>.<type_>.<short>.json`.
///
/// `short` is the digest without a leading `sha256:` prefix, cut to at most
/// 16 bytes. The cut backs off to the nearest character boundary, so a digest
/// containing multi-byte characters cannot cause a slicing panic.
fn record_filename(index: u64, type_: &str, digest: &str) -> String {
    let tail = digest.strip_prefix("sha256:").unwrap_or(digest);
    let mut end = tail.len().min(16);
    // Offset 0 is always a boundary, so this loop terminates.
    while !tail.is_char_boundary(end) {
        end -= 1;
    }
    let short = &tail[..end];
    format!("{index:010}.{type_}.{short}.json")
}
387
388fn write_record_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
389 fs::create_dir_all(j.records_dir())?;
390 let name = record_filename(index, "approval-use", &rec.record_digest);
391 let path = j.records_dir().join(&name);
392 let tmp = path.with_extension("json.tmp");
393 let mut f = File::create(&tmp)?;
394 f.write_all(&serde_json::to_vec_pretty(rec)?)?;
395 f.sync_all()?;
396 fs::rename(&tmp, &path)?;
397 Ok(())
398}
399
400fn write_record_revocation(j: &Journal, index: u64, rec: &ApprovalRevocation) -> Result<(), JournalError> {
401 fs::create_dir_all(j.records_dir())?;
402 let name = record_filename(index, "approval-revocation", &rec.record_digest);
403 let path = j.records_dir().join(&name);
404 let tmp = path.with_extension("json.tmp");
405 let mut f = File::create(&tmp)?;
406 f.write_all(&serde_json::to_vec_pretty(rec)?)?;
407 f.sync_all()?;
408 fs::rename(&tmp, &path)?;
409 Ok(())
410}
411
412fn write_record_checkpoint(j: &Journal, index: u64, rec: &JournalCheckpoint) -> Result<(), JournalError> {
413 fs::create_dir_all(j.records_dir())?;
414 let name = record_filename(index, "journal-checkpoint", &rec.record_digest);
415 let path = j.records_dir().join(&name);
416 let tmp = path.with_extension("json.tmp");
417 let mut f = File::create(&tmp)?;
418 f.write_all(&serde_json::to_vec_pretty(rec)?)?;
419 f.sync_all()?;
420 fs::rename(&tmp, &path)?;
421 Ok(())
422}
423
424fn ensure_meta(j: &Journal) -> Result<(), JournalError> {
425 let path = j.meta_path();
426 if path.exists() {
427 return Ok(());
428 }
429 #[derive(serde::Serialize)]
430 struct Meta<'a> {
431 kind: &'a str,
432 version: &'a str,
433 format: &'a str,
434 }
435 let meta = Meta { kind: "approval-use-journal", version: "v1", format: "json-records" };
436 let bytes = serde_json::to_vec_pretty(&meta)?;
437 fs::write(&path, bytes)?;
438 Ok(())
439}
440
441fn append_index(path: &Path, line: &str) -> Result<(), JournalError> {
446 if let Some(parent) = path.parent() {
447 fs::create_dir_all(parent)?;
448 }
449 let mut f = OpenOptions::new().append(true).create(true).open(path)?;
450 writeln!(f, "{line}")?;
451 Ok(())
452}
453
454fn index_grant(j: &Journal, index: u64, grant_id: &str) -> Result<(), JournalError> {
455 append_index(&j.by_grant_path(grant_id), &index.to_string())
456}
457
458fn index_nonce(j: &Journal, index: u64, nonce_digest: &str) -> Result<(), JournalError> {
459 append_index(&j.by_nonce_path(nonce_digest), &index.to_string())
460}
461
462fn update_indexes_for_use(j: &Journal, index: u64, rec: &ApprovalUse) -> Result<(), JournalError> {
463 index_grant(j, index, &rec.grant_id)?;
464 index_nonce(j, index, &rec.nonce_digest)?;
465 Ok(())
466}
467
468pub fn rebuild_indexes(j: &Journal) -> Result<u64, JournalError> {
472 let dir = j.indexes_dir();
473 if dir.is_dir() {
474 fs::remove_dir_all(&dir)?;
478 }
479 let mut rebuilt = 0u64;
480 for (idx, kind, bytes) in iter_records(j)? {
481 match kind.as_str() {
482 "approval-use" => {
483 let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
484 update_indexes_for_use(j, idx, &rec)?;
485 rebuilt += 1;
486 }
487 "approval-revocation" => {
488 let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
489 index_grant(j, idx, &rec.grant_id)?;
490 rebuilt += 1;
491 }
492 "journal-checkpoint" => {
493 rebuilt += 1; }
495 _ => {}
496 }
497 }
498 Ok(rebuilt)
499}
500
501fn iter_records(j: &Journal) -> Result<Vec<(u64, String, Vec<u8>)>, JournalError> {
511 let dir = j.records_dir();
512 if !dir.is_dir() {
513 return Ok(Vec::new());
514 }
515 let mut entries: Vec<(u64, String, PathBuf)> = Vec::new();
516 for entry in fs::read_dir(&dir)? {
517 let entry = entry?;
518 let path = entry.path();
519 if path.extension().and_then(|s| s.to_str()) != Some("json") {
520 continue;
521 }
522 let name = match path.file_name().and_then(|n| n.to_str()) {
523 Some(n) => n,
524 None => continue,
525 };
526 let mut parts = name.splitn(4, '.');
528 let idx_str = match parts.next() { Some(s) => s, None => continue };
529 let kind = match parts.next() { Some(s) => s, None => continue };
530 let idx = match idx_str.parse::<u64>() { Ok(n) => n, Err(_) => continue };
532 entries.push((idx, kind.to_string(), path));
533 }
534 entries.sort_by_key(|(idx, _, _)| *idx);
535 let mut out = Vec::with_capacity(entries.len());
536 for (idx, kind, path) in entries {
537 let bytes = fs::read(&path)?;
538 out.push((idx, kind, bytes));
539 }
540 Ok(out)
541}
542
543pub fn verify_integrity(j: &Journal) -> Result<u64, JournalError> {
548 let mut prior_digest = String::new();
549 let mut count = 0u64;
550 let head = read_head(j)?;
551 for (idx, kind, bytes) in iter_records(j)? {
552 match kind.as_str() {
553 "approval-use" => {
554 let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
555 if rec.previous_record_digest != prior_digest {
556 return Err(JournalError::BrokenChain {
557 index: idx,
558 expected: prior_digest,
559 actual: rec.previous_record_digest,
560 });
561 }
562 let recomputed = approval_use_record_digest(&rec);
563 if recomputed != rec.record_digest {
564 return Err(JournalError::RecordTampered {
565 index: idx,
566 expected: rec.record_digest,
567 actual: recomputed,
568 });
569 }
570 prior_digest = rec.record_digest;
571 }
572 "approval-revocation" => {
573 let rec: ApprovalRevocation = serde_json::from_slice(&bytes)?;
574 if rec.previous_record_digest != prior_digest {
575 return Err(JournalError::BrokenChain {
576 index: idx,
577 expected: prior_digest,
578 actual: rec.previous_record_digest,
579 });
580 }
581 let recomputed = approval_revocation_record_digest(&rec);
582 if recomputed != rec.record_digest {
583 return Err(JournalError::RecordTampered {
584 index: idx,
585 expected: rec.record_digest,
586 actual: recomputed,
587 });
588 }
589 prior_digest = rec.record_digest;
590 }
591 "journal-checkpoint" => {
592 let rec: JournalCheckpoint = serde_json::from_slice(&bytes)?;
593 if rec.previous_record_digest != prior_digest {
594 return Err(JournalError::BrokenChain {
595 index: idx,
596 expected: prior_digest,
597 actual: rec.previous_record_digest,
598 });
599 }
600 let recomputed = journal_checkpoint_record_digest(&rec);
601 if recomputed != rec.record_digest {
602 return Err(JournalError::RecordTampered {
603 index: idx,
604 expected: rec.record_digest,
605 actual: recomputed,
606 });
607 }
608 prior_digest = rec.record_digest;
609 }
610 _ => {
611 continue;
615 }
616 }
617 count += 1;
618 }
619 if head.index != 0 && head.digest != prior_digest {
622 return Err(JournalError::MissingRecord { index: head.index });
623 }
624 Ok(count)
625}
626
627pub fn check_replay(
646 j: &Journal,
647 grant_id: &str,
648 nonce_digest: &str,
649 max_uses_hint: Option<u32>,
650) -> Result<ReplayCheck, JournalError> {
651 if !j.exists() {
652 return Ok(ReplayCheck::not_performed());
653 }
654 let index_path = j.by_nonce_path(nonce_digest);
658 let mut current = 0u32;
659 let mut last_max: Option<u32> = None;
660 if index_path.exists() {
661 let raw = fs::read_to_string(&index_path)?;
662 for line in raw.lines() {
663 let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
664 if let Some(rec) = load_use_record(j, idx)? {
665 if rec.grant_id == grant_id {
669 current = current.saturating_add(1);
670 last_max = rec.max_uses.or(last_max);
671 }
672 }
673 }
674 }
675 let max_uses = max_uses_hint.or(last_max);
676 let passed = match max_uses {
677 Some(m) => current < m,
678 None => true, };
680 let details = match max_uses {
681 Some(m) => format!("local Approval Use Journal: use {current}/{m}"),
682 None => format!("local Approval Use Journal: {current} prior use(s); grant has no max_uses"),
683 };
684 Ok(ReplayCheck {
685 level: ReplayCheckLevel::LocalJournal,
686 use_number: Some(current.saturating_add(1)),
687 max_uses,
688 passed: Some(passed),
689 details: Some(details),
690 })
691}
692
693fn load_use_record(j: &Journal, index: u64) -> Result<Option<ApprovalUse>, JournalError> {
694 let dir = j.records_dir();
695 if !dir.is_dir() {
696 return Ok(None);
697 }
698 let prefix = format!("{:010}.approval-use.", index);
699 for entry in fs::read_dir(&dir)? {
700 let entry = entry?;
701 let name = entry.file_name().to_string_lossy().into_owned();
702 if name.starts_with(&prefix) {
703 let bytes = fs::read(entry.path())?;
704 let rec: ApprovalUse = serde_json::from_slice(&bytes)?;
705 return Ok(Some(rec));
706 }
707 }
708 Ok(None)
709}
710
711pub fn find_use_for_action(
729 j: &Journal,
730 grant_id: &str,
731 nonce_digest: &str,
732 max_uses_hint: Option<u32>,
733) -> Result<Option<(ApprovalUse, ReplayCheck)>, JournalError> {
734 if !j.exists() {
735 return Ok(None);
736 }
737 let index_path = j.by_nonce_path(nonce_digest);
738 if !index_path.exists() {
739 return Ok(None);
740 }
741 let raw = fs::read_to_string(&index_path)?;
742 let mut latest: Option<ApprovalUse> = None;
749 for line in raw.lines() {
750 let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
751 if let Some(rec) = load_use_record(j, idx)? {
752 if rec.grant_id == grant_id {
753 latest = Some(rec);
754 }
755 }
756 }
757 let Some(rec) = latest else { return Ok(None) };
758
759 let stored_max = rec.max_uses;
760 let max_uses = max_uses_hint.or(stored_max);
761 let passed = match max_uses {
762 Some(m) => rec.use_number <= m,
763 None => true,
764 };
765 let details = match max_uses {
766 Some(m) => format!("local Approval Use Journal passed, use {}/{}", rec.use_number, m),
767 None => format!("local Approval Use Journal: use {} of unbounded grant", rec.use_number),
768 };
769 Ok(Some((
770 rec.clone(),
771 ReplayCheck {
772 level: ReplayCheckLevel::LocalJournal,
773 use_number: Some(rec.use_number),
774 max_uses,
775 passed: Some(passed),
776 details: Some(details),
777 },
778 )))
779}
780
781pub fn list_uses_for_grant(j: &Journal, grant_id: &str) -> Result<Vec<ApprovalUse>, JournalError> {
784 if !j.exists() {
785 return Ok(Vec::new());
786 }
787 let index_path = j.by_grant_path(grant_id);
788 if !index_path.exists() {
789 return Ok(Vec::new());
790 }
791 let raw = fs::read_to_string(&index_path)?;
792 let mut out = Vec::new();
793 for line in raw.lines() {
794 let idx: u64 = match line.trim().parse() { Ok(n) => n, Err(_) => continue };
795 if let Some(rec) = load_use_record(j, idx)? {
796 out.push(rec);
797 }
798 }
799 Ok(out)
800}
801
/// Unit tests for the journal: layout, chaining, integrity checks, index
/// rebuilds, replay checks and `reserve_use`.
///
/// Tests that use `fs2` locking directly, or that rely on the lock to
/// serialise threads, are compiled only for non-wasm targets. That matches
/// the parent module, which imports `fs2::FileExt` and takes a real lock only
/// there.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds an `ApprovalUse` with fixed actor/action/subject, `max_uses = Some(2)`
    /// and empty digests (the append functions fill those in).
    fn sample_use(use_id: &str, grant_id: &str, nonce_digest: &str, n: u32) -> ApprovalUse {
        ApprovalUse {
            type_: TYPE_APPROVAL_USE.into(),
            use_id: use_id.into(),
            grant_id: grant_id.into(),
            grant_digest: "sha256:00".into(),
            nonce_digest: nonce_digest.into(),
            actor: "agent://deployer".into(),
            action: "deploy.production".into(),
            subject: "env://production".into(),
            session_id: None,
            action_artifact_id: None,
            receipt_digest: None,
            use_number: n,
            max_uses: Some(2),
            idempotency_key: None,
            created_at: "2026-04-30T07:00:00Z".into(),
            expires_at: None,
            previous_record_digest: String::new(),
            record_digest: String::new(),
            signature: None,
            signature_alg: None,
            signing_key_id: None,
        }
    }

    /// The first append creates the records, heads, meta and index files.
    #[test]
    fn first_append_creates_layout_and_head() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        let head = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        assert_eq!(head.index, 1);
        assert!(j.records_dir().is_dir());
        assert!(j.heads_dir().is_dir());
        assert!(j.current_head_path().is_file());
        assert!(j.meta_path().is_file());
        assert!(j.by_grant_path("g1").is_file());
        assert!(j.by_nonce_path("sha256:nn1").is_file());
    }

    /// The second record's back-link equals the first head digest.
    #[test]
    fn second_append_links_previous_record_digest() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        let h1 = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        let h2 = append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
        assert_eq!(h2.index, 2);
        let recs = iter_records(&j).unwrap();
        assert_eq!(recs.len(), 2);
        let (_, _, bytes) = &recs[1];
        let r2: ApprovalUse = serde_json::from_slice(bytes).unwrap();
        assert_eq!(r2.previous_record_digest, h1.digest);
    }

    /// An untouched chain of five records verifies and reports five records.
    #[test]
    fn verify_integrity_passes_on_intact_chain() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        for i in 1..=5 {
            let nd = format!("sha256:nn{i}");
            append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
        }
        assert_eq!(verify_integrity(&j).unwrap(), 5);
    }

    /// Changing a field of a stored record is reported as `RecordTampered`.
    #[test]
    fn editing_a_record_breaks_integrity() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        let entries: Vec<_> = fs::read_dir(j.records_dir()).unwrap().collect();
        let entry = entries.into_iter().next().unwrap().unwrap();
        let mut json: serde_json::Value =
            serde_json::from_slice(&fs::read(entry.path()).unwrap()).unwrap();
        json["actor"] = "agent://attacker".into();
        fs::write(entry.path(), serde_json::to_vec_pretty(&json).unwrap()).unwrap();

        let err = verify_integrity(&j).unwrap_err();
        assert!(
            matches!(err, JournalError::RecordTampered { .. }),
            "expected RecordTampered, got {err:?}"
        );
    }

    /// Removing the last record makes the head digest unreachable (`MissingRecord`).
    #[test]
    fn deleting_a_record_breaks_integrity_or_head_continuity() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        append_use(&j, sample_use("use_2", "g1", "sha256:nn2", 2)).unwrap();
        let entries: Vec<_> = fs::read_dir(j.records_dir())
            .unwrap()
            .map(|e| e.unwrap().path())
            .collect();
        // File names start with the zero-padded index, so `max` is the last record.
        let trailing = entries.iter().max().unwrap();
        fs::remove_file(trailing).unwrap();

        let err = verify_integrity(&j).unwrap_err();
        assert!(
            matches!(err, JournalError::MissingRecord { .. }),
            "expected MissingRecord, got {err:?}"
        );
    }

    /// Indexes deleted from disk are regenerated from the record files.
    #[test]
    fn indexes_can_be_rebuilt_from_records() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        for i in 1..=3 {
            let nd = format!("sha256:nn{i}");
            append_use(&j, sample_use(&format!("use_{i}"), "g1", &nd, i)).unwrap();
        }
        fs::remove_dir_all(j.indexes_dir()).unwrap();

        let rebuilt = rebuild_indexes(&j).unwrap();
        assert_eq!(rebuilt, 3);
        assert!(j.by_grant_path("g1").is_file());
        assert!(j.by_nonce_path("sha256:nn1").is_file());
    }

    /// Two prior uses of the same nonce with `max_uses = 2` fail the check.
    #[test]
    fn check_replay_reports_use_count_and_max() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        append_use(&j, sample_use("use_2", "g1", "sha256:nn1", 2)).unwrap();

        let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
        assert_eq!(r.level, ReplayCheckLevel::LocalJournal);
        assert_eq!(r.use_number, Some(3));
        assert_eq!(r.max_uses, Some(2));
        assert_eq!(r.passed, Some(false));
    }

    /// One prior use with `max_uses = 2` passes and reports use number 2.
    #[test]
    fn check_replay_passes_when_under_max() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        let r = check_replay(&j, "g1", "sha256:nn1", Some(2)).unwrap();
        assert_eq!(r.use_number, Some(2));
        assert_eq!(r.passed, Some(true));
    }

    /// A journal directory that does not exist yields a not-performed check.
    #[test]
    fn check_replay_no_journal_returns_not_performed() {
        let dir = tempdir().unwrap();
        let absent = dir.path().join("nope");
        let j = Journal::new(&absent);
        let r = check_replay(&j, "g1", "sha256:nn1", Some(1)).unwrap();
        assert_eq!(r.level, ReplayCheckLevel::NotPerformed);
        assert!(r.use_number.is_none());
    }

    /// Without a hint or stored `max_uses`, the check passes with no limit.
    #[test]
    fn check_replay_unbounded_grant_passes_with_count() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        let mut u = sample_use("use_2", "g2", "sha256:other", 1);
        u.max_uses = None;
        append_use(&j, u).unwrap();

        let r = check_replay(&j, "g2", "sha256:other", None).unwrap();
        assert!(r.passed.unwrap());
        assert!(r.max_uses.is_none());
    }

    /// Uses are listed per grant in append order.
    #[test]
    fn list_uses_for_grant_returns_records_in_order() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        append_use(&j, sample_use("use_2", "g2", "sha256:nn2", 1)).unwrap();
        append_use(&j, sample_use("use_3", "g1", "sha256:nn3", 2)).unwrap();
        let g1 = list_uses_for_grant(&j, "g1").unwrap();
        assert_eq!(g1.len(), 2);
        assert_eq!(g1[0].use_id, "use_1");
        assert_eq!(g1[1].use_id, "use_3");
    }

    /// An append attempted while the lock file is held elsewhere fails with `LockBusy`.
    // Uses fs2 locking directly, which the parent only imports on non-wasm targets.
    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn lock_keeps_two_appends_serial() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        fs::create_dir_all(j.locks_dir()).unwrap();
        let held = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(j.lock_path())
            .unwrap();
        held.try_lock_exclusive().unwrap();

        let err = append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap_err();
        assert!(matches!(err, JournalError::LockBusy));

        let _ = fs2::FileExt::unlock(&held);
    }

    /// A revocation appended after a use extends the same verified chain.
    #[test]
    fn revocation_appends_into_chain() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        let rev = ApprovalRevocation {
            type_: TYPE_APPROVAL_REVOCATION.into(),
            revocation_id: "rev_1".into(),
            grant_id: "g1".into(),
            grant_digest: "sha256:00".into(),
            revoker: "human://alice".into(),
            reason: Some("rotated key".into()),
            created_at: "2026-04-30T07:01:00Z".into(),
            previous_record_digest: String::new(),
            record_digest: String::new(),
            signature: None,
            signature_alg: None,
            signing_key_id: None,
        };
        let h = append_revocation(&j, rev).unwrap();
        assert_eq!(h.index, 2);
        assert_eq!(verify_integrity(&j).unwrap(), 2);
    }

    /// Stored use records expose `nonce_digest` but none of the forbidden raw fields.
    #[test]
    fn record_files_contain_no_raw_nonce_or_signature_secrets() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        append_use(&j, sample_use("use_1", "g1", "sha256:nn1", 1)).unwrap();
        let entries: Vec<_> = fs::read_dir(j.records_dir())
            .unwrap()
            .map(|e| e.unwrap().path())
            .collect();
        let bytes = fs::read(&entries[0]).unwrap();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        let obj = json.as_object().unwrap();
        for forbidden in ["nonce", "command", "prompt", "file_content", "bearer_token", "api_key"] {
            assert!(
                !obj.contains_key(forbidden),
                "journal record must not contain `{forbidden}`",
            );
        }
        assert!(obj.contains_key("nonce_digest"));
    }

    /// The first reservation succeeds and is stamped with `use_number = 1`.
    #[test]
    fn reserve_use_first_call_succeeds_and_stamps_use_number() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        let mut rec = sample_use("use_1", "g1", "sha256:nn1", 0);
        rec.use_number = 0;
        let head = reserve_use(&j, rec, Some(1)).unwrap();
        assert_eq!(head.index, 1);
        let stored = list_uses_for_grant(&j, "g1").unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].use_number, 1, "reserve_use must stamp use_number=1 for the first use");
    }

    /// With `max_uses = 1`, a second reservation of the same nonce is rejected
    /// and nothing is appended.
    #[test]
    fn reserve_use_max_uses_1_serial_second_call_rejects() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        reserve_use(&j, sample_use("use_1", "g1", "sha256:nn_a", 0), Some(1)).unwrap();

        let err = reserve_use(&j, sample_use("use_2", "g1", "sha256:nn_a", 0), Some(1))
            .expect_err("second consume of max_uses=1 grant must fail");
        match err {
            JournalError::MaxUsesExceeded { grant_id, max_uses, current } => {
                assert_eq!(grant_id, "g1");
                assert_eq!(max_uses, 1);
                assert_eq!(current, 1);
            }
            other => panic!("expected MaxUsesExceeded, got {other:?}"),
        }
        let stored = list_uses_for_grant(&j, "g1").unwrap();
        assert_eq!(stored.len(), 1, "rejected reserve must not append");
    }

    /// With `max_uses = 2`, distinct nonces all reserve; the same nonce is
    /// accepted twice and rejected the third time.
    #[test]
    fn reserve_use_max_uses_2_two_uses_pass_third_rejects() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        let mut a = sample_use("use_1", "g1", "sha256:nn_a", 0);
        a.max_uses = Some(2);
        let mut b = sample_use("use_2", "g1", "sha256:nn_b", 0);
        b.max_uses = Some(2);
        reserve_use(&j, a, Some(2)).unwrap();
        reserve_use(&j, b, Some(2)).unwrap();
        let mut c = sample_use("use_3", "g1", "sha256:nn_c", 0);
        c.max_uses = Some(2);
        reserve_use(&j, c, Some(2)).unwrap();
        let mut a2 = sample_use("use_1b", "g1", "sha256:nn_a", 0);
        a2.max_uses = Some(2);
        reserve_use(&j, a2, Some(2)).unwrap();
        let mut a3 = sample_use("use_1c", "g1", "sha256:nn_a", 0);
        a3.max_uses = Some(2);
        let err = reserve_use(&j, a3, Some(2)).expect_err("third use of same nonce must fail");
        assert!(matches!(err, JournalError::MaxUsesExceeded { .. }));
    }

    /// Repeated reservation attempts after the limit is reached keep failing
    /// and never append.
    #[test]
    fn reserve_use_retry_after_lock_busy_does_not_bypass_max_uses() {
        let dir = tempdir().unwrap();
        let j = Journal::new(dir.path());
        reserve_use(&j, sample_use("use_1", "g1", "sha256:nn_retry", 0), Some(1)).unwrap();
        for i in 0..5 {
            let err = reserve_use(
                &j,
                sample_use(&format!("use_retry_{i}"), "g1", "sha256:nn_retry", 0),
                Some(1),
            )
            .expect_err("retry must fail");
            assert!(matches!(err, JournalError::MaxUsesExceeded { .. }));
        }
        let stored = list_uses_for_grant(&j, "g1").unwrap();
        assert_eq!(stored.len(), 1, "exactly one record on disk despite 5 retries");
    }

    /// Eight threads race to reserve the same nonce with `max_uses = 1`;
    /// exactly one succeeds, the rest see `LockBusy` or `MaxUsesExceeded`.
    // Relies on the real file lock, which exists only on non-wasm targets.
    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn reserve_use_concurrent_max_uses_1_only_one_succeeds() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::thread;

        let dir = tempdir().unwrap();
        let dir_path = Arc::new(dir.path().to_path_buf());
        let success = Arc::new(AtomicUsize::new(0));
        let lock_busy = Arc::new(AtomicUsize::new(0));
        let max_exceeded = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for i in 0..8 {
            let dir_path = Arc::clone(&dir_path);
            let success = Arc::clone(&success);
            let lock_busy = Arc::clone(&lock_busy);
            let max_exceeded = Arc::clone(&max_exceeded);
            handles.push(thread::spawn(move || {
                let j = Journal::new(dir_path.as_path());
                let rec = sample_use(
                    &format!("use_{i}"),
                    "g1",
                    "sha256:race_nonce",
                    0,
                );
                match reserve_use(&j, rec, Some(1)) {
                    Ok(_) => { success.fetch_add(1, Ordering::SeqCst); }
                    Err(JournalError::LockBusy) => { lock_busy.fetch_add(1, Ordering::SeqCst); }
                    Err(JournalError::MaxUsesExceeded { .. }) => { max_exceeded.fetch_add(1, Ordering::SeqCst); }
                    Err(other) => panic!("unexpected error: {other:?}"),
                }
            }));
        }
        for h in handles { h.join().unwrap(); }

        let s = success.load(Ordering::SeqCst);
        let lb = lock_busy.load(Ordering::SeqCst);
        let me = max_exceeded.load(Ordering::SeqCst);
        assert_eq!(s, 1, "exactly one of 8 concurrent reserves must succeed; got {s} (lock_busy={lb}, max_exceeded={me})");
        assert_eq!(s + lb + me, 8, "every thread accounted for");

        let stored = list_uses_for_grant(&Journal::new(dir.path()), "g1").unwrap();
        let same_nonce = stored.iter().filter(|u| u.nonce_digest == "sha256:race_nonce").count();
        assert_eq!(same_nonce, 1, "exactly one record on disk for the contested nonce");
    }
}