doiget_core/provenance.rs
1//! JSON Lines + SHA-256 hash-chained provenance log.
2//!
3//! Binding spec: `docs/PROVENANCE_LOG.md` (NORMATIVE, §3 row schema, §4 hash
4//! chain). Failure semantics: **fail-closed** — callers MUST abort the fetch
5//! if a log write returns `Err`. See `docs/SECURITY.md` §1.8 and ADR-0006.
6//!
7//! # On-disk format
8//!
9//! - JSON Lines (`.jsonl`): one JSON object per line, terminated by `\n` (LF).
10//! - UTF-8. Timestamps are RFC3339 in UTC.
11//! - Each row is appended via a single `write_all` whose payload always ends
12//! in `\n`, so a partially-written row is detectable as a missing trailing
13//! newline rather than a torn JSON record.
14//! - In audit-grade mode (the only mode shipped here), the writer flushes the
15//! `BufWriter` and `fsync`s the file after every row.
16//!
17//! # Hash chain (PROVENANCE_LOG.md §4)
18//!
19//! Each row carries a `prev_hash` and a `this_hash`. The first row's
20//! `prev_hash` is the literal string `"GENESIS"`. Every subsequent row's
21//! `prev_hash` MUST equal the previous row's `this_hash`.
22//!
23//! When a log file rotates (§6 — not yet implemented in this crate; see TODO
24//! below), the first row of the NEW log file also uses `prev_hash =
25//! "GENESIS"`, restarting the chain.
26//!
27//! `this_hash` is computed as:
28//!
29//! ```text
30//! this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
31//! ```
32//!
33//! where `canonical_json` is **compact JSON (no whitespace) with object keys
34//! sorted lexicographically** (PROVENANCE_LOG.md §4). For a row with fields
35//! `{ts: "...", ts_seq: 1, event: "fetch", ...}`, the canonical bytes begin
36//! with `{"capability":...` because `capability` is the lex-first top-level
37//! key. Downstream `doiget audit-log --verify` (Phase 1+) relies on this
38//! exact rule — do not change the canonicalization without bumping the spec.
39//!
40//! # In-process serialization
41//!
42//! `ProvenanceLog` holds a `Mutex<LogState>`. All `append` calls within the
43//! same process serialize on this mutex, satisfying the "process-local mutex
44//! on log appender" requirement of `docs/SECURITY.md` §1.8. Cross-process
45//! coordination (multiple `doiget` invocations) is out of scope here and
46//! handled by the higher-level `flock`-based store layer.
47//!
48//! # Session id
49//!
50//! `session_id` (PROVENANCE_LOG.md §3) is a 26-char ULID generated **once per
51//! process invocation** by the caller and stamped into every row written
52//! through the resulting [`ProvenanceLog`]. This crate does not generate the
53//! ULID itself — see [`ProvenanceLog::open`] for the contract.
54//!
55//! # Log rotation and retention (§6)
56//!
57//! Implemented (PROVENANCE_LOG.md §6): when `access.log` exceeds
58//! `ROTATE_BYTES` (100 MiB) a subsequent [`ProvenanceLog::append`]
59//! gzip-compresses the full file to `access.log.<YYYY-MM-DD-HHMMSS>.gz`,
60//! removes the old `access.log`, and writes the incoming row as the
61//! first row of a fresh file with `prev_hash = "GENESIS"` (the hash
62//! chain **restarts** per segment — segments are NOT linked). Rotation
63//! is fail-closed: any gzip / rename / unlink failure aborts the
64//! `append` (the caller's fetch aborts) so the chain never silently
65//! skips. At [`ProvenanceLog::open`], rotated `.gz` segments older than
66//! the retention window (`DOIGET_LOG_RETENTION_DAYS`, default 90; `0`
67//! disables) are deleted **best-effort** (a prune failure is logged,
68//! not fatal — pruning is housekeeping, not integrity).
69//! [`verify_all`] verifies the current file plus every rotated `.gz`
70//! segment (each its own GENESIS-rooted chain).
71
72use std::collections::BTreeMap;
73use std::fs::{File, OpenOptions};
74use std::io::{BufRead, BufReader, BufWriter, Write};
75use std::sync::Mutex;
76
77use flate2::read::GzDecoder;
78use flate2::write::GzEncoder;
79use flate2::Compression;
80
81use camino::{Utf8Path, Utf8PathBuf};
82use chrono::{DateTime, Utc};
83use serde::{Deserialize, Serialize};
84use sha2::{Digest, Sha256};
85
86/// One row of the provenance log (PROVENANCE_LOG.md §3).
87///
88/// The on-disk wire field names match the spec table; struct-field order is
89/// **not** load-bearing for the hash because canonicalization sorts keys
90/// lexicographically (see PROVENANCE_LOG.md §4).
91///
92/// **Schema version**: this struct is the **v2** row shape (ADR-0024).
93/// Every v2 row carries `schema_version = "v2"` literally; the
94/// `canonical_digest` field carries the ADR-0021 §1 audit identity of
95/// the fetch on rows where one applies (`Fetch` / `Resolve` /
96/// `StoreWrite`) and is `None` on session bookend rows
97/// (`SessionStart` / `SessionEnd` / `CapabilityResolved`) that have no
98/// ref. v1 rows (pre-Slice-4) lack both fields and MUST be migrated via
99/// [`migrate_v1_to_v2`] before the v2 binary can read them — the
100/// `deny_unknown_fields` + non-defaulted `schema_version` shape ensures
101/// v1 rows fail to parse loudly rather than producing silent hash-chain
102/// mismatches.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(deny_unknown_fields)]
105pub struct LogRow {
106 /// RFC3339 UTC timestamp of the append (millisecond precision).
107 pub ts: DateTime<Utc>,
108 /// Per-session monotonic sequence number, starting at 1.
109 pub ts_seq: u64,
110 /// Event class (see [`LogEvent`]).
111 pub event: LogEvent,
112 /// Optional reference (DOI / arXiv id). Wire field name is `ref`.
113 #[serde(rename = "ref")]
114 pub ref_: Option<String>,
115 /// Optional source name (e.g. `unpaywall`).
116 pub source: Option<String>,
117 /// Result (see [`LogResult`]).
118 pub result: LogResult,
119 /// OA license string (`event=fetch`, `result=ok`); `None` otherwise.
120 pub license: Option<String>,
121 /// Bytes written / fetched, on success rows.
122 pub size_bytes: Option<u64>,
123 /// Path to the stored payload, relative to the store root
124 /// (`event=fetch`, `result=ok`); `None` otherwise.
125 pub store_path: Option<String>,
126 /// Capability under which the row was written (REQUIRED, every row).
127 pub capability: Capability,
128 /// 26-char ULID identifying the process invocation (REQUIRED).
129 pub session_id: String,
130 /// Stable error code on failure rows.
131 pub error_code: Option<String>,
132 /// Row schema version. Always [`LOG_SCHEMA_VERSION`] (`"v2"`) for
133 /// new rows written by this build (ADR-0024). v1 rows lack this
134 /// field; they MUST be migrated via [`migrate_v1_to_v2`] first.
135 pub schema_version: String,
136 /// Canonical-digest of the fetch's audit identity (ADR-0021 §1) as
137 /// 64 lowercase hex chars. Present on rows with a `ref` (`Fetch`,
138 /// `Resolve`, `StoreWrite`); `None` on session bookend rows. The
139 /// digest is computed from a [`crate::CanonicalRef`] whose
140 /// `resolver_profile` matches this row's `source` field for
141 /// migrated v1 rows; new v2 rows MAY pass an explicit
142 /// `resolver_profile` distinct from `source`.
143 pub canonical_digest: Option<String>,
144 /// 64 lowercase hex chars, OR the literal string `"GENESIS"` for the
145 /// first row of a fresh log file.
146 pub prev_hash: String,
147 /// 64 lowercase hex chars. SHA-256 of canonical JSON of THIS row with
148 /// the `this_hash` field removed. See module docs.
149 pub this_hash: String,
150}
151
152/// Provenance-log row schema version this build writes
153/// (`docs/PROVENANCE_LOG.md` §3, ADR-0024).
154///
155/// Bumped from `"v1"` (implicit; pre-Slice-4 rows had no
156/// `schema_version` field) to `"v2"` when the `canonical_digest` column
157/// landed. The v1→v2 migration is one-shot, idempotent, and dry-runnable
158/// via [`migrate_v1_to_v2`].
159pub const LOG_SCHEMA_VERSION: &str = "v2";
160
161/// Event class for a log row (PROVENANCE_LOG.md §3).
162///
163/// Note: result-status (`ok`/`err`/`denied`) lives in [`LogResult`], NOT in
164/// the event variant. So `Fetch` covers both successful and failed fetch
165/// attempts; the row's `result` distinguishes them.
166///
167/// `non_exhaustive` so adding new variants is non-breaking.
168#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
169#[serde(rename_all = "snake_case")]
170#[non_exhaustive]
171pub enum LogEvent {
172 /// Process started; first row of a new session.
173 SessionStart,
174 /// Capability resolution finished (allowed / denied / which env var).
175 CapabilityResolved,
176 /// Reference resolved to a fetch URL.
177 Resolve,
178 /// Fetch attempt (success or failure determined by `result`).
179 Fetch,
180 /// Store write attempt (success or failure determined by `result`).
181 StoreWrite,
182 /// Process ended cleanly.
183 SessionEnd,
184}
185
186/// Per-row outcome (PROVENANCE_LOG.md §3). `non_exhaustive` for forward
187/// compatibility.
188#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
189#[serde(rename_all = "snake_case")]
190#[non_exhaustive]
191pub enum LogResult {
192 /// The operation succeeded.
193 Ok,
194 /// The operation failed with an error.
195 Err,
196 /// The operation was denied (e.g. capability gate).
197 Denied,
198}
199
200/// Capability under which a row was written (PROVENANCE_LOG.md §3).
201///
202/// `kebab-case` serde rename emits `oa`, `metadata`, `tdm-elsevier`,
203/// `tdm-aps`, `tdm-springer` exactly as the spec requires. `non_exhaustive`
204/// for forward compatibility.
205#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
206#[serde(rename_all = "kebab-case")]
207#[non_exhaustive]
208pub enum Capability {
209 /// Open access tier.
210 Oa,
211 /// Metadata-only access.
212 Metadata,
213 /// Elsevier TDM (Tier 3, opt-in build).
214 TdmElsevier,
215 /// APS TDM (Tier 3, opt-in build).
216 TdmAps,
217 /// Springer TDM (Tier 3, opt-in build).
218 TdmSpringer,
219}
220
221/// Errors emitted by the provenance log writer. Callers MUST treat any
222/// variant as a fail-closed signal and abort the surrounding fetch.
223#[derive(Debug, thiserror::Error)]
224#[non_exhaustive]
225pub enum LogError {
226 /// I/O error opening, reading, writing, or syncing the log file. Includes
227 /// recovery-time corruption detection where the synthetic message is
228 /// `"corrupted log at line N: …"`.
229 #[error("provenance log io error: {0}")]
230 Io(#[from] std::io::Error),
231 /// Serialization of a row to canonical JSON failed.
232 #[error("provenance log serialization error: {0}")]
233 Serialize(#[from] serde_json::Error),
234 /// Path supplied to [`ProvenanceLog::open`] exists but is not a regular
235 /// file (e.g. a directory or symlink).
236 #[error("provenance log path is not a regular file: {0}")]
237 NotARegularFile(Utf8PathBuf),
238}
239
240/// Append-only writer with in-process serialization.
241#[derive(Debug)]
242pub struct ProvenanceLog {
243 path: Utf8PathBuf,
244 state: Mutex<LogState>,
245 session_id: String,
246 /// §6 rotation threshold, resolved ONCE at [`ProvenanceLog::open`]
247 /// (not per-`append`). Reading `DOIGET_LOG_ROTATE_BYTES` once at
248 /// open — rather than on every append — means a log opened without
249 /// the env set keeps the real 100 MiB threshold for its whole life
250 /// even if another (test) thread later mutates that process-global
251 /// env var; this removes a parallel-test race without serializing
252 /// every multi-append test. `0` = rotation disabled.
253 rotate_threshold: u64,
254}
255
256/// Mutable internal state, guarded by [`ProvenanceLog::state`].
257#[derive(Debug)]
258struct LogState {
259 /// `ts_seq` of the **next** row to be appended.
260 next_seq: u64,
261 /// 64 lowercase hex chars; [`GENESIS_HASH`] if the log is empty.
262 last_hash: String,
263}
264
265/// The genesis sentinel used as `prev_hash` for the first row of a log file
266/// (PROVENANCE_LOG.md §3, §6). Also written verbatim as the prev-hash of the
267/// first row after a log rotation (the chain restarts per segment).
268const GENESIS_HASH: &str = "GENESIS";
269
270/// Rotate `access.log` once it reaches this size (PROVENANCE_LOG.md §6:
271/// "100 MB"). 100 MiB. Overridable via the `DOIGET_LOG_ROTATE_BYTES`
272/// env var — an internal ops/testing knob (NOT a documented public
273/// surface): tests set it tiny to exercise rotation without writing
274/// 100 MiB; a value of `0` disables rotation.
275const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
276
277/// Default rotated-segment retention (PROVENANCE_LOG.md §6: "90 days").
278/// Overridable via `DOIGET_LOG_RETENTION_DAYS`; `0` disables pruning.
279const DEFAULT_RETENTION_DAYS: i64 = 90;
280
281/// Resolve the rotation threshold: `DOIGET_LOG_ROTATE_BYTES` if set and
282/// parseable, else [`ROTATE_BYTES`]. `0` (or unparsable) → returns the
283/// value as-is (`0` means "never rotate").
284fn rotate_threshold_bytes() -> u64 {
285 match std::env::var("DOIGET_LOG_ROTATE_BYTES") {
286 Ok(s) => s.trim().parse::<u64>().unwrap_or(ROTATE_BYTES),
287 Err(_) => ROTATE_BYTES,
288 }
289}
290
291/// Resolve retention days from `DOIGET_LOG_RETENTION_DAYS`
292/// (default [`DEFAULT_RETENTION_DAYS`]). `0` disables pruning. A
293/// negative / unparsable value falls back to the default with a warn.
294fn retention_days() -> i64 {
295 match std::env::var("DOIGET_LOG_RETENTION_DAYS") {
296 Ok(s) => match s.trim().parse::<i64>() {
297 Ok(n) if n >= 0 => n,
298 _ => {
299 tracing::warn!(
300 value = %s,
301 "DOIGET_LOG_RETENTION_DAYS is not a non-negative integer; \
302 using the {DEFAULT_RETENTION_DAYS}-day default"
303 );
304 DEFAULT_RETENTION_DAYS
305 }
306 },
307 Err(_) => DEFAULT_RETENTION_DAYS,
308 }
309}
310
311/// gzip-compress `path` to `<file_name>.<YYYY-MM-DD-HHMMSS>.gz` (in the
312/// same directory) and unlink `path` (PROVENANCE_LOG.md §6).
313///
314/// Atomic & fail-closed: the gzip is written to a `.tmp`, fsynced, then
315/// `rename`d into place (so a partial `.gz` is never observable), and
316/// only then is the original removed. Every step propagates its error
317/// to the caller (`ProvenanceLog::append`), which is fail-closed — a
318/// rotation failure aborts the surrounding fetch. Crash safety: a crash
319/// after the rename but before the unlink leaves both the full `.gz`
320/// and the (over-size) `access.log`; the next `append` simply rotates
321/// again, producing a second independently-valid segment — wasteful but
322/// never lossy or corrupt.
323fn rotate_log(path: &Utf8Path) -> Result<(), LogError> {
324 let file_name = path.file_name().ok_or_else(|| {
325 LogError::Io(std::io::Error::other(
326 "provenance log path has no file name; cannot rotate",
327 ))
328 })?;
329 let ts = Utc::now().format("%Y-%m-%d-%H%M%S");
330 let gz_name = format!("{file_name}.{ts}.gz");
331 let dir = path.parent().unwrap_or_else(|| Utf8Path::new("."));
332 let gz_path = dir.join(&gz_name);
333 let tmp_path = dir.join(format!("{gz_name}.tmp"));
334
335 {
336 let mut src = File::open(path)?;
337 let tmp = File::create(&tmp_path)?;
338 let mut enc = GzEncoder::new(BufWriter::new(tmp), Compression::default());
339 std::io::copy(&mut src, &mut enc)?;
340 let bufw = enc.finish()?;
341 let tmp = bufw.into_inner().map_err(|e| {
342 LogError::Io(std::io::Error::other(format!(
343 "gz tmp buf flush failed: {}",
344 e.error()
345 )))
346 })?;
347 tmp.sync_all()?;
348 }
349 std::fs::rename(&tmp_path, &gz_path)?;
350 std::fs::remove_file(path)?;
351 Ok(())
352}
353
354/// Rotated `.gz` segments siblings of `current`, sorted ascending. The
355/// embedded `YYYY-MM-DD-HHMMSS` timestamp makes lexicographic order ==
356/// chronological order.
357fn rotated_segments(current: &Utf8Path) -> Vec<Utf8PathBuf> {
358 let Some(file_name) = current.file_name() else {
359 return Vec::new();
360 };
361 let dir = current.parent().unwrap_or_else(|| Utf8Path::new("."));
362 let prefix = format!("{file_name}.");
363 let mut segs: Vec<Utf8PathBuf> = match std::fs::read_dir(dir.as_std_path()) {
364 Ok(rd) => rd
365 .filter_map(|e| e.ok())
366 .filter_map(|e| Utf8PathBuf::from_path_buf(e.path()).ok())
367 .filter(|p| {
368 p.file_name()
369 .map(|n| n.starts_with(&prefix) && n.ends_with(".gz"))
370 .unwrap_or(false)
371 })
372 .collect(),
373 Err(_) => Vec::new(),
374 };
375 segs.sort();
376 segs
377}
378
379/// Delete rotated `.gz` segments older than `days` (PROVENANCE_LOG.md
380/// §6 retention). `days <= 0` is a no-op (disabled). **Best-effort**:
381/// pruning is housekeeping, not integrity, so any failure is logged and
382/// skipped — `ProvenanceLog::open` still succeeds.
383fn prune_rotated_segments(current: &Utf8Path, days: i64) {
384 if days <= 0 {
385 return;
386 }
387 let Some(cutoff) = std::time::SystemTime::now()
388 .checked_sub(std::time::Duration::from_secs(days as u64 * 86_400))
389 else {
390 return;
391 };
392 for seg in rotated_segments(current) {
393 let aged = std::fs::metadata(seg.as_std_path())
394 .and_then(|m| m.modified())
395 .map(|mt| mt < cutoff)
396 .unwrap_or(false);
397 if !aged {
398 continue;
399 }
400 match std::fs::remove_file(seg.as_std_path()) {
401 Ok(()) => tracing::info!(
402 segment = %seg,
403 "provenance: pruned rotated segment past retention"
404 ),
405 Err(e) => tracing::warn!(
406 segment = %seg, error = %e,
407 "provenance: failed to prune rotated segment (best-effort; continuing)"
408 ),
409 }
410 }
411}
412
413/// Verify the full provenance history: every rotated `.gz` segment
414/// (oldest→newest) followed by the current `access.log`. Each segment
415/// is its own GENESIS-rooted hash chain (segments are deliberately NOT
416/// linked across a rotation, PROVENANCE_LOG.md §6), so they are
417/// verified independently and reported per-segment.
418///
419/// The audited [`verify`] function itself is unchanged; this only
420/// orchestrates it over the segment set (gunzipping each `.gz` to a
421/// tempfile first).
422///
423/// # Errors
424///
425/// [`LogError::Io`] on a gunzip / tempfile failure. A missing current
426/// `access.log` is not an error ([`verify`] reports it empty).
427pub fn verify_all(current: &Utf8Path) -> Result<Vec<(Utf8PathBuf, VerifyReport)>, LogError> {
428 let mut out = Vec::new();
429 for seg in rotated_segments(current) {
430 let gz = File::open(seg.as_std_path())?;
431 let mut dec = GzDecoder::new(gz);
432 let tmp = tempfile::NamedTempFile::new().map_err(|e| {
433 LogError::Io(std::io::Error::other(format!(
434 "verify_all: tempfile for {seg}: {e}"
435 )))
436 })?;
437 {
438 let mut w = File::create(tmp.path())?;
439 std::io::copy(&mut dec, &mut w)?;
440 w.sync_all()?;
441 }
442 let tmp_utf8 = Utf8Path::from_path(tmp.path()).ok_or_else(|| {
443 LogError::Io(std::io::Error::other("verify_all: non-utf8 tempfile path"))
444 })?;
445 let report = verify(tmp_utf8)?;
446 out.push((seg, report));
447 // `tmp` (and the gunzipped file) drop here, after verify.
448 }
449 let report = verify(current)?;
450 out.push((current.to_path_buf(), report));
451 Ok(out)
452}
453
454/// Caller-supplied fields for a row. The writer fills in `ts`, `ts_seq`,
455/// `session_id`, `prev_hash`, `this_hash`, and the literal
456/// `schema_version = "v2"` (`LOG_SCHEMA_VERSION`).
457///
458/// Callers SHOULD populate [`Self::canonical_digest`] on rows that have
459/// a meaningful audit identity (`Fetch` / `Resolve` / `StoreWrite` rows
460/// with a `ref`), leaving it `None` on session bookend rows. The digest
461/// is produced by [`crate::CanonicalRef::digest_hex`] from a
462/// `(source_type, source_id, resolver_profile, version)` tuple — see
463/// ADR-0021 §1 for the algorithm and ADR-0024 for the implementation
464/// surface.
465#[derive(Debug, Clone)]
466pub struct RowInput<'a> {
467 /// Event class.
468 pub event: LogEvent,
469 /// Result.
470 pub result: LogResult,
471 /// Capability under which the row is written (REQUIRED for every row).
472 pub capability: Capability,
473 /// Optional DOI / arXiv id.
474 pub ref_: Option<&'a str>,
475 /// Optional source name.
476 pub source: Option<&'a str>,
477 /// Optional error code on failure rows.
478 pub error_code: Option<&'a str>,
479 /// Optional payload size in bytes.
480 pub size_bytes: Option<u64>,
481 /// Optional OA license string (set on `event=fetch`, `result=ok`).
482 pub license: Option<&'a str>,
483 /// Optional store path relative to the store root (set on `event=fetch`,
484 /// `result=ok`).
485 pub store_path: Option<&'a str>,
486 /// Optional canonical-digest (ADR-0021 §1) as 64 lowercase hex
487 /// chars. `None` for session bookend / capability-resolution rows;
488 /// SHOULD be `Some` for `Fetch` / `Resolve` / `StoreWrite` rows
489 /// whose `source` field names the resolver. Build via
490 /// [`crate::Ref::promote`] + [`crate::CanonicalRef::digest_hex`].
491 pub canonical_digest: Option<&'a str>,
492}
493
494// ---------------------------------------------------------------------------
495// Canonical-JSON helper (PROVENANCE_LOG.md §4)
496//
497// Hashing rule (CRITICAL — this is the spec contract for `audit-log --verify`):
498//
499// this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
500//
501// Canonical JSON = **compact (no whitespace), keys sorted lexicographically,
502// no trailing whitespace** (§4). Struct field order is deliberately NOT
503// load-bearing here; the canonicalizer sorts the resulting object keys via
504// `BTreeMap<String, Value>`, which serializes in lex-sorted key order.
505//
506// Worked example: for the row fragment `{ts_seq: 1, ts: "..."}` (input order),
507// the canonical bytes after lex sort are `{"ts":"...","ts_seq":1}` because
508// `"ts"` < `"ts_seq"` lexicographically. In v2 (ADR-0024) the lex-first
509// top-level key is `"canonical_digest"` — `"canonical_digest"` < `"capability"`
510// because 'n'(110) < 'p'(112) at byte index 2 (both share the `"ca"`
511// prefix). The pre-v2 lex-first key was `"capability"`.
512// ---------------------------------------------------------------------------
513
514/// Serializable shadow of [`LogRow`] **without** `this_hash`. Used solely as
515/// an intermediate to compute the canonical bytes that `this_hash` is the
516/// SHA-256 of. The wire key names match [`LogRow`]'s `serde` attributes.
517///
518/// v2 shape (ADR-0024): includes `schema_version` and
519/// `canonical_digest`. Both fields participate in the hash chain — a
520/// tampered `canonical_digest` is detected by `audit-log --verify`
521/// exactly like a tampered `ref` or `source` would be.
522#[derive(Serialize)]
523struct RowForHash<'a> {
524 ts: DateTime<Utc>,
525 ts_seq: u64,
526 event: LogEvent,
527 #[serde(rename = "ref")]
528 ref_: Option<&'a str>,
529 source: Option<&'a str>,
530 result: LogResult,
531 license: Option<&'a str>,
532 size_bytes: Option<u64>,
533 store_path: Option<&'a str>,
534 capability: Capability,
535 session_id: &'a str,
536 error_code: Option<&'a str>,
537 schema_version: &'a str,
538 canonical_digest: Option<&'a str>,
539 prev_hash: &'a str,
540}
541
542/// Produce canonical-JSON bytes for a row-without-hash, with object keys
543/// sorted lexicographically per PROVENANCE_LOG.md §4.
544///
545/// Implementation: serialize via `serde_json::to_value` to get a `Value`,
546/// require it be an object, then move its entries into a
547/// `BTreeMap<String, Value>` (which serializes with lex-sorted keys) and
548/// re-serialize compactly. No new dependency required.
549fn canonical_json_for_hash(rfh: &RowForHash<'_>) -> Result<Vec<u8>, LogError> {
550 let value = serde_json::to_value(rfh)?;
551 let map = match value {
552 serde_json::Value::Object(m) => m,
553 // RowForHash is always a struct, so this branch is unreachable in
554 // practice; surface as a serde error if it ever changes.
555 _ => {
556 return Err(LogError::Serialize(serde::de::Error::custom(
557 "RowForHash did not serialize to a JSON object",
558 )));
559 }
560 };
561 let sorted: BTreeMap<String, serde_json::Value> = map.into_iter().collect();
562 Ok(serde_json::to_vec(&sorted)?)
563}
564
565/// Compute `this_hash` for the given row-without-hash. Returns 64 lowercase
566/// hex chars.
567fn compute_this_hash(rfh: &RowForHash<'_>) -> Result<String, LogError> {
568 let bytes = canonical_json_for_hash(rfh)?;
569 let digest = Sha256::digest(&bytes);
570 Ok(hex::encode(digest))
571}
572
573impl ProvenanceLog {
574 /// Open or create the log at `path`, stamping every row with
575 /// `session_id`.
576 ///
577 /// `session_id` MUST be a 26-char ULID generated **once per process**
578 /// invocation by the caller. Re-opening the log within the same process
579 /// reuses the same `session_id`; re-opening in a new process gets a new
580 /// one. This crate intentionally does NOT generate the ULID itself —
581 /// callers are responsible for creating one (e.g. via the `ulid` crate
582 /// already present in the workspace) and threading it through.
583 ///
584 /// If the file exists, scan it once to recover the last `ts_seq` and
585 /// `this_hash`. If the file is missing or empty, the first row will use
586 /// `prev_hash = "GENESIS"` and `ts_seq = 1`.
587 ///
588 /// # Errors
589 ///
590 /// Returns [`LogError::Io`] for I/O failures or if any line fails to
591 /// parse as a [`LogRow`] (synthetic message: `"corrupted log at line N: …"`).
592 /// The writer never silently truncates a corrupt log.
593 ///
594 /// Returns [`LogError::NotARegularFile`] if `path` exists but is not a
595 /// regular file (e.g. a directory).
596 pub fn open(path: impl Into<Utf8PathBuf>, session_id: String) -> Result<Self, LogError> {
597 // Production path: the §6 threshold comes from
598 // `DOIGET_LOG_ROTATE_BYTES` (default 100 MiB), resolved ONCE here.
599 Self::open_with_rotate_threshold(path, session_id, rotate_threshold_bytes())
600 }
601
602 /// [`open`](Self::open) with an explicit rotation threshold instead
603 /// of reading `DOIGET_LOG_ROTATE_BYTES`.
604 ///
605 /// This exists so the rotation tests inject a tiny threshold WITHOUT
606 /// mutating the process-global env var: a global env knob raced
607 /// non-`#[serial]` tests (a concurrent test's `open` would cache the
608 /// tiny threshold and spuriously rotate). `#[serial]` only
609 /// serializes `#[serial]` tests, so injection — not serialization —
610 /// is the robust fix. `0` disables rotation.
611 pub(crate) fn open_with_rotate_threshold(
612 path: impl Into<Utf8PathBuf>,
613 session_id: String,
614 rotate_threshold: u64,
615 ) -> Result<Self, LogError> {
616 let path: Utf8PathBuf = path.into();
617
618 // Ensure the parent directory exists. The provenance log defaults to
619 // `<config>/doiget/access.jsonl`, and on a fresh machine (e.g. a CI
620 // runner where `~/.config/doiget` was never created) neither the
621 // recover-state read nor the first append can open the file — the
622 // append fails with ENOENT and `verify` fail-closes on the LogError.
623 // `create_dir_all` is idempotent; a genuine permission failure still
624 // surfaces as a `LogError` (the correct fail-closed signal).
625 if let Some(parent) = path.parent() {
626 if !parent.as_str().is_empty() {
627 std::fs::create_dir_all(parent.as_std_path())?;
628 }
629 }
630
631 // Reject obvious non-files up front so later `OpenOptions::append`
632 // doesn't produce a confusing platform-dependent error.
633 if path.exists() {
634 let md = std::fs::metadata(&path)?;
635 if !md.is_file() {
636 return Err(LogError::NotARegularFile(path));
637 }
638 }
639
640 let (next_seq, last_hash) = recover_state(&path)?;
641
642 // §6 retention: prune rotated `.gz` segments older than the
643 // window. Best-effort — pruning is housekeeping, not integrity,
644 // so a failure is logged and `open` still succeeds (unlike
645 // rotation, which is fail-closed).
646 prune_rotated_segments(&path, retention_days());
647
648 Ok(Self {
649 path,
650 state: Mutex::new(LogState {
651 next_seq,
652 last_hash,
653 }),
654 session_id,
655 rotate_threshold,
656 })
657 }
658
659 /// Append a row. Computes `prev_hash`, `ts_seq`, `ts`, `session_id`, and
660 /// `this_hash`; the caller only supplies the semantic fields via
661 /// [`RowInput`].
662 ///
663 /// Returns the assigned `ts_seq` on success.
664 ///
665 /// # Errors
666 ///
667 /// Returns [`LogError`] on serialization, I/O, or fsync failure. Callers
668 /// MUST treat this as fail-closed and abort the surrounding fetch.
669 pub fn append(&self, input: RowInput<'_>) -> Result<u64, LogError> {
670 // Hold the mutex for the entire append: serialize + write + flush +
671 // fsync + state update. This is the in-process serialization point
672 // promised by `docs/SECURITY.md` §1.8.
673 //
674 // A poisoned mutex only happens if a previous `append` panicked
675 // mid-write. Surface that as an I/O error rather than propagating
676 // a panic.
677 let mut state = self
678 .state
679 .lock()
680 .map_err(|_| LogError::Io(std::io::Error::other("provenance log mutex poisoned")))?;
681
682 // §6 rotation, BEFORE this row is written. If `access.log` has
683 // reached the threshold, gzip+rename it and reset the in-memory
684 // chain state so this row becomes the GENESIS-rooted first row of
685 // a fresh file. Fail-closed: a rotation error aborts the append
686 // (the `?`), so the caller's fetch aborts and the chain never
687 // silently continues in an over-size or half-rotated file. The
688 // `state` mutex is held, so rotation is serialized with appends.
689 let threshold = self.rotate_threshold;
690 if threshold > 0 {
691 let size = match std::fs::metadata(&self.path) {
692 Ok(m) => m.len(),
693 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
694 Err(e) => return Err(LogError::Io(e)),
695 };
696 if size >= threshold {
697 rotate_log(&self.path)?;
698 state.next_seq = 1;
699 state.last_hash = GENESIS_HASH.to_string();
700 }
701 }
702
703 let ts_seq = state.next_seq;
704 let prev_hash = state.last_hash.clone();
705 let ts = Utc::now();
706
707 let rfh = RowForHash {
708 ts,
709 ts_seq,
710 event: input.event,
711 ref_: input.ref_,
712 source: input.source,
713 result: input.result,
714 license: input.license,
715 size_bytes: input.size_bytes,
716 store_path: input.store_path,
717 capability: input.capability,
718 session_id: &self.session_id,
719 error_code: input.error_code,
720 schema_version: LOG_SCHEMA_VERSION,
721 canonical_digest: input.canonical_digest,
722 prev_hash: &prev_hash,
723 };
724
725 let this_hash = compute_this_hash(&rfh)?;
726
727 // Build the on-disk row. Owned strings here because `LogRow` does
728 // not borrow.
729 let row = LogRow {
730 ts,
731 ts_seq,
732 event: input.event,
733 ref_: input.ref_.map(str::to_string),
734 source: input.source.map(str::to_string),
735 result: input.result,
736 license: input.license.map(str::to_string),
737 size_bytes: input.size_bytes,
738 store_path: input.store_path.map(str::to_string),
739 capability: input.capability,
740 session_id: self.session_id.clone(),
741 error_code: input.error_code.map(str::to_string),
742 schema_version: LOG_SCHEMA_VERSION.to_string(),
743 canonical_digest: input.canonical_digest.map(str::to_string),
744 prev_hash,
745 this_hash: this_hash.clone(),
746 };
747
748 // Serialize, append `\n`, write_all in one syscall, flush BufWriter,
749 // fsync the underlying file. `\n` is part of the same buffer, so a
750 // crash mid-write leaves at most a partial line (no trailing `\n`),
751 // which is detectable on recovery as a corrupted final line.
752 let mut bytes = serde_json::to_vec(&row)?;
753 bytes.push(b'\n');
754
755 let file = OpenOptions::new()
756 .create(true)
757 .append(true)
758 .open(&self.path)?;
759 let mut writer = BufWriter::new(file);
760 writer.write_all(&bytes)?;
761 writer.flush()?;
762 // `into_inner` to recover the underlying File for `sync_all`.
763 let file = writer.into_inner().map_err(|e| {
764 LogError::Io(std::io::Error::other(format!(
765 "buf writer flush failed: {}",
766 e.error()
767 )))
768 })?;
769 file.sync_all()?;
770
771 // Only after a successful fsync do we advance the in-memory state.
772 // If any of the above fails, the next `append` retries from the
773 // same `(ts_seq, prev_hash)` — at most a torn last line on disk.
774 state.next_seq = ts_seq + 1;
775 state.last_hash = this_hash;
776
777 Ok(ts_seq)
778 }
779
780 /// Returns the path the log was opened at. Useful for tests and audit tooling.
781 pub fn path(&self) -> &Utf8Path {
782 &self.path
783 }
784
785 /// Returns the session id stamped into every row written through this
786 /// writer.
787 pub fn session_id(&self) -> &str {
788 &self.session_id
789 }
790}
791
792/// Scan an existing log to recover `(next_seq, last_hash)`.
793///
794/// Walk every line, parse as [`LogRow`], track the last successfully parsed
795/// row. If parsing fails, return [`LogError::Io`] with a synthetic
796/// `"corrupted log at line N: …"` message — never silently truncate.
797fn recover_state(path: &Utf8Path) -> Result<(u64, String), LogError> {
798 let file = match File::open(path) {
799 Ok(f) => f,
800 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
801 return Ok((1, GENESIS_HASH.to_string()));
802 }
803 Err(e) => return Err(LogError::Io(e)),
804 };
805
806 let reader = BufReader::new(file);
807 let mut last_seq: u64 = 0;
808 let mut last_hash: String = GENESIS_HASH.to_string();
809
810 for (idx, line_res) in reader.lines().enumerate() {
811 let line_no = idx + 1;
812 let line = line_res?;
813 if line.is_empty() {
814 // Tolerate trailing/empty lines silently — they are not data.
815 continue;
816 }
817 let row: LogRow = serde_json::from_str(&line).map_err(|e| {
818 LogError::Io(std::io::Error::new(
819 std::io::ErrorKind::InvalidData,
820 format!("corrupted log at line {}: {}", line_no, e),
821 ))
822 })?;
823 last_seq = row.ts_seq;
824 last_hash = row.this_hash;
825 }
826
827 if last_seq == 0 {
828 Ok((1, GENESIS_HASH.to_string()))
829 } else {
830 Ok((last_seq + 1, last_hash))
831 }
832}
833
834// ---------------------------------------------------------------------------
835// Verification (`doiget audit-log --verify`)
836//
837// The provenance log is a JSON Lines file with a SHA-256 hash chain
838// (PROVENANCE_LOG.md §4). Tampering is detected by recomputing every row's
839// `this_hash` and validating the chain. This module provides the offline
840// verifier; the CLI wrapper lives in `doiget-cli::commands::audit_log`.
841//
842// Failure model: returning `Err` is reserved for I/O failures opening / reading
843// the file. Per-row issues (parse failures, hash/chain mismatches, sequence
844// regressions) are accumulated into [`VerifyReport::errors`] so callers can
845// report them all in one pass — this is the contract Phase 1 ships.
846// ---------------------------------------------------------------------------
847
848/// Outcome of [`verify`]: per-row chain status across the entire log.
849#[derive(Debug, Clone)]
850#[non_exhaustive]
851pub struct VerifyReport {
852 /// Total non-empty lines processed (1-based count).
853 pub total_rows: usize,
854 /// Rows whose hash, chain link, and `ts_seq` all validated.
855 pub ok_rows: usize,
856 /// Issues encountered, in encounter order. Line numbers are 1-based.
857 pub errors: Vec<VerifyIssue>,
858}
859
860impl VerifyReport {
861 /// An empty, all-clear report — used when the log file is absent.
862 fn empty() -> Self {
863 Self {
864 total_rows: 0,
865 ok_rows: 0,
866 errors: Vec::new(),
867 }
868 }
869}
870
871/// A single issue discovered by [`verify`].
872#[derive(Debug, Clone)]
873#[non_exhaustive]
874pub struct VerifyIssue {
875 /// 1-based line number where the issue was detected.
876 pub line: usize,
877 /// Classification of the issue (see [`VerifyIssueKind`]).
878 pub kind: VerifyIssueKind,
879 /// Human-readable description (caller may format for stderr/stdout).
880 pub message: String,
881}
882
883/// Classification of a [`VerifyIssue`]. `non_exhaustive` for forward
884/// compatibility — future kinds may include `SessionIdChange`, etc.
885#[derive(Debug, Clone, Copy, PartialEq, Eq)]
886#[non_exhaustive]
887pub enum VerifyIssueKind {
888 /// Row failed to parse as [`LogRow`] (corrupted JSON or unknown field).
889 ParseError,
890 /// `prev_hash` did not match the previous row's `this_hash` (or the
891 /// genesis sentinel on row 1).
892 PrevHashMismatch,
893 /// Row's stored `this_hash` did not match the recomputed canonical-JSON
894 /// SHA-256.
895 ThisHashMismatch,
896 /// `ts_seq` did not increase strictly monotonically (within a session;
897 /// see PROVENANCE_LOG.md §3 + §6 — chain restarts after rotation are
898 /// permitted to reset `ts_seq` and are detected via the genesis sentinel).
899 SequenceJump,
900}
901
902/// Verify the entire log file at `path`.
903///
904/// Returns `Ok(VerifyReport)` regardless of whether the chain validates;
905/// callers inspect `report.errors.is_empty()` to determine pass/fail.
906/// Returns `Err` only when the file itself cannot be opened or read at the
907/// I/O level.
908///
909/// Behavior:
910///
911/// - A missing file is treated as a clean, empty log (no tampering possible
912/// on bytes that don't exist) and returns an empty report after a `warn!`.
913/// - Empty / blank lines are skipped — they are not data per the writer's
914/// on-disk format (PROVENANCE_LOG.md §2).
915/// - On a row that fails to parse as [`LogRow`], a `ParseError` is recorded
916/// and verification continues on the next line. The chain anchor does NOT
917/// advance through an unparsable row, so the next valid row's `prev_hash`
918/// is checked against the last successfully parsed row (or against
919/// `"GENESIS"` if no valid row has been seen yet).
920/// - A `prev_hash == "GENESIS"` sentinel marks a chain restart (first row of
921/// a fresh / rotated log per §6) and resets the `ts_seq` monotonicity
922/// anchor — `ts_seq` is NOT compared to the prior row across a restart.
923pub fn verify(path: &Utf8Path) -> Result<VerifyReport, LogError> {
924 let file = match File::open(path) {
925 Ok(f) => f,
926 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
927 tracing::warn!(
928 path = %path,
929 "audit-log verify: log file does not exist; reporting empty"
930 );
931 return Ok(VerifyReport::empty());
932 }
933 Err(e) => return Err(LogError::Io(e)),
934 };
935
936 let reader = BufReader::new(file);
937 let mut report = VerifyReport::empty();
938
939 // Anchor for the chain check: the LAST SUCCESSFULLY PARSED row. The chain
940 // is anchored to the bytes on disk, not to a hypothetical "should have
941 // been". This matches the spec — tampering at row N must surface both as
942 // a hash mismatch on N and as a chain break on N+1.
943 let mut prev_row: Option<LogRow> = None;
944
945 for (idx, line_res) in reader.lines().enumerate() {
946 let line_no = idx + 1;
947 let line = line_res?;
948 if line.is_empty() {
949 continue;
950 }
951
952 report.total_rows += 1;
953
954 let row: LogRow = match serde_json::from_str(&line) {
955 Ok(r) => r,
956 Err(e) => {
957 report.errors.push(VerifyIssue {
958 line: line_no,
959 kind: VerifyIssueKind::ParseError,
960 message: format!("failed to parse row as LogRow: {e}"),
961 });
962 // Chain anchor cannot advance through an unparsable row;
963 // leave `prev_row` untouched so the next valid row's
964 // `prev_hash` is checked against the last-known anchor (or
965 // GENESIS if we never had one).
966 continue;
967 }
968 };
969
970 let mut row_ok = true;
971
972 // 1. Recompute `this_hash` from canonical JSON (row \ {this_hash}).
973 let rfh = RowForHash {
974 ts: row.ts,
975 ts_seq: row.ts_seq,
976 event: row.event,
977 ref_: row.ref_.as_deref(),
978 source: row.source.as_deref(),
979 result: row.result,
980 license: row.license.as_deref(),
981 size_bytes: row.size_bytes,
982 store_path: row.store_path.as_deref(),
983 capability: row.capability,
984 session_id: &row.session_id,
985 error_code: row.error_code.as_deref(),
986 schema_version: &row.schema_version,
987 canonical_digest: row.canonical_digest.as_deref(),
988 prev_hash: &row.prev_hash,
989 };
990 match compute_this_hash(&rfh) {
991 Ok(recomputed) => {
992 if recomputed != row.this_hash {
993 report.errors.push(VerifyIssue {
994 line: line_no,
995 kind: VerifyIssueKind::ThisHashMismatch,
996 message: format!(
997 "this_hash mismatch: stored={}, recomputed={}",
998 row.this_hash, recomputed
999 ),
1000 });
1001 row_ok = false;
1002 }
1003 }
1004 Err(e) => {
1005 // Canonicalization itself failed — surface as a hash
1006 // mismatch with the underlying error in the message.
1007 report.errors.push(VerifyIssue {
1008 line: line_no,
1009 kind: VerifyIssueKind::ThisHashMismatch,
1010 message: format!("failed to recompute this_hash: {e}"),
1011 });
1012 row_ok = false;
1013 }
1014 }
1015
1016 // 2. Chain link: `prev_hash` matches anchor (GENESIS on row 1 / after
1017 // a chain restart, prior row's `this_hash` otherwise).
1018 let is_genesis = row.prev_hash == GENESIS_HASH;
1019 match &prev_row {
1020 None => {
1021 // First non-empty row in the file: must declare GENESIS.
1022 if !is_genesis {
1023 report.errors.push(VerifyIssue {
1024 line: line_no,
1025 kind: VerifyIssueKind::PrevHashMismatch,
1026 message: format!(
1027 "first row must have prev_hash=\"GENESIS\", got {:?}",
1028 row.prev_hash
1029 ),
1030 });
1031 row_ok = false;
1032 }
1033 }
1034 Some(prev) => {
1035 if is_genesis {
1036 // Chain restart (rotation per §6) — accepted, no link
1037 // check, and the `ts_seq` monotonicity anchor resets
1038 // (handled below via `is_genesis`).
1039 } else if row.prev_hash != prev.this_hash {
1040 report.errors.push(VerifyIssue {
1041 line: line_no,
1042 kind: VerifyIssueKind::PrevHashMismatch,
1043 message: format!(
1044 "prev_hash mismatch: row stores {}, previous row's this_hash is {}",
1045 row.prev_hash, prev.this_hash
1046 ),
1047 });
1048 row_ok = false;
1049 }
1050 }
1051 }
1052
1053 // 3. ts_seq monotonicity — strictly greater than the previous row's
1054 // `ts_seq`, EXCEPT across a chain restart (where `ts_seq` resets).
1055 if let Some(prev) = &prev_row {
1056 if !is_genesis && row.ts_seq <= prev.ts_seq {
1057 report.errors.push(VerifyIssue {
1058 line: line_no,
1059 kind: VerifyIssueKind::SequenceJump,
1060 message: format!(
1061 "ts_seq did not increase strictly: previous={}, current={}",
1062 prev.ts_seq, row.ts_seq
1063 ),
1064 });
1065 row_ok = false;
1066 }
1067 }
1068
1069 if row_ok {
1070 report.ok_rows += 1;
1071 }
1072
1073 // Advance the anchor to the just-parsed row (whether or not it had
1074 // issues — the on-disk bytes ARE the chain).
1075 prev_row = Some(row);
1076 }
1077
1078 Ok(report)
1079}
1080
1081// ---------------------------------------------------------------------------
1082// v1 → v2 migration (ADR-0024, `docs/PROVENANCE_LOG.md` §"Schema migration").
1083//
1084// v1 rows lack `schema_version` and `canonical_digest`; the v2 binary
1085// fails loudly when asked to read them (see `recover_state` /
1086// `verify`). The migration recovers a v2 log from a v1 file by:
1087//
1088// 1. Parsing every v1 row via the [`V1LogRow`] shadow struct.
1089// 2. Deriving a [`crate::CanonicalRef`] from the v1 `(ref, source)`
1090// pair — `source` becomes `resolver_profile`, `version` is `None`
1091// (ADR-0021 §1 → ADR-0024 migration recipe).
1092// 3. Re-computing the SHA-256 hash chain across the new row
1093// payloads. The v1 chain is invalidated by the schema change; the
1094// v2 chain restarts at the first row's stored `prev_hash` (which
1095// is `"GENESIS"` on a fresh log).
1096// 4. Writing the new rows to `<log_path>.v2-migrated`, then
1097// atomically renaming it onto `<log_path>` after backing up the
1098// original to `<log_path>.v1-backup`.
1099//
1100// The migration is **idempotent**: running it on an already-v2 log
1101// re-parses every row as v2, recomputes the same hash chain, and
1102// produces a byte-equivalent output.
1103//
1104// The migration is **dry-runnable**: `dry_run = true` returns a
1105// [`MigrationReport`] summarizing what would change without touching
1106// disk.
1107// ---------------------------------------------------------------------------
1108
1109/// Summary of a [`migrate_v1_to_v2`] run.
1110///
1111/// Marked `#[non_exhaustive]` so future fields (e.g. a per-row error
1112/// list, an aborted-row count) can be added without breaking callers
1113/// that pattern-match.
1114///
1115/// `Serialize` enables `provenance migrate --mode json` (#204) — the
1116/// wire form is `{"rows_rewritten": N, "dry_run": bool,
1117/// "first_row_v1_chain_hash": "...", "first_row_v2_chain_hash": "..."}`.
1118///
1119/// # Wire-format stability (post-#208 self-review §1)
1120///
1121/// Once a release ships with the [`Serialize`] derive, the field
1122/// **names** below become part of the public API. Renaming a field is
1123/// then a semver minor bump and warrants a CHANGELOG \[BREAKING\] note;
1124/// new fields are still safe (per `#[non_exhaustive]`).
1125#[derive(Debug, Clone, Serialize)]
1126#[non_exhaustive]
1127pub struct MigrationReport {
1128 /// Number of rows rewritten (or that WOULD be rewritten under
1129 /// `dry_run`).
1130 pub rows_rewritten: u64,
1131 /// Whether this was a dry-run preview (`true`) or a live rewrite
1132 /// (`false`).
1133 pub dry_run: bool,
1134 /// Stored `this_hash` of the first input row (the v1 chain anchor).
1135 /// `"GENESIS"` is reported as the literal `"GENESIS"` when the log
1136 /// was empty.
1137 pub first_row_v1_chain_hash: String,
1138 /// Recomputed `this_hash` of the first migrated row under the v2
1139 /// canonicalization. Equal to [`Self::first_row_v1_chain_hash`]
1140 /// only if the input was already v2 (idempotent case).
1141 pub first_row_v2_chain_hash: String,
1142}
1143
1144/// v1 row shadow struct used ONLY by [`migrate_v1_to_v2`]. The
1145/// non-defaulted v2 fields (`schema_version`, `canonical_digest`) are
1146/// absent here; `deny_unknown_fields` rejects unexpected v2 fields so a
1147/// v2 row on disk fails to parse as v1, letting the migrator detect
1148/// already-v2 input via fallback to the v2 parser.
1149#[derive(Debug, Clone, Deserialize, Serialize)]
1150#[serde(deny_unknown_fields)]
1151struct V1LogRow {
1152 ts: DateTime<Utc>,
1153 ts_seq: u64,
1154 event: LogEvent,
1155 #[serde(rename = "ref")]
1156 ref_: Option<String>,
1157 source: Option<String>,
1158 result: LogResult,
1159 license: Option<String>,
1160 size_bytes: Option<u64>,
1161 store_path: Option<String>,
1162 capability: Capability,
1163 session_id: String,
1164 error_code: Option<String>,
1165 prev_hash: String,
1166 this_hash: String,
1167}
1168
1169/// Minimal in-memory representation a v1 OR v2 row can be promoted to
1170/// before re-hashing.
1171#[derive(Debug, Clone)]
1172struct MigrationRowSeed {
1173 ts: DateTime<Utc>,
1174 ts_seq: u64,
1175 event: LogEvent,
1176 ref_: Option<String>,
1177 source: Option<String>,
1178 result: LogResult,
1179 license: Option<String>,
1180 size_bytes: Option<u64>,
1181 store_path: Option<String>,
1182 capability: Capability,
1183 session_id: String,
1184 error_code: Option<String>,
1185 /// `None` for v1 inputs (the digest is computed during migration);
1186 /// `Some(...)` for already-v2 inputs (carried through verbatim for
1187 /// idempotency).
1188 canonical_digest_in: Option<String>,
1189 /// As stored on disk in the input. Used only for the
1190 /// `first_row_v1_chain_hash` field of [`MigrationReport`].
1191 stored_this_hash: String,
1192}
1193
1194/// Migrate a v1 provenance log to v2 (ADR-0024).
1195///
1196/// Returns a [`MigrationReport`] describing how many rows were (or
1197/// would be) rewritten and the first-row chain-anchor delta. The
1198/// migration is idempotent: running it twice produces byte-equivalent
1199/// output the second time.
1200///
1201/// On a missing log file, returns a no-op report (`rows_rewritten = 0`,
1202/// `first_row_v1_chain_hash = "GENESIS"`, `first_row_v2_chain_hash =
1203/// "GENESIS"`) — there is nothing to migrate.
1204///
1205/// # Errors
1206///
1207/// Returns [`LogError::Io`] on I/O failures and on rows that fail to
1208/// parse as either v1 or v2 (the synthetic message names the line
1209/// number). Returns [`LogError::Serialize`] on canonicalization
1210/// failures.
1211pub fn migrate_v1_to_v2(log_path: &Utf8Path, dry_run: bool) -> Result<MigrationReport, LogError> {
1212 use std::io::BufRead;
1213
1214 // -- 1. Read the input log, parsing each line as v1 OR (idempotent
1215 // fallback) v2. --------------------------------------------------
1216 let file = match File::open(log_path) {
1217 Ok(f) => f,
1218 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1219 return Ok(MigrationReport {
1220 rows_rewritten: 0,
1221 dry_run,
1222 first_row_v1_chain_hash: GENESIS_HASH.to_string(),
1223 first_row_v2_chain_hash: GENESIS_HASH.to_string(),
1224 });
1225 }
1226 Err(e) => return Err(LogError::Io(e)),
1227 };
1228 let reader = BufReader::new(file);
1229 let mut seeds: Vec<MigrationRowSeed> = Vec::new();
1230
1231 for (idx, line_res) in reader.lines().enumerate() {
1232 let line_no = idx + 1;
1233 let line = line_res?;
1234 if line.is_empty() {
1235 continue;
1236 }
1237 // Try v1 first. If it fails, try v2 (idempotency: re-migrating
1238 // a v2 log MUST succeed and produce equivalent output).
1239 let seed = if let Ok(v1) = serde_json::from_str::<V1LogRow>(&line) {
1240 MigrationRowSeed {
1241 ts: v1.ts,
1242 ts_seq: v1.ts_seq,
1243 event: v1.event,
1244 ref_: v1.ref_,
1245 source: v1.source,
1246 result: v1.result,
1247 license: v1.license,
1248 size_bytes: v1.size_bytes,
1249 store_path: v1.store_path,
1250 capability: v1.capability,
1251 session_id: v1.session_id,
1252 error_code: v1.error_code,
1253 canonical_digest_in: None,
1254 stored_this_hash: v1.this_hash,
1255 }
1256 } else {
1257 match serde_json::from_str::<LogRow>(&line) {
1258 Ok(v2) => MigrationRowSeed {
1259 ts: v2.ts,
1260 ts_seq: v2.ts_seq,
1261 event: v2.event,
1262 ref_: v2.ref_,
1263 source: v2.source,
1264 result: v2.result,
1265 license: v2.license,
1266 size_bytes: v2.size_bytes,
1267 store_path: v2.store_path,
1268 capability: v2.capability,
1269 session_id: v2.session_id,
1270 error_code: v2.error_code,
1271 canonical_digest_in: v2.canonical_digest,
1272 stored_this_hash: v2.this_hash,
1273 },
1274 Err(e) => {
1275 return Err(LogError::Io(std::io::Error::new(
1276 std::io::ErrorKind::InvalidData,
1277 format!("migration: line {line_no} is neither v1 nor v2: {e}"),
1278 )));
1279 }
1280 }
1281 };
1282 seeds.push(seed);
1283 }
1284
1285 // -- 2. Derive `canonical_digest` for each seed that lacks one. ------
1286 //
1287 // For v1 rows: build a CanonicalRef from
1288 // - source_type from `event`/`ref` shape (DOI prefix `10.` vs
1289 // arXiv) — we use a heuristic that matches `Ref::parse`'s rule
1290 // (`starts_with "10."` ⇒ DOI; else arXiv).
1291 // - source_id = ref value (verbatim).
1292 // - resolver_profile = source value (verbatim, ADR-0021 §3
1293 // migration recipe).
1294 // - version = None.
1295 //
1296 // Rows without a `ref` (session bookend) keep `canonical_digest =
1297 // None` per the v2 row contract.
1298
1299 fn derive_digest(seed: &MigrationRowSeed) -> Option<String> {
1300 let ref_str = seed.ref_.as_deref()?;
1301 let source_key = seed.source.as_deref().unwrap_or("");
1302 // Heuristic: bare DOIs always start `10.`; everything else is
1303 // treated as an arXiv id. Mirrors `Ref::parse` rule 3/4.
1304 let source_type = if ref_str.starts_with("10.") {
1305 crate::SourceType::Doi
1306 } else {
1307 crate::SourceType::Arxiv
1308 };
1309 let c = crate::CanonicalRef::new(source_type, ref_str, source_key, None);
1310 Some(c.digest_hex())
1311 }
1312
1313 let digests: Vec<Option<String>> = seeds
1314 .iter()
1315 .map(|s| s.canonical_digest_in.clone().or_else(|| derive_digest(s)))
1316 .collect();
1317
1318 // -- 3. Rebuild the hash chain across the v2 payloads. ----------------
1319 let mut out_rows: Vec<LogRow> = Vec::with_capacity(seeds.len());
1320 let mut prev_hash: String = GENESIS_HASH.to_string();
1321
1322 for (seed, digest) in seeds.iter().zip(digests.iter()) {
1323 let rfh = RowForHash {
1324 ts: seed.ts,
1325 ts_seq: seed.ts_seq,
1326 event: seed.event,
1327 ref_: seed.ref_.as_deref(),
1328 source: seed.source.as_deref(),
1329 result: seed.result,
1330 license: seed.license.as_deref(),
1331 size_bytes: seed.size_bytes,
1332 store_path: seed.store_path.as_deref(),
1333 capability: seed.capability,
1334 session_id: &seed.session_id,
1335 error_code: seed.error_code.as_deref(),
1336 schema_version: LOG_SCHEMA_VERSION,
1337 canonical_digest: digest.as_deref(),
1338 prev_hash: &prev_hash,
1339 };
1340 let this_hash = compute_this_hash(&rfh)?;
1341 let row = LogRow {
1342 ts: seed.ts,
1343 ts_seq: seed.ts_seq,
1344 event: seed.event,
1345 ref_: seed.ref_.clone(),
1346 source: seed.source.clone(),
1347 result: seed.result,
1348 license: seed.license.clone(),
1349 size_bytes: seed.size_bytes,
1350 store_path: seed.store_path.clone(),
1351 capability: seed.capability,
1352 session_id: seed.session_id.clone(),
1353 error_code: seed.error_code.clone(),
1354 schema_version: LOG_SCHEMA_VERSION.to_string(),
1355 canonical_digest: digest.clone(),
1356 prev_hash: prev_hash.clone(),
1357 this_hash: this_hash.clone(),
1358 };
1359 prev_hash = this_hash;
1360 out_rows.push(row);
1361 }
1362
1363 // -- 4. Build the report. --------------------------------------------
1364 let first_v1_hash = seeds
1365 .first()
1366 .map(|s| s.stored_this_hash.clone())
1367 .unwrap_or_else(|| GENESIS_HASH.to_string());
1368 let first_v2_hash = out_rows
1369 .first()
1370 .map(|r| r.this_hash.clone())
1371 .unwrap_or_else(|| GENESIS_HASH.to_string());
1372 let report = MigrationReport {
1373 rows_rewritten: out_rows.len() as u64,
1374 dry_run,
1375 first_row_v1_chain_hash: first_v1_hash,
1376 first_row_v2_chain_hash: first_v2_hash,
1377 };
1378
1379 if dry_run {
1380 return Ok(report);
1381 }
1382
1383 // -- 5. Live write: stage to `<log_path>.v2-migrated`, back up the
1384 // v1, then atomically rename. -----------------------------------
1385 let staged_path = with_suffix(log_path, ".v2-migrated");
1386 let backup_path = with_suffix(log_path, ".v1-backup");
1387
1388 {
1389 let staged_file = OpenOptions::new()
1390 .create(true)
1391 .write(true)
1392 .truncate(true)
1393 .open(&staged_path)?;
1394 let mut writer = BufWriter::new(staged_file);
1395 for row in &out_rows {
1396 let mut bytes = serde_json::to_vec(row)?;
1397 bytes.push(b'\n');
1398 writer.write_all(&bytes)?;
1399 }
1400 writer.flush()?;
1401 let file = writer.into_inner().map_err(|e| {
1402 LogError::Io(std::io::Error::other(format!(
1403 "migration buf writer flush failed: {}",
1404 e.error()
1405 )))
1406 })?;
1407 file.sync_all()?;
1408 }
1409
1410 // Sanity-check: the staged file MUST verify clean before we
1411 // commit the swap. If it doesn't, the migration is buggy — abort
1412 // without touching the live log.
1413 let verify_report = verify(&staged_path)?;
1414 if !verify_report.errors.is_empty() {
1415 return Err(LogError::Io(std::io::Error::other(format!(
1416 "migration: staged v2 log failed verify; first issue: {:?}",
1417 verify_report.errors.first()
1418 ))));
1419 }
1420
1421 // Move the original aside as `<log_path>.v1-backup`. Overwriting
1422 // any prior backup is intentional — the user re-running migrate
1423 // expects the most recent original preserved.
1424 if log_path.exists() {
1425 if backup_path.exists() {
1426 std::fs::remove_file(&backup_path)?;
1427 }
1428 std::fs::rename(log_path, &backup_path)?;
1429 }
1430 // Atomically promote the staged file to the live path.
1431 std::fs::rename(&staged_path, log_path)?;
1432
1433 Ok(report)
1434}
1435
1436/// Append a literal suffix to a [`Utf8Path`], producing a sibling path
1437/// in the same directory. Avoids `std::path::PathBuf` per the workspace
1438/// posture rule (`docs/SECURITY.md` §3 — camino-only file paths in
1439/// production code).
1440fn with_suffix(path: &Utf8Path, suffix: &str) -> Utf8PathBuf {
1441 let s = format!("{path}{suffix}");
1442 Utf8PathBuf::from(s)
1443}
1444
1445// ---------------------------------------------------------------------------
1446// Tests
1447// ---------------------------------------------------------------------------
1448
1449#[cfg(test)]
1450#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1451mod tests {
1452 use super::*;
1453 use std::fs;
1454 use std::sync::Arc;
1455 use std::thread;
1456
1457 use tempfile::TempDir;
1458
1459 /// Convert a `TempDir`'s `&std::path::Path` to a `Utf8PathBuf`. Tests
1460 /// always run on UTF-8 temp paths in CI; if the OS returns a non-UTF-8
1461 /// path we panic, which is acceptable for a unit test.
1462 fn tmp_dir_utf8(dir: &TempDir) -> Utf8PathBuf {
1463 Utf8PathBuf::from_path_buf(dir.path().to_path_buf()).expect("temp dir path must be UTF-8")
1464 }
1465
1466 /// A fixed 26-char ULID-shaped string used in tests. Real callers use
1467 /// the `ulid` crate; tests pin a constant so output is reproducible.
1468 const TEST_SESSION_ID: &str = "01JCKZ7Q0000000000000000AB";
1469
1470 fn open_log(path: &Utf8Path) -> ProvenanceLog {
1471 ProvenanceLog::open(path, TEST_SESSION_ID.to_string()).expect("open")
1472 }
1473
1474 #[test]
1475 fn open_creates_missing_parent_dir() {
1476 // Regression: opening a log whose parent dir does not yet exist must
1477 // create the dir and succeed (then a row appends cleanly), not abort
1478 // with ENOENT. This is the `doiget verify` failure on a fresh CI
1479 // runner where `<config>/doiget/` was never created.
1480 let dir = TempDir::new().expect("tempdir");
1481 let path = tmp_dir_utf8(&dir)
1482 .join("nested")
1483 .join("doiget")
1484 .join("access.jsonl");
1485 assert!(
1486 !path.parent().expect("has parent").exists(),
1487 "parent dir must not pre-exist for this test to be meaningful"
1488 );
1489 let log = ProvenanceLog::open(&path, TEST_SESSION_ID.to_string())
1490 .expect("open must create the parent dir and succeed");
1491 log.append(empty_input())
1492 .expect("append after auto-created dir");
1493 assert!(path.exists(), "log file written under the auto-created dir");
1494 // End-to-end: the row the verify path would write is actually
1495 // readable back (exercises the full OpenOptions/flush/sync write,
1496 // not just that the file exists).
1497 let rows = read_rows(&path);
1498 assert_eq!(rows.len(), 1, "exactly one row in the auto-created log");
1499 }
1500
1501 fn empty_input() -> RowInput<'static> {
1502 RowInput {
1503 event: LogEvent::Fetch,
1504 result: LogResult::Ok,
1505 capability: Capability::Oa,
1506 ref_: None,
1507 source: None,
1508 error_code: None,
1509 size_bytes: None,
1510 license: None,
1511 store_path: None,
1512 canonical_digest: None,
1513 }
1514 }
1515
1516 /// Read the on-disk log and parse every line into a `LogRow`.
1517 fn read_rows(path: &Utf8Path) -> Vec<LogRow> {
1518 let raw = fs::read_to_string(path).expect("read log");
1519 raw.lines()
1520 .filter(|l| !l.is_empty())
1521 .map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
1522 .collect()
1523 }
1524
1525 /// Recompute `this_hash` for a stored row and assert it matches the
1526 /// stored value. Walks the same canonicalization rule as
1527 /// [`compute_this_hash`].
1528 fn verify_this_hash(row: &LogRow) {
1529 let rfh = RowForHash {
1530 ts: row.ts,
1531 ts_seq: row.ts_seq,
1532 event: row.event,
1533 ref_: row.ref_.as_deref(),
1534 source: row.source.as_deref(),
1535 result: row.result,
1536 license: row.license.as_deref(),
1537 size_bytes: row.size_bytes,
1538 store_path: row.store_path.as_deref(),
1539 capability: row.capability,
1540 session_id: &row.session_id,
1541 error_code: row.error_code.as_deref(),
1542 schema_version: &row.schema_version,
1543 canonical_digest: row.canonical_digest.as_deref(),
1544 prev_hash: &row.prev_hash,
1545 };
1546 let recomputed = compute_this_hash(&rfh).expect("hash");
1547 assert_eq!(
1548 recomputed, row.this_hash,
1549 "this_hash mismatch on ts_seq {}",
1550 row.ts_seq
1551 );
1552 }
1553
1554 #[test]
1555 fn first_row_uses_genesis_prev_hash() {
1556 let dir = TempDir::new().expect("tmp");
1557 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1558 let log = open_log(&path);
1559 let seq = log.append(empty_input()).expect("append");
1560 assert_eq!(seq, 1);
1561
1562 let rows = read_rows(&path);
1563 assert_eq!(rows.len(), 1);
1564 assert_eq!(rows[0].ts_seq, 1);
1565 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1566 assert_eq!(rows[0].this_hash.len(), 64);
1567 assert_eq!(rows[0].session_id, TEST_SESSION_ID);
1568 verify_this_hash(&rows[0]);
1569 }
1570
1571 #[test]
1572 fn subsequent_rows_chain_correctly() {
1573 let dir = TempDir::new().expect("tmp");
1574 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1575 let log = open_log(&path);
1576
1577 for _ in 0..3 {
1578 log.append(empty_input()).expect("append");
1579 }
1580
1581 let rows = read_rows(&path);
1582 assert_eq!(rows.len(), 3);
1583 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1584 assert_eq!(rows[1].prev_hash, rows[0].this_hash);
1585 assert_eq!(rows[2].prev_hash, rows[1].this_hash);
1586 for r in &rows {
1587 verify_this_hash(r);
1588 }
1589 assert_eq!(rows[0].ts_seq, 1);
1590 assert_eq!(rows[1].ts_seq, 2);
1591 assert_eq!(rows[2].ts_seq, 3);
1592 }
1593
1594 #[test]
1595 fn recovery_after_reopen() {
1596 let dir = TempDir::new().expect("tmp");
1597 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1598
1599 {
1600 let log = open_log(&path);
1601 for _ in 0..3 {
1602 log.append(empty_input()).expect("append");
1603 }
1604 } // drop writer
1605
1606 let log2 = open_log(&path);
1607 let seq = log2.append(empty_input()).expect("append after reopen");
1608 assert_eq!(seq, 4);
1609
1610 let rows = read_rows(&path);
1611 assert_eq!(rows.len(), 4);
1612 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1613 for i in 1..rows.len() {
1614 assert_eq!(
1615 rows[i].prev_hash,
1616 rows[i - 1].this_hash,
1617 "chain break at row {}",
1618 i + 1
1619 );
1620 }
1621 for (i, r) in rows.iter().enumerate() {
1622 assert_eq!(r.ts_seq, (i + 1) as u64);
1623 verify_this_hash(r);
1624 }
1625 }
1626
1627 #[test]
1628 fn concurrent_writers_in_same_process_serialize() {
1629 let dir = TempDir::new().expect("tmp");
1630 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1631 let log = Arc::new(open_log(&path));
1632
1633 let mut handles = Vec::with_capacity(8);
1634 for _ in 0..8 {
1635 let log = Arc::clone(&log);
1636 handles.push(thread::spawn(move || {
1637 log.append(empty_input()).expect("append")
1638 }));
1639 }
1640 let mut returned: Vec<u64> = handles
1641 .into_iter()
1642 .map(|h| h.join().expect("join"))
1643 .collect();
1644 returned.sort_unstable();
1645 assert_eq!(returned, vec![1, 2, 3, 4, 5, 6, 7, 8]);
1646
1647 let rows = read_rows(&path);
1648 assert_eq!(rows.len(), 8);
1649
1650 // The in-process mutex serializes appends, so file order MUST equal
1651 // ts_seq order: row N (0-indexed) on disk has ts_seq = N+1.
1652 for (i, r) in rows.iter().enumerate() {
1653 assert_eq!(r.ts_seq, (i + 1) as u64, "ts_seq gap at file row {}", i + 1);
1654 }
1655 // Hash chain follows file order.
1656 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1657 for i in 1..rows.len() {
1658 assert_eq!(
1659 rows[i].prev_hash,
1660 rows[i - 1].this_hash,
1661 "chain break at file row {}",
1662 i + 1
1663 );
1664 }
1665 for r in &rows {
1666 verify_this_hash(r);
1667 }
1668 }
1669
1670 #[test]
1671 fn corrupted_existing_log_fails_open() {
1672 let dir = TempDir::new().expect("tmp");
1673 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1674
1675 // JSON but not a valid LogRow: missing required fields, has unknown
1676 // field. `deny_unknown_fields` ensures the parser refuses.
1677 fs::write(&path, "{\"ts_seq\": 1, \"garbage\": true}\n").expect("write");
1678
1679 let err =
1680 ProvenanceLog::open(&path, TEST_SESSION_ID.to_string()).expect_err("must fail open");
1681 match err {
1682 LogError::Io(io) => {
1683 let msg = io.to_string();
1684 assert!(
1685 msg.contains("corrupted log at line 1"),
1686 "expected synthetic corruption message, got: {}",
1687 msg
1688 );
1689 }
1690 other => panic!("expected LogError::Io, got {:?}", other),
1691 }
1692 }
1693
1694 #[test]
1695 fn rejects_non_regular_file() {
1696 // Pointing the log at a directory must fail with NotARegularFile.
1697 let dir = TempDir::new().expect("tmp");
1698 let err = ProvenanceLog::open(tmp_dir_utf8(&dir), TEST_SESSION_ID.to_string())
1699 .expect_err("must fail");
1700 match err {
1701 LogError::NotARegularFile(_) => {}
1702 other => panic!("expected NotARegularFile, got {:?}", other),
1703 }
1704 }
1705
1706 #[test]
1707 fn canonical_json_excludes_this_hash_field() {
1708 // Spec contract: the hashed bytes do not include `this_hash`. If
1709 // this ever regresses, every previously-written log becomes
1710 // unverifiable.
1711 let rfh = RowForHash {
1712 ts: Utc::now(),
1713 ts_seq: 1,
1714 event: LogEvent::Fetch,
1715 ref_: None,
1716 source: None,
1717 result: LogResult::Ok,
1718 license: None,
1719 size_bytes: None,
1720 store_path: None,
1721 capability: Capability::Oa,
1722 session_id: TEST_SESSION_ID,
1723 error_code: None,
1724 schema_version: LOG_SCHEMA_VERSION,
1725 canonical_digest: None,
1726 prev_hash: GENESIS_HASH,
1727 };
1728 let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1729 let s = std::str::from_utf8(&bytes).expect("utf8");
1730 assert!(!s.contains("this_hash"), "this_hash leaked into hash input");
1731 assert!(s.contains("\"prev_hash\":"));
1732 }
1733
1734 #[test]
1735 fn canonical_json_keys_are_lexicographically_sorted() {
1736 // PROVENANCE_LOG.md §4: canonical JSON uses keys sorted
1737 // lexicographically. The lex-first top-level key of a row is
1738 // `capability` ("c..." < "e..." < ...). Build a row and assert the
1739 // canonical bytes start with that key.
1740 let rfh = RowForHash {
1741 ts: Utc::now(),
1742 ts_seq: 1,
1743 event: LogEvent::Fetch,
1744 ref_: Some("10.1234/example"),
1745 source: Some("unpaywall"),
1746 result: LogResult::Ok,
1747 license: Some("CC-BY-4.0"),
1748 size_bytes: Some(1234),
1749 store_path: Some("papers/x.pdf"),
1750 capability: Capability::Oa,
1751 session_id: TEST_SESSION_ID,
1752 error_code: None,
1753 schema_version: LOG_SCHEMA_VERSION,
1754 canonical_digest: Some(
1755 "0000000000000000000000000000000000000000000000000000000000000000",
1756 ),
1757 prev_hash: GENESIS_HASH,
1758 };
1759 let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1760 let s = std::str::from_utf8(&bytes).expect("utf8");
1761 // v2: lex-first key is `canonical_digest` (< `capability` because
1762 // 'n' < 'p' at byte index 2). Pre-v2 it was `capability`.
1763 assert!(
1764 s.starts_with("{\"canonical_digest\":"),
1765 "canonical bytes must start with lex-first v2 key, got: {}",
1766 s
1767 );
1768 // Spot-check ordering: `prev_hash` (p) must come before `ref` (r),
1769 // which must come before `result` (re...) — wait, "ref" < "result"
1770 // lexicographically because 'f' < 's' in ascii at index 2 vs 'e' at
1771 // index 2 of "result"... let me just check a couple of unambiguous
1772 // pairs: `event` < `prev_hash`, and `ts` < `ts_seq`.
1773 let event_idx = s.find("\"event\":").expect("event key present");
1774 let prev_idx = s.find("\"prev_hash\":").expect("prev_hash key present");
1775 assert!(event_idx < prev_idx, "event must precede prev_hash");
1776 let ts_idx = s.find("\"ts\":").expect("ts key present");
1777 let tsseq_idx = s.find("\"ts_seq\":").expect("ts_seq key present");
1778 assert!(ts_idx < tsseq_idx, "ts must precede ts_seq");
1779 }
1780
1781 // -----------------------------------------------------------------
1782 // verify() tests — Phase 1 surface for `doiget audit-log --verify`.
1783 // -----------------------------------------------------------------
1784
1785 /// Rewrite a single field's quoted-string value on a specific 1-based
1786 /// line of `path`. Used to simulate tampering. Panics on malformed input
1787 /// — only valid inputs are produced by the test harness.
1788 ///
1789 /// `field_key` is matched as `"field_key":"...old..."` (quoted string
1790 /// JSON value). The new value is the literal string `new_value` (no
1791 /// JSON escaping needed for the test fixtures we use).
1792 fn tamper_string_field(
1793 path: &Utf8Path,
1794 line_no_1based: usize,
1795 field_key: &str,
1796 new_value: &str,
1797 ) {
1798 let raw = fs::read_to_string(path).expect("read log");
1799 let mut lines: Vec<String> = raw.lines().map(str::to_string).collect();
1800 let target = &lines[line_no_1based - 1];
1801 let needle = format!("\"{field_key}\":\"");
1802 let start = target
1803 .find(&needle)
1804 .unwrap_or_else(|| panic!("field {field_key} not found on line {line_no_1based}"))
1805 + needle.len();
1806 let end_rel = target[start..]
1807 .find('"')
1808 .unwrap_or_else(|| panic!("unterminated string for field {field_key}"));
1809 let end = start + end_rel;
1810 let mut new_line = String::with_capacity(target.len());
1811 new_line.push_str(&target[..start]);
1812 new_line.push_str(new_value);
1813 new_line.push_str(&target[end..]);
1814 lines[line_no_1based - 1] = new_line;
1815 let mut out = lines.join("\n");
1816 out.push('\n');
1817 fs::write(path, out).expect("write tampered log");
1818 }
1819
1820 #[test]
1821 fn verify_empty_log_is_ok() {
1822 // Missing file is a clean log — no tampering possible on bytes that
1823 // don't exist. `verify` returns an empty report, not an error.
1824 let dir = TempDir::new().expect("tmp");
1825 let path = tmp_dir_utf8(&dir).join("nonexistent.jsonl");
1826 assert!(!path.exists(), "precondition: file must not exist");
1827
1828 let report = verify(&path).expect("verify must not error on missing file");
1829 assert_eq!(report.total_rows, 0);
1830 assert_eq!(report.ok_rows, 0);
1831 assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
1832 }
1833
1834 #[test]
1835 fn verify_well_formed_chain_passes() {
1836 // Three rows written via the real writer must verify clean.
1837 let dir = TempDir::new().expect("tmp");
1838 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1839 let log = open_log(&path);
1840 for _ in 0..3 {
1841 log.append(empty_input()).expect("append");
1842 }
1843
1844 let report = verify(&path).expect("verify must succeed");
1845 assert_eq!(report.total_rows, 3);
1846 assert_eq!(report.ok_rows, 3);
1847 assert!(
1848 report.errors.is_empty(),
1849 "expected no issues on a well-formed log; got: {:?}",
1850 report.errors
1851 );
1852 }
1853
1854 #[test]
1855 fn verify_detects_tampered_row_hash() {
1856 // Mutate the SECOND row's `this_hash` to a syntactically-valid but
1857 // wrong hash. The recomputed canonical-JSON SHA-256 will not match.
1858 let dir = TempDir::new().expect("tmp");
1859 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1860 let log = open_log(&path);
1861 log.append(empty_input()).expect("append 1");
1862 log.append(empty_input()).expect("append 2");
1863 drop(log);
1864
1865 // 64 lowercase hex chars, all zeros — passes `LogRow` parse, fails hash check.
1866 tamper_string_field(
1867 &path,
1868 2,
1869 "this_hash",
1870 "0000000000000000000000000000000000000000000000000000000000000000",
1871 );
1872
1873 let report = verify(&path).expect("verify must succeed");
1874 assert_eq!(report.total_rows, 2);
1875 // Row 2's hash mismatch breaks both the hash check on row 2 AND the
1876 // chain link from row 2's stored `prev_hash` (still correct) into the
1877 // forward direction. There's no row 3 to fail forward, so we expect
1878 // exactly one issue: the this-hash mismatch on line 2.
1879 let hash_issues: Vec<_> = report
1880 .errors
1881 .iter()
1882 .filter(|e| e.kind == VerifyIssueKind::ThisHashMismatch)
1883 .collect();
1884 assert_eq!(
1885 hash_issues.len(),
1886 1,
1887 "expected exactly one ThisHashMismatch, got {:?}",
1888 report.errors
1889 );
1890 assert_eq!(hash_issues[0].line, 2);
1891 }
1892
1893 #[test]
1894 fn verify_detects_tampered_prev_hash() {
1895 // Mutate the SECOND row's `prev_hash` to a wrong value. This
1896 // invalidates the chain link but the row's own `this_hash` was
1897 // computed with the original `prev_hash`, so the this-hash check
1898 // ALSO fails (hash input changed). We assert at least the prev-hash
1899 // issue is reported on line 2.
1900 let dir = TempDir::new().expect("tmp");
1901 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1902 let log = open_log(&path);
1903 log.append(empty_input()).expect("append 1");
1904 log.append(empty_input()).expect("append 2");
1905 drop(log);
1906
1907 tamper_string_field(
1908 &path,
1909 2,
1910 "prev_hash",
1911 "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
1912 );
1913
1914 let report = verify(&path).expect("verify must succeed");
1915 assert_eq!(report.total_rows, 2);
1916 let prev_issues: Vec<_> = report
1917 .errors
1918 .iter()
1919 .filter(|e| e.kind == VerifyIssueKind::PrevHashMismatch)
1920 .collect();
1921 assert_eq!(
1922 prev_issues.len(),
1923 1,
1924 "expected exactly one PrevHashMismatch, got {:?}",
1925 report.errors
1926 );
1927 assert_eq!(prev_issues[0].line, 2);
1928 }
1929
1930 #[test]
1931 fn verify_detects_corrupted_json() {
1932 // One valid row plus a literal `{"garbage":true}` line. The garbage
1933 // line fails `serde_json::from_str::<LogRow>` (missing fields +
1934 // `deny_unknown_fields`) and surfaces as a `ParseError` on line 2.
1935 let dir = TempDir::new().expect("tmp");
1936 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1937 let log = open_log(&path);
1938 log.append(empty_input()).expect("append 1");
1939 drop(log);
1940
1941 // Append a garbage line directly.
1942 let mut existing = fs::read_to_string(&path).expect("read");
1943 if !existing.ends_with('\n') {
1944 existing.push('\n');
1945 }
1946 existing.push_str("{\"garbage\":true}\n");
1947 fs::write(&path, existing).expect("write");
1948
1949 let report = verify(&path).expect("verify must succeed");
1950 // total_rows counts non-empty lines, so both lines are counted.
1951 assert_eq!(report.total_rows, 2);
1952 let parse_issues: Vec<_> = report
1953 .errors
1954 .iter()
1955 .filter(|e| e.kind == VerifyIssueKind::ParseError)
1956 .collect();
1957 assert_eq!(
1958 parse_issues.len(),
1959 1,
1960 "expected exactly one ParseError, got {:?}",
1961 report.errors
1962 );
1963 assert_eq!(parse_issues[0].line, 2);
1964 }
1965
1966 #[test]
1967 fn capability_serializes_kebab_case() {
1968 // PROVENANCE_LOG.md §3 requires `oa`, `metadata`, `tdm-elsevier`,
1969 // `tdm-aps`, `tdm-springer` on the wire (kebab-case).
1970 let cases = [
1971 (Capability::Oa, "\"oa\""),
1972 (Capability::Metadata, "\"metadata\""),
1973 (Capability::TdmElsevier, "\"tdm-elsevier\""),
1974 (Capability::TdmAps, "\"tdm-aps\""),
1975 (Capability::TdmSpringer, "\"tdm-springer\""),
1976 ];
1977 for (cap, expected) in cases {
1978 let got = serde_json::to_string(&cap).expect("serialize");
1979 assert_eq!(
1980 got, expected,
1981 "capability wire format mismatch for {:?}",
1982 cap
1983 );
1984 }
1985 }
1986
1987 // -----------------------------------------------------------------
1988 // #140 — §6 rotation, retention, multi-segment verify.
1989 // -----------------------------------------------------------------
1990
1991 fn gunzip_to_string(gz: &Utf8Path) -> String {
1992 use std::io::Read;
1993 let f = std::fs::File::open(gz.as_std_path()).expect("open gz");
1994 let mut dec = GzDecoder::new(f);
1995 let mut s = String::new();
1996 dec.read_to_string(&mut s).expect("gunzip");
1997 s
1998 }
1999
2000 #[test]
2001 fn rotation_archives_to_gz_and_restarts_genesis_chain() {
2002 let dir = TempDir::new().expect("tmp");
2003 let path = tmp_dir_utf8(&dir).join("access.log");
2004 // Inject a tiny threshold (NOT a global env var — that raced
2005 // non-#[serial] tests): row 1 fits, so the SECOND append
2006 // (size>=50) rotates before it writes. A freshly rotated `.gz`
2007 // is not retention-aged, so the default prune at open is a no-op.
2008 let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
2009 .expect("open");
2010 log.append(empty_input()).expect("append 1");
2011 let row1 = read_rows(&path);
2012 assert_eq!(row1.len(), 1);
2013 assert_eq!(row1[0].prev_hash, GENESIS_HASH);
2014
2015 log.append(empty_input()).expect("append 2 (rotates first)");
2016
2017 // Exactly one rotated segment; it gunzips to the original row 1.
2018 let segs = rotated_segments(&path);
2019 assert_eq!(segs.len(), 1, "one .gz segment expected; got {segs:?}");
2020 let archived: Vec<LogRow> = gunzip_to_string(&segs[0])
2021 .lines()
2022 .filter(|l| !l.is_empty())
2023 .map(|l| serde_json::from_str(l).expect("row"))
2024 .collect();
2025 assert_eq!(archived.len(), 1);
2026 assert_eq!(archived[0].this_hash, row1[0].this_hash);
2027
2028 // The fresh access.log restarts the chain at GENESIS, ts_seq 1.
2029 let cur = read_rows(&path);
2030 assert_eq!(cur.len(), 1, "fresh segment holds only the post-rotate row");
2031 assert_eq!(cur[0].prev_hash, GENESIS_HASH);
2032 assert_eq!(cur[0].ts_seq, 1);
2033
2034 // verify_all sees both segments, each its own clean chain.
2035 let reports = verify_all(&path).expect("verify_all");
2036 assert_eq!(reports.len(), 2, "rotated .gz + current");
2037 for (p, r) in &reports {
2038 assert!(r.errors.is_empty(), "segment {p} must verify clean: {r:?}");
2039 }
2040 }
2041
2042 #[test]
2043 fn rotate_log_is_fail_closed_on_missing_source() {
2044 // The append path propagates this via `?`, so a rotation failure
2045 // aborts the fetch (fail-closed) rather than silently continuing.
2046 let dir = TempDir::new().expect("tmp");
2047 let missing = tmp_dir_utf8(&dir).join("nope.log");
2048 let err = rotate_log(&missing).expect_err("missing source must error");
2049 assert!(matches!(err, LogError::Io(_)), "got {err:?}");
2050 }
2051
2052 #[test]
2053 #[serial_test::serial]
2054 fn prune_respects_retention_window_and_disable() {
2055 let dir = TempDir::new().expect("tmp");
2056 let base = tmp_dir_utf8(&dir);
2057 let path = base.join("access.log");
2058 let old_gz = base.join("access.log.2020-01-01-000000.gz");
2059 let new_gz = base.join("access.log.2999-01-01-000000.gz");
2060
2061 let mk = |p: &Utf8Path, aged: bool| {
2062 let f = std::fs::File::create(p.as_std_path()).expect("create gz");
2063 if aged {
2064 // 100 days ago — older than the 90-day default & a 1-day window.
2065 let when =
2066 std::time::SystemTime::now() - std::time::Duration::from_secs(100 * 86_400);
2067 f.set_modified(when).expect("set mtime");
2068 }
2069 };
2070
2071 // (a) days=0 disables pruning entirely.
2072 mk(&old_gz, true);
2073 std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
2074 let _ = open_log(&path);
2075 assert!(old_gz.exists(), "days=0 must NOT prune");
2076
2077 // (b) days=1 prunes the aged segment, keeps a fresh one.
2078 mk(&new_gz, false);
2079 std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "1");
2080 let _ = open_log(&path);
2081 assert!(!old_gz.exists(), "aged segment must be pruned at days=1");
2082 assert!(new_gz.exists(), "fresh segment must survive");
2083
2084 std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
2085 }
2086
2087 #[test]
2088 #[serial_test::serial]
2089 fn retention_days_env_parsing() {
2090 std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
2091 assert_eq!(retention_days(), 0);
2092 std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "30");
2093 assert_eq!(retention_days(), 30);
2094 std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "garbage");
2095 assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2096 std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "-5");
2097 assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2098 std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
2099 assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2100 }
2101
2102 #[test]
2103 fn verify_all_flags_tampered_segment_independently() {
2104 let dir = TempDir::new().expect("tmp");
2105 let path = tmp_dir_utf8(&dir).join("access.log");
2106 // Inject the tiny threshold (no global env → no cross-test race).
2107 let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
2108 .expect("open");
2109 log.append(empty_input()).expect("append 1");
2110 log.append(empty_input()).expect("append 2 (rotates)");
2111 drop(log);
2112
2113 // Tamper the CURRENT segment's row: set this_hash to a
2114 // syntactically-valid 64-hex string that cannot be the SHA-256
2115 // of any row (all zeros). NOTE: the previous "flip the last char
2116 // to '0'" was a no-op ~1/16 of runs when the real hash already
2117 // ended in '0' (this_hash depends on `Utc::now()`), which is the
2118 // flake this fixes — mirrors `verify_detects_tampered_row_hash`.
2119 let mut cur = read_rows(&path);
2120 let mut bad = cur.remove(0);
2121 bad.this_hash =
2122 "0000000000000000000000000000000000000000000000000000000000000000".to_string();
2123 std::fs::write(
2124 path.as_std_path(),
2125 format!("{}\n", serde_json::to_string(&bad).expect("ser")),
2126 )
2127 .expect("rewrite tampered current");
2128
2129 let reports = verify_all(&path).expect("verify_all");
2130 assert_eq!(reports.len(), 2);
2131 // Oldest first = the rotated .gz (clean); current last (tampered).
2132 let (gz_path, gz_rep) = &reports[0];
2133 let (cur_path, cur_rep) = &reports[1];
2134 assert!(
2135 gz_path.as_str().ends_with(".gz") && gz_rep.errors.is_empty(),
2136 "rotated segment must stay clean: {gz_path} {gz_rep:?}"
2137 );
2138 assert!(
2139 cur_path.file_name() == Some("access.log") && !cur_rep.errors.is_empty(),
2140 "tampered current segment must report issues: {cur_path} {cur_rep:?}"
2141 );
2142 }
2143}