doiget_core/provenance.rs
1//! JSON Lines + SHA-256 hash-chained provenance log.
2//!
3//! Binding spec: `docs/PROVENANCE_LOG.md` (NORMATIVE, §3 row schema, §4 hash
4//! chain). Failure semantics: **fail-closed** — callers MUST abort the fetch
5//! if a log write returns `Err`. See `docs/SECURITY.md` §1.8 and ADR-0006.
6//!
7//! # On-disk format
8//!
9//! - JSON Lines (`.jsonl`): one JSON object per line, terminated by `\n` (LF).
10//! - UTF-8. Timestamps are RFC3339 in UTC.
11//! - Each row is appended via a single `write_all` whose payload always ends
12//! in `\n`, so a partially-written row is detectable as a missing trailing
13//! newline rather than a torn JSON record.
14//! - In audit-grade mode (the only mode shipped here), the writer flushes the
15//! `BufWriter` and `fsync`s the file after every row.
16//!
17//! # Hash chain (PROVENANCE_LOG.md §4)
18//!
19//! Each row carries a `prev_hash` and a `this_hash`. The first row's
20//! `prev_hash` is the literal string `"GENESIS"`. Every subsequent row's
21//! `prev_hash` MUST equal the previous row's `this_hash`.
22//!
//! When a log file rotates (§6 — see "Log rotation and retention" below),
//! the first row of the NEW log file also uses `prev_hash = "GENESIS"`,
//! restarting the chain.
26//!
27//! `this_hash` is computed as:
28//!
29//! ```text
30//! this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
31//! ```
32//!
//! where `canonical_json` is **compact JSON (no whitespace) with object keys
//! sorted lexicographically** (PROVENANCE_LOG.md §4). For a v2 row with fields
//! `{ts: "...", ts_seq: 1, event: "fetch", ...}`, the canonical bytes begin
//! with `{"canonical_digest":...` because `canonical_digest` is the lex-first
//! top-level key (`"canonical_digest"` < `"capability"`). Downstream
//! `doiget audit-log --verify` (Phase 1+) relies on this exact rule — do not
//! change the canonicalization without bumping the spec.
39//!
40//! # In-process serialization
41//!
42//! `ProvenanceLog` holds a `Mutex<LogState>`. All `append` calls within the
43//! same process serialize on this mutex, satisfying the "process-local mutex
44//! on log appender" requirement of `docs/SECURITY.md` §1.8. Cross-process
45//! coordination (multiple `doiget` invocations) is out of scope here and
46//! handled by the higher-level `flock`-based store layer.
47//!
48//! # Session id
49//!
50//! `session_id` (PROVENANCE_LOG.md §3) is a 26-char ULID generated **once per
51//! process invocation** by the caller and stamped into every row written
52//! through the resulting [`ProvenanceLog`]. This crate does not generate the
53//! ULID itself — see [`ProvenanceLog::open`] for the contract.
54//!
55//! # Log rotation and retention (§6)
56//!
//! Implemented (PROVENANCE_LOG.md §6): once `access.log` has reached the
//! rotation threshold (`ROTATE_BYTES`, 100 MiB by default) a subsequent
//! [`ProvenanceLog::append`]
59//! gzip-compresses the full file to `access.log.<YYYY-MM-DD-HHMMSS>.gz`,
60//! removes the old `access.log`, and writes the incoming row as the
61//! first row of a fresh file with `prev_hash = "GENESIS"` (the hash
62//! chain **restarts** per segment — segments are NOT linked). Rotation
63//! is fail-closed: any gzip / rename / unlink failure aborts the
64//! `append` (the caller's fetch aborts) so the chain never silently
65//! skips. At [`ProvenanceLog::open`], rotated `.gz` segments older than
66//! the retention window (`DOIGET_LOG_RETENTION_DAYS`, default 90; `0`
67//! disables) are deleted **best-effort** (a prune failure is logged,
68//! not fatal — pruning is housekeeping, not integrity).
69//! [`verify_all`] verifies the current file plus every rotated `.gz`
70//! segment (each its own GENESIS-rooted chain).
71
72use std::collections::BTreeMap;
73use std::fs::{File, OpenOptions};
74use std::io::{BufRead, BufReader, BufWriter, Write};
75use std::sync::Mutex;
76
77use flate2::read::GzDecoder;
78use flate2::write::GzEncoder;
79use flate2::Compression;
80
81use camino::{Utf8Path, Utf8PathBuf};
82use chrono::{DateTime, Utc};
83use serde::{Deserialize, Serialize};
84use sha2::{Digest, Sha256};
85
86/// One row of the provenance log (PROVENANCE_LOG.md §3).
87///
88/// The on-disk wire field names match the spec table; struct-field order is
89/// **not** load-bearing for the hash because canonicalization sorts keys
90/// lexicographically (see PROVENANCE_LOG.md §4).
91///
92/// **Schema version**: this struct is the **v2** row shape (ADR-0024).
93/// Every v2 row carries `schema_version = "v2"` literally; the
94/// `canonical_digest` field carries the ADR-0021 §1 audit identity of
95/// the fetch on rows where one applies (`Fetch` / `Resolve` /
96/// `StoreWrite`) and is `None` on session bookend rows
97/// (`SessionStart` / `SessionEnd` / `CapabilityResolved`) that have no
98/// ref. v1 rows (pre-Slice-4) lack both fields and MUST be migrated via
99/// [`migrate_v1_to_v2`] before the v2 binary can read them — the
100/// `deny_unknown_fields` + non-defaulted `schema_version` shape ensures
101/// v1 rows fail to parse loudly rather than producing silent hash-chain
102/// mismatches.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(deny_unknown_fields)]
105pub struct LogRow {
106 /// RFC3339 UTC timestamp of the append (millisecond precision).
107 pub ts: DateTime<Utc>,
108 /// Per-session monotonic sequence number, starting at 1.
109 pub ts_seq: u64,
110 /// Event class (see [`LogEvent`]).
111 pub event: LogEvent,
112 /// Optional reference (DOI / arXiv id). Wire field name is `ref`.
113 #[serde(rename = "ref")]
114 pub ref_: Option<String>,
115 /// Optional source name (e.g. `unpaywall`).
116 pub source: Option<String>,
117 /// Result (see [`LogResult`]).
118 pub result: LogResult,
119 /// OA license string (`event=fetch`, `result=ok`); `None` otherwise.
120 pub license: Option<String>,
121 /// Bytes written / fetched, on success rows.
122 pub size_bytes: Option<u64>,
123 /// Path to the stored payload, relative to the store root
124 /// (`event=fetch`, `result=ok`); `None` otherwise.
125 pub store_path: Option<String>,
126 /// Capability under which the row was written (REQUIRED, every row).
127 pub capability: Capability,
128 /// 26-char ULID identifying the process invocation (REQUIRED).
129 pub session_id: String,
130 /// Stable error code on failure rows.
131 pub error_code: Option<String>,
132 /// Row schema version. Always [`LOG_SCHEMA_VERSION`] (`"v2"`) for
133 /// new rows written by this build (ADR-0024). v1 rows lack this
134 /// field; they MUST be migrated via [`migrate_v1_to_v2`] first.
135 pub schema_version: String,
136 /// Canonical-digest of the fetch's audit identity (ADR-0021 §1) as
137 /// 64 lowercase hex chars. Present on rows with a `ref` (`Fetch`,
138 /// `Resolve`, `StoreWrite`); `None` on session bookend rows. The
139 /// digest is computed from a [`crate::CanonicalRef`] whose
140 /// `resolver_profile` matches this row's `source` field for
141 /// migrated v1 rows; new v2 rows MAY pass an explicit
142 /// `resolver_profile` distinct from `source`.
143 pub canonical_digest: Option<String>,
144 /// 64 lowercase hex chars, OR the literal string `"GENESIS"` for the
145 /// first row of a fresh log file.
146 pub prev_hash: String,
147 /// 64 lowercase hex chars. SHA-256 of canonical JSON of THIS row with
148 /// the `this_hash` field removed. See module docs.
149 pub this_hash: String,
150}
151
/// Schema version stamped into every row this build writes
/// (`docs/PROVENANCE_LOG.md` §3, ADR-0024).
///
/// The previous version, `"v1"`, was implicit: pre-Slice-4 rows had no
/// `schema_version` key at all. The bump to `"v2"` happened when the
/// `canonical_digest` column was introduced. [`migrate_v1_to_v2`]
/// performs the one-shot, idempotent, dry-runnable v1→v2 migration.
pub const LOG_SCHEMA_VERSION: &str = "v2";
160
161/// Event class for a log row (PROVENANCE_LOG.md §3).
162///
163/// Note: result-status (`ok`/`err`/`denied`) lives in [`LogResult`], NOT in
164/// the event variant. So `Fetch` covers both successful and failed fetch
165/// attempts; the row's `result` distinguishes them.
166///
167/// `non_exhaustive` so adding new variants is non-breaking.
168#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
169#[serde(rename_all = "snake_case")]
170#[non_exhaustive]
171pub enum LogEvent {
172 /// Process started; first row of a new session.
173 SessionStart,
174 /// Capability resolution finished (allowed / denied / which env var).
175 CapabilityResolved,
176 /// Reference resolved to a fetch URL.
177 Resolve,
178 /// Fetch attempt (success or failure determined by `result`).
179 Fetch,
180 /// Store write attempt (success or failure determined by `result`).
181 StoreWrite,
182 /// Process ended cleanly.
183 SessionEnd,
184}
185
186/// Per-row outcome (PROVENANCE_LOG.md §3). `non_exhaustive` for forward
187/// compatibility.
188#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
189#[serde(rename_all = "snake_case")]
190#[non_exhaustive]
191pub enum LogResult {
192 /// The operation succeeded.
193 Ok,
194 /// The operation failed with an error.
195 Err,
196 /// The operation was denied (e.g. capability gate).
197 Denied,
198}
199
200/// Capability under which a row was written (PROVENANCE_LOG.md §3).
201///
202/// `kebab-case` serde rename emits `oa`, `metadata`, `tdm-elsevier`,
203/// `tdm-aps`, `tdm-springer` exactly as the spec requires. `non_exhaustive`
204/// for forward compatibility.
205#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
206#[serde(rename_all = "kebab-case")]
207#[non_exhaustive]
208pub enum Capability {
209 /// Open access tier.
210 Oa,
211 /// Metadata-only access.
212 Metadata,
213 /// Elsevier TDM (Tier 3, opt-in build).
214 TdmElsevier,
215 /// APS TDM (Tier 3, opt-in build).
216 TdmAps,
217 /// Springer TDM (Tier 3, opt-in build).
218 TdmSpringer,
219}
220
221/// Errors emitted by the provenance log writer. Callers MUST treat any
222/// variant as a fail-closed signal and abort the surrounding fetch.
223#[derive(Debug, thiserror::Error)]
224#[non_exhaustive]
225pub enum LogError {
226 /// I/O error opening, reading, writing, or syncing the log file. Includes
227 /// recovery-time corruption detection where the synthetic message is
228 /// `"corrupted log at line N: …"`.
229 #[error("provenance log io error: {0}")]
230 Io(#[from] std::io::Error),
231 /// Serialization of a row to canonical JSON failed.
232 #[error("provenance log serialization error: {0}")]
233 Serialize(#[from] serde_json::Error),
234 /// Path supplied to [`ProvenanceLog::open`] exists but is not a regular
235 /// file (e.g. a directory or symlink).
236 #[error("provenance log path is not a regular file: {0}")]
237 NotARegularFile(Utf8PathBuf),
238}
239
240/// Append-only writer with in-process serialization.
241#[derive(Debug)]
242pub struct ProvenanceLog {
243 path: Utf8PathBuf,
244 state: Mutex<LogState>,
245 session_id: String,
246 /// §6 rotation threshold, resolved ONCE at [`ProvenanceLog::open`]
247 /// (not per-`append`). Reading `DOIGET_LOG_ROTATE_BYTES` once at
248 /// open — rather than on every append — means a log opened without
249 /// the env set keeps the real 100 MiB threshold for its whole life
250 /// even if another (test) thread later mutates that process-global
251 /// env var; this removes a parallel-test race without serializing
252 /// every multi-append test. `0` = rotation disabled.
253 rotate_threshold: u64,
254}
255
/// Mutable chain position of the writer; lives inside
/// [`ProvenanceLog::state`] and is only touched with that mutex held.
#[derive(Debug)]
struct LogState {
    /// `ts_seq` that the **next** appended row will receive.
    next_seq: u64,
    /// `this_hash` of the most recent row (64 lowercase hex chars), or
    /// [`GENESIS_HASH`] when the log has no rows yet.
    last_hash: String,
}
264
/// Sentinel stored as `prev_hash` on the first row of a log file
/// (PROVENANCE_LOG.md §3, §6). The first row written after a rotation
/// uses it verbatim as well, because the chain restarts in every segment.
const GENESIS_HASH: &str = "GENESIS";

/// Size at which `access.log` is rotated (PROVENANCE_LOG.md §6:
/// "100 MB"), i.e. 100 MiB. The `DOIGET_LOG_ROTATE_BYTES` environment
/// variable overrides it; that variable is an internal ops/testing knob
/// (NOT a documented public surface) which tests set to a tiny value so
/// rotation can be exercised without writing 100 MiB. A value of `0`
/// turns rotation off.
const ROTATE_BYTES: u64 = 100 * (1 << 20);

/// How long rotated segments are kept by default (PROVENANCE_LOG.md §6:
/// "90 days"). `DOIGET_LOG_RETENTION_DAYS` overrides it; `0` turns
/// pruning off.
const DEFAULT_RETENTION_DAYS: i64 = 90;
280
281/// Resolve the rotation threshold: `DOIGET_LOG_ROTATE_BYTES` if set and
282/// parseable, else [`ROTATE_BYTES`]. `0` (or unparsable) → returns the
283/// value as-is (`0` means "never rotate").
284fn rotate_threshold_bytes() -> u64 {
285 match std::env::var("DOIGET_LOG_ROTATE_BYTES") {
286 Ok(s) => s.trim().parse::<u64>().unwrap_or(ROTATE_BYTES),
287 Err(_) => ROTATE_BYTES,
288 }
289}
290
291/// Resolve retention days from `DOIGET_LOG_RETENTION_DAYS`
292/// (default [`DEFAULT_RETENTION_DAYS`]). `0` disables pruning. A
293/// negative / unparsable value falls back to the default with a warn.
294fn retention_days() -> i64 {
295 match std::env::var("DOIGET_LOG_RETENTION_DAYS") {
296 Ok(s) => match s.trim().parse::<i64>() {
297 Ok(n) if n >= 0 => n,
298 _ => {
299 tracing::warn!(
300 value = %s,
301 "DOIGET_LOG_RETENTION_DAYS is not a non-negative integer; \
302 using the {DEFAULT_RETENTION_DAYS}-day default"
303 );
304 DEFAULT_RETENTION_DAYS
305 }
306 },
307 Err(_) => DEFAULT_RETENTION_DAYS,
308 }
309}
310
311/// gzip-compress `path` to `<file_name>.<YYYY-MM-DD-HHMMSS>.gz` (in the
312/// same directory) and unlink `path` (PROVENANCE_LOG.md §6).
313///
314/// Atomic & fail-closed: the gzip is written to a `.tmp`, fsynced, then
315/// `rename`d into place (so a partial `.gz` is never observable), and
316/// only then is the original removed. Every step propagates its error
317/// to the caller (`ProvenanceLog::append`), which is fail-closed — a
318/// rotation failure aborts the surrounding fetch. Crash safety: a crash
319/// after the rename but before the unlink leaves both the full `.gz`
320/// and the (over-size) `access.log`; the next `append` simply rotates
321/// again, producing a second independently-valid segment — wasteful but
322/// never lossy or corrupt.
323fn rotate_log(path: &Utf8Path) -> Result<(), LogError> {
324 let file_name = path.file_name().ok_or_else(|| {
325 LogError::Io(std::io::Error::other(
326 "provenance log path has no file name; cannot rotate",
327 ))
328 })?;
329 let ts = Utc::now().format("%Y-%m-%d-%H%M%S");
330 let gz_name = format!("{file_name}.{ts}.gz");
331 let dir = path.parent().unwrap_or_else(|| Utf8Path::new("."));
332 let gz_path = dir.join(&gz_name);
333 let tmp_path = dir.join(format!("{gz_name}.tmp"));
334
335 {
336 let mut src = File::open(path)?;
337 let tmp = File::create(&tmp_path)?;
338 let mut enc = GzEncoder::new(BufWriter::new(tmp), Compression::default());
339 std::io::copy(&mut src, &mut enc)?;
340 let bufw = enc.finish()?;
341 let tmp = bufw.into_inner().map_err(|e| {
342 LogError::Io(std::io::Error::other(format!(
343 "gz tmp buf flush failed: {}",
344 e.error()
345 )))
346 })?;
347 tmp.sync_all()?;
348 }
349 std::fs::rename(&tmp_path, &gz_path)?;
350 std::fs::remove_file(path)?;
351 Ok(())
352}
353
354/// Rotated `.gz` segments siblings of `current`, sorted ascending. The
355/// embedded `YYYY-MM-DD-HHMMSS` timestamp makes lexicographic order ==
356/// chronological order.
357fn rotated_segments(current: &Utf8Path) -> Vec<Utf8PathBuf> {
358 let Some(file_name) = current.file_name() else {
359 return Vec::new();
360 };
361 let dir = current.parent().unwrap_or_else(|| Utf8Path::new("."));
362 let prefix = format!("{file_name}.");
363 let mut segs: Vec<Utf8PathBuf> = match std::fs::read_dir(dir.as_std_path()) {
364 Ok(rd) => rd
365 .filter_map(|e| e.ok())
366 .filter_map(|e| Utf8PathBuf::from_path_buf(e.path()).ok())
367 .filter(|p| {
368 p.file_name()
369 .map(|n| n.starts_with(&prefix) && n.ends_with(".gz"))
370 .unwrap_or(false)
371 })
372 .collect(),
373 Err(_) => Vec::new(),
374 };
375 segs.sort();
376 segs
377}
378
379/// Delete rotated `.gz` segments older than `days` (PROVENANCE_LOG.md
380/// §6 retention). `days <= 0` is a no-op (disabled). **Best-effort**:
381/// pruning is housekeeping, not integrity, so any failure is logged and
382/// skipped — `ProvenanceLog::open` still succeeds.
383fn prune_rotated_segments(current: &Utf8Path, days: i64) {
384 if days <= 0 {
385 return;
386 }
387 let Some(cutoff) = std::time::SystemTime::now()
388 .checked_sub(std::time::Duration::from_secs(days as u64 * 86_400))
389 else {
390 return;
391 };
392 for seg in rotated_segments(current) {
393 let aged = std::fs::metadata(seg.as_std_path())
394 .and_then(|m| m.modified())
395 .map(|mt| mt < cutoff)
396 .unwrap_or(false);
397 if !aged {
398 continue;
399 }
400 match std::fs::remove_file(seg.as_std_path()) {
401 Ok(()) => tracing::info!(
402 segment = %seg,
403 "provenance: pruned rotated segment past retention"
404 ),
405 Err(e) => tracing::warn!(
406 segment = %seg, error = %e,
407 "provenance: failed to prune rotated segment (best-effort; continuing)"
408 ),
409 }
410 }
411}
412
413/// Verify the full provenance history: every rotated `.gz` segment
414/// (oldest→newest) followed by the current `access.log`. Each segment
415/// is its own GENESIS-rooted hash chain (segments are deliberately NOT
416/// linked across a rotation, PROVENANCE_LOG.md §6), so they are
417/// verified independently and reported per-segment.
418///
419/// The audited [`verify`] function itself is unchanged; this only
420/// orchestrates it over the segment set (gunzipping each `.gz` to a
421/// tempfile first).
422///
423/// # Errors
424///
425/// [`LogError::Io`] on a gunzip / tempfile failure. A missing current
426/// `access.log` is not an error ([`verify`] reports it empty).
427pub fn verify_all(current: &Utf8Path) -> Result<Vec<(Utf8PathBuf, VerifyReport)>, LogError> {
428 let mut out = Vec::new();
429 for seg in rotated_segments(current) {
430 let gz = File::open(seg.as_std_path())?;
431 let mut dec = GzDecoder::new(gz);
432 let tmp = tempfile::NamedTempFile::new().map_err(|e| {
433 LogError::Io(std::io::Error::other(format!(
434 "verify_all: tempfile for {seg}: {e}"
435 )))
436 })?;
437 {
438 let mut w = File::create(tmp.path())?;
439 std::io::copy(&mut dec, &mut w)?;
440 w.sync_all()?;
441 }
442 let tmp_utf8 = Utf8Path::from_path(tmp.path()).ok_or_else(|| {
443 LogError::Io(std::io::Error::other("verify_all: non-utf8 tempfile path"))
444 })?;
445 let report = verify(tmp_utf8)?;
446 out.push((seg, report));
447 // `tmp` (and the gunzipped file) drop here, after verify.
448 }
449 let report = verify(current)?;
450 out.push((current.to_path_buf(), report));
451 Ok(out)
452}
453
454/// Caller-supplied fields for a row. The writer fills in `ts`, `ts_seq`,
455/// `session_id`, `prev_hash`, `this_hash`, and the literal
456/// `schema_version = "v2"` (`LOG_SCHEMA_VERSION`).
457///
458/// Callers SHOULD populate [`Self::canonical_digest`] on rows that have
459/// a meaningful audit identity (`Fetch` / `Resolve` / `StoreWrite` rows
460/// with a `ref`), leaving it `None` on session bookend rows. The digest
461/// is produced by [`crate::CanonicalRef::digest_hex`] from a
462/// `(source_type, source_id, resolver_profile, version)` tuple — see
463/// ADR-0021 §1 for the algorithm and ADR-0024 for the implementation
464/// surface.
465#[derive(Debug, Clone)]
466pub struct RowInput<'a> {
467 /// Event class.
468 pub event: LogEvent,
469 /// Result.
470 pub result: LogResult,
471 /// Capability under which the row is written (REQUIRED for every row).
472 pub capability: Capability,
473 /// Optional DOI / arXiv id.
474 pub ref_: Option<&'a str>,
475 /// Optional source name.
476 pub source: Option<&'a str>,
477 /// Optional error code on failure rows.
478 pub error_code: Option<&'a str>,
479 /// Optional payload size in bytes.
480 pub size_bytes: Option<u64>,
481 /// Optional OA license string (set on `event=fetch`, `result=ok`).
482 pub license: Option<&'a str>,
483 /// Optional store path relative to the store root (set on `event=fetch`,
484 /// `result=ok`).
485 pub store_path: Option<&'a str>,
486 /// Optional canonical-digest (ADR-0021 §1) as 64 lowercase hex
487 /// chars. `None` for session bookend / capability-resolution rows;
488 /// SHOULD be `Some` for `Fetch` / `Resolve` / `StoreWrite` rows
489 /// whose `source` field names the resolver. Build via
490 /// [`crate::Ref::promote`] + [`crate::CanonicalRef::digest_hex`].
491 pub canonical_digest: Option<&'a str>,
492}
493
494// ---------------------------------------------------------------------------
495// Canonical-JSON helper (PROVENANCE_LOG.md §4)
496//
497// Hashing rule (CRITICAL — this is the spec contract for `audit-log --verify`):
498//
499// this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
500//
501// Canonical JSON = **compact (no whitespace), keys sorted lexicographically,
502// no trailing whitespace** (§4). Struct field order is deliberately NOT
503// load-bearing here; the canonicalizer sorts the resulting object keys via
504// `BTreeMap<String, Value>`, which serializes in lex-sorted key order.
505//
506// Worked example: for the row fragment `{ts_seq: 1, ts: "..."}` (input order),
507// the canonical bytes after lex sort are `{"ts":"...","ts_seq":1}` because
508// `"ts"` < `"ts_seq"` lexicographically. In v2 (ADR-0024) the lex-first
509// top-level key is `"canonical_digest"` — `"canonical_digest"` < `"capability"`
510// because 'n'(110) < 'p'(112) at byte index 2 (both share the `"ca"`
511// prefix). The pre-v2 lex-first key was `"capability"`.
512// ---------------------------------------------------------------------------
513
514/// Serializable shadow of [`LogRow`] **without** `this_hash`. Used solely as
515/// an intermediate to compute the canonical bytes that `this_hash` is the
516/// SHA-256 of. The wire key names match [`LogRow`]'s `serde` attributes.
517///
518/// v2 shape (ADR-0024): includes `schema_version` and
519/// `canonical_digest`. Both fields participate in the hash chain — a
520/// tampered `canonical_digest` is detected by `audit-log --verify`
521/// exactly like a tampered `ref` or `source` would be.
522#[derive(Serialize)]
523struct RowForHash<'a> {
524 ts: DateTime<Utc>,
525 ts_seq: u64,
526 event: LogEvent,
527 #[serde(rename = "ref")]
528 ref_: Option<&'a str>,
529 source: Option<&'a str>,
530 result: LogResult,
531 license: Option<&'a str>,
532 size_bytes: Option<u64>,
533 store_path: Option<&'a str>,
534 capability: Capability,
535 session_id: &'a str,
536 error_code: Option<&'a str>,
537 schema_version: &'a str,
538 canonical_digest: Option<&'a str>,
539 prev_hash: &'a str,
540}
541
542/// Produce canonical-JSON bytes for a row-without-hash, with object keys
543/// sorted lexicographically per PROVENANCE_LOG.md §4.
544///
545/// Implementation: serialize via `serde_json::to_value` to get a `Value`,
546/// require it be an object, then move its entries into a
547/// `BTreeMap<String, Value>` (which serializes with lex-sorted keys) and
548/// re-serialize compactly. No new dependency required.
549fn canonical_json_for_hash(rfh: &RowForHash<'_>) -> Result<Vec<u8>, LogError> {
550 let value = serde_json::to_value(rfh)?;
551 let map = match value {
552 serde_json::Value::Object(m) => m,
553 // RowForHash is always a struct, so this branch is unreachable in
554 // practice; surface as a serde error if it ever changes.
555 _ => {
556 return Err(LogError::Serialize(serde::de::Error::custom(
557 "RowForHash did not serialize to a JSON object",
558 )));
559 }
560 };
561 let sorted: BTreeMap<String, serde_json::Value> = map.into_iter().collect();
562 Ok(serde_json::to_vec(&sorted)?)
563}
564
565/// Compute `this_hash` for the given row-without-hash. Returns 64 lowercase
566/// hex chars.
567fn compute_this_hash(rfh: &RowForHash<'_>) -> Result<String, LogError> {
568 let bytes = canonical_json_for_hash(rfh)?;
569 let digest = Sha256::digest(&bytes);
570 Ok(hex::encode(digest))
571}
572
573impl ProvenanceLog {
574 /// Open or create the log at `path`, stamping every row with
575 /// `session_id`.
576 ///
577 /// `session_id` MUST be a 26-char ULID generated **once per process**
578 /// invocation by the caller. Re-opening the log within the same process
579 /// reuses the same `session_id`; re-opening in a new process gets a new
580 /// one. This crate intentionally does NOT generate the ULID itself —
581 /// callers are responsible for creating one (e.g. via the `ulid` crate
582 /// already present in the workspace) and threading it through.
583 ///
584 /// If the file exists, scan it once to recover the last `ts_seq` and
585 /// `this_hash`. If the file is missing or empty, the first row will use
586 /// `prev_hash = "GENESIS"` and `ts_seq = 1`.
587 ///
588 /// # Errors
589 ///
590 /// Returns [`LogError::Io`] for I/O failures or if any line fails to
591 /// parse as a [`LogRow`] (synthetic message: `"corrupted log at line N: …"`).
592 /// The writer never silently truncates a corrupt log.
593 ///
594 /// Returns [`LogError::NotARegularFile`] if `path` exists but is not a
595 /// regular file (e.g. a directory).
596 pub fn open(path: impl Into<Utf8PathBuf>, session_id: String) -> Result<Self, LogError> {
597 // Production path: the §6 threshold comes from
598 // `DOIGET_LOG_ROTATE_BYTES` (default 100 MiB), resolved ONCE here.
599 Self::open_with_rotate_threshold(path, session_id, rotate_threshold_bytes())
600 }
601
602 /// [`open`](Self::open) with an explicit rotation threshold instead
603 /// of reading `DOIGET_LOG_ROTATE_BYTES`.
604 ///
605 /// This exists so the rotation tests inject a tiny threshold WITHOUT
606 /// mutating the process-global env var: a global env knob raced
607 /// non-`#[serial]` tests (a concurrent test's `open` would cache the
608 /// tiny threshold and spuriously rotate). `#[serial]` only
609 /// serializes `#[serial]` tests, so injection — not serialization —
610 /// is the robust fix. `0` disables rotation.
611 pub(crate) fn open_with_rotate_threshold(
612 path: impl Into<Utf8PathBuf>,
613 session_id: String,
614 rotate_threshold: u64,
615 ) -> Result<Self, LogError> {
616 let path: Utf8PathBuf = path.into();
617
618 // Reject obvious non-files up front so later `OpenOptions::append`
619 // doesn't produce a confusing platform-dependent error.
620 if path.exists() {
621 let md = std::fs::metadata(&path)?;
622 if !md.is_file() {
623 return Err(LogError::NotARegularFile(path));
624 }
625 }
626
627 let (next_seq, last_hash) = recover_state(&path)?;
628
629 // §6 retention: prune rotated `.gz` segments older than the
630 // window. Best-effort — pruning is housekeeping, not integrity,
631 // so a failure is logged and `open` still succeeds (unlike
632 // rotation, which is fail-closed).
633 prune_rotated_segments(&path, retention_days());
634
635 Ok(Self {
636 path,
637 state: Mutex::new(LogState {
638 next_seq,
639 last_hash,
640 }),
641 session_id,
642 rotate_threshold,
643 })
644 }
645
646 /// Append a row. Computes `prev_hash`, `ts_seq`, `ts`, `session_id`, and
647 /// `this_hash`; the caller only supplies the semantic fields via
648 /// [`RowInput`].
649 ///
650 /// Returns the assigned `ts_seq` on success.
651 ///
652 /// # Errors
653 ///
654 /// Returns [`LogError`] on serialization, I/O, or fsync failure. Callers
655 /// MUST treat this as fail-closed and abort the surrounding fetch.
656 pub fn append(&self, input: RowInput<'_>) -> Result<u64, LogError> {
657 // Hold the mutex for the entire append: serialize + write + flush +
658 // fsync + state update. This is the in-process serialization point
659 // promised by `docs/SECURITY.md` §1.8.
660 //
661 // A poisoned mutex only happens if a previous `append` panicked
662 // mid-write. Surface that as an I/O error rather than propagating
663 // a panic.
664 let mut state = self
665 .state
666 .lock()
667 .map_err(|_| LogError::Io(std::io::Error::other("provenance log mutex poisoned")))?;
668
669 // §6 rotation, BEFORE this row is written. If `access.log` has
670 // reached the threshold, gzip+rename it and reset the in-memory
671 // chain state so this row becomes the GENESIS-rooted first row of
672 // a fresh file. Fail-closed: a rotation error aborts the append
673 // (the `?`), so the caller's fetch aborts and the chain never
674 // silently continues in an over-size or half-rotated file. The
675 // `state` mutex is held, so rotation is serialized with appends.
676 let threshold = self.rotate_threshold;
677 if threshold > 0 {
678 let size = match std::fs::metadata(&self.path) {
679 Ok(m) => m.len(),
680 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
681 Err(e) => return Err(LogError::Io(e)),
682 };
683 if size >= threshold {
684 rotate_log(&self.path)?;
685 state.next_seq = 1;
686 state.last_hash = GENESIS_HASH.to_string();
687 }
688 }
689
690 let ts_seq = state.next_seq;
691 let prev_hash = state.last_hash.clone();
692 let ts = Utc::now();
693
694 let rfh = RowForHash {
695 ts,
696 ts_seq,
697 event: input.event,
698 ref_: input.ref_,
699 source: input.source,
700 result: input.result,
701 license: input.license,
702 size_bytes: input.size_bytes,
703 store_path: input.store_path,
704 capability: input.capability,
705 session_id: &self.session_id,
706 error_code: input.error_code,
707 schema_version: LOG_SCHEMA_VERSION,
708 canonical_digest: input.canonical_digest,
709 prev_hash: &prev_hash,
710 };
711
712 let this_hash = compute_this_hash(&rfh)?;
713
714 // Build the on-disk row. Owned strings here because `LogRow` does
715 // not borrow.
716 let row = LogRow {
717 ts,
718 ts_seq,
719 event: input.event,
720 ref_: input.ref_.map(str::to_string),
721 source: input.source.map(str::to_string),
722 result: input.result,
723 license: input.license.map(str::to_string),
724 size_bytes: input.size_bytes,
725 store_path: input.store_path.map(str::to_string),
726 capability: input.capability,
727 session_id: self.session_id.clone(),
728 error_code: input.error_code.map(str::to_string),
729 schema_version: LOG_SCHEMA_VERSION.to_string(),
730 canonical_digest: input.canonical_digest.map(str::to_string),
731 prev_hash,
732 this_hash: this_hash.clone(),
733 };
734
735 // Serialize, append `\n`, write_all in one syscall, flush BufWriter,
736 // fsync the underlying file. `\n` is part of the same buffer, so a
737 // crash mid-write leaves at most a partial line (no trailing `\n`),
738 // which is detectable on recovery as a corrupted final line.
739 let mut bytes = serde_json::to_vec(&row)?;
740 bytes.push(b'\n');
741
742 let file = OpenOptions::new()
743 .create(true)
744 .append(true)
745 .open(&self.path)?;
746 let mut writer = BufWriter::new(file);
747 writer.write_all(&bytes)?;
748 writer.flush()?;
749 // `into_inner` to recover the underlying File for `sync_all`.
750 let file = writer.into_inner().map_err(|e| {
751 LogError::Io(std::io::Error::other(format!(
752 "buf writer flush failed: {}",
753 e.error()
754 )))
755 })?;
756 file.sync_all()?;
757
758 // Only after a successful fsync do we advance the in-memory state.
759 // If any of the above fails, the next `append` retries from the
760 // same `(ts_seq, prev_hash)` — at most a torn last line on disk.
761 state.next_seq = ts_seq + 1;
762 state.last_hash = this_hash;
763
764 Ok(ts_seq)
765 }
766
767 /// Returns the path the log was opened at. Useful for tests and audit tooling.
768 pub fn path(&self) -> &Utf8Path {
769 &self.path
770 }
771
772 /// Returns the session id stamped into every row written through this
773 /// writer.
774 pub fn session_id(&self) -> &str {
775 &self.session_id
776 }
777}
778
779/// Scan an existing log to recover `(next_seq, last_hash)`.
780///
781/// Walk every line, parse as [`LogRow`], track the last successfully parsed
782/// row. If parsing fails, return [`LogError::Io`] with a synthetic
783/// `"corrupted log at line N: …"` message — never silently truncate.
784fn recover_state(path: &Utf8Path) -> Result<(u64, String), LogError> {
785 let file = match File::open(path) {
786 Ok(f) => f,
787 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
788 return Ok((1, GENESIS_HASH.to_string()));
789 }
790 Err(e) => return Err(LogError::Io(e)),
791 };
792
793 let reader = BufReader::new(file);
794 let mut last_seq: u64 = 0;
795 let mut last_hash: String = GENESIS_HASH.to_string();
796
797 for (idx, line_res) in reader.lines().enumerate() {
798 let line_no = idx + 1;
799 let line = line_res?;
800 if line.is_empty() {
801 // Tolerate trailing/empty lines silently — they are not data.
802 continue;
803 }
804 let row: LogRow = serde_json::from_str(&line).map_err(|e| {
805 LogError::Io(std::io::Error::new(
806 std::io::ErrorKind::InvalidData,
807 format!("corrupted log at line {}: {}", line_no, e),
808 ))
809 })?;
810 last_seq = row.ts_seq;
811 last_hash = row.this_hash;
812 }
813
814 if last_seq == 0 {
815 Ok((1, GENESIS_HASH.to_string()))
816 } else {
817 Ok((last_seq + 1, last_hash))
818 }
819}
820
821// ---------------------------------------------------------------------------
822// Verification (`doiget audit-log --verify`)
823//
824// The provenance log is a JSON Lines file with a SHA-256 hash chain
825// (PROVENANCE_LOG.md §4). Tampering is detected by recomputing every row's
826// `this_hash` and validating the chain. This module provides the offline
827// verifier; the CLI wrapper lives in `doiget-cli::commands::audit_log`.
828//
829// Failure model: returning `Err` is reserved for I/O failures opening / reading
830// the file. Per-row issues (parse failures, hash/chain mismatches, sequence
831// regressions) are accumulated into [`VerifyReport::errors`] so callers can
832// report them all in one pass — this is the contract Phase 1 ships.
833// ---------------------------------------------------------------------------
834
835/// Outcome of [`verify`]: per-row chain status across the entire log.
836#[derive(Debug, Clone)]
837#[non_exhaustive]
838pub struct VerifyReport {
839 /// Total non-empty lines processed (1-based count).
840 pub total_rows: usize,
841 /// Rows whose hash, chain link, and `ts_seq` all validated.
842 pub ok_rows: usize,
843 /// Issues encountered, in encounter order. Line numbers are 1-based.
844 pub errors: Vec<VerifyIssue>,
845}
846
847impl VerifyReport {
848 /// An empty, all-clear report — used when the log file is absent.
849 fn empty() -> Self {
850 Self {
851 total_rows: 0,
852 ok_rows: 0,
853 errors: Vec::new(),
854 }
855 }
856}
857
858/// A single issue discovered by [`verify`].
859#[derive(Debug, Clone)]
860#[non_exhaustive]
861pub struct VerifyIssue {
862 /// 1-based line number where the issue was detected.
863 pub line: usize,
864 /// Classification of the issue (see [`VerifyIssueKind`]).
865 pub kind: VerifyIssueKind,
866 /// Human-readable description (caller may format for stderr/stdout).
867 pub message: String,
868}
869
/// Category of a [`VerifyIssue`]. Marked `non_exhaustive` so further kinds
/// (for example a `SessionIdChange`) can be added later without breaking
/// callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum VerifyIssueKind {
    /// The line could not be deserialized as a [`LogRow`] (corrupted JSON
    /// or an unknown field).
    ParseError,
    /// The row's `prev_hash` matched neither the previous row's `this_hash`
    /// nor, for the first row, the genesis sentinel.
    PrevHashMismatch,
    /// The stored `this_hash` differs from the SHA-256 recomputed over the
    /// row's canonical JSON.
    ThisHashMismatch,
    /// `ts_seq` failed to increase strictly from the previous row. Rows
    /// that restart the chain with the genesis sentinel are exempt (see
    /// PROVENANCE_LOG.md §3 + §6).
    SequenceJump,
}
888
889/// Verify the entire log file at `path`.
890///
891/// Returns `Ok(VerifyReport)` regardless of whether the chain validates;
892/// callers inspect `report.errors.is_empty()` to determine pass/fail.
893/// Returns `Err` only when the file itself cannot be opened or read at the
894/// I/O level.
895///
896/// Behavior:
897///
898/// - A missing file is treated as a clean, empty log (no tampering possible
899/// on bytes that don't exist) and returns an empty report after a `warn!`.
900/// - Empty / blank lines are skipped — they are not data per the writer's
901/// on-disk format (PROVENANCE_LOG.md §2).
902/// - On a row that fails to parse as [`LogRow`], a `ParseError` is recorded
903/// and verification continues on the next line. The chain anchor does NOT
904/// advance through an unparsable row, so the next valid row's `prev_hash`
905/// is checked against the last successfully parsed row (or against
906/// `"GENESIS"` if no valid row has been seen yet).
907/// - A `prev_hash == "GENESIS"` sentinel marks a chain restart (first row of
908/// a fresh / rotated log per §6) and resets the `ts_seq` monotonicity
909/// anchor — `ts_seq` is NOT compared to the prior row across a restart.
910pub fn verify(path: &Utf8Path) -> Result<VerifyReport, LogError> {
911 let file = match File::open(path) {
912 Ok(f) => f,
913 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
914 tracing::warn!(
915 path = %path,
916 "audit-log verify: log file does not exist; reporting empty"
917 );
918 return Ok(VerifyReport::empty());
919 }
920 Err(e) => return Err(LogError::Io(e)),
921 };
922
923 let reader = BufReader::new(file);
924 let mut report = VerifyReport::empty();
925
926 // Anchor for the chain check: the LAST SUCCESSFULLY PARSED row. The chain
927 // is anchored to the bytes on disk, not to a hypothetical "should have
928 // been". This matches the spec — tampering at row N must surface both as
929 // a hash mismatch on N and as a chain break on N+1.
930 let mut prev_row: Option<LogRow> = None;
931
932 for (idx, line_res) in reader.lines().enumerate() {
933 let line_no = idx + 1;
934 let line = line_res?;
935 if line.is_empty() {
936 continue;
937 }
938
939 report.total_rows += 1;
940
941 let row: LogRow = match serde_json::from_str(&line) {
942 Ok(r) => r,
943 Err(e) => {
944 report.errors.push(VerifyIssue {
945 line: line_no,
946 kind: VerifyIssueKind::ParseError,
947 message: format!("failed to parse row as LogRow: {e}"),
948 });
949 // Chain anchor cannot advance through an unparsable row;
950 // leave `prev_row` untouched so the next valid row's
951 // `prev_hash` is checked against the last-known anchor (or
952 // GENESIS if we never had one).
953 continue;
954 }
955 };
956
957 let mut row_ok = true;
958
959 // 1. Recompute `this_hash` from canonical JSON (row \ {this_hash}).
960 let rfh = RowForHash {
961 ts: row.ts,
962 ts_seq: row.ts_seq,
963 event: row.event,
964 ref_: row.ref_.as_deref(),
965 source: row.source.as_deref(),
966 result: row.result,
967 license: row.license.as_deref(),
968 size_bytes: row.size_bytes,
969 store_path: row.store_path.as_deref(),
970 capability: row.capability,
971 session_id: &row.session_id,
972 error_code: row.error_code.as_deref(),
973 schema_version: &row.schema_version,
974 canonical_digest: row.canonical_digest.as_deref(),
975 prev_hash: &row.prev_hash,
976 };
977 match compute_this_hash(&rfh) {
978 Ok(recomputed) => {
979 if recomputed != row.this_hash {
980 report.errors.push(VerifyIssue {
981 line: line_no,
982 kind: VerifyIssueKind::ThisHashMismatch,
983 message: format!(
984 "this_hash mismatch: stored={}, recomputed={}",
985 row.this_hash, recomputed
986 ),
987 });
988 row_ok = false;
989 }
990 }
991 Err(e) => {
992 // Canonicalization itself failed — surface as a hash
993 // mismatch with the underlying error in the message.
994 report.errors.push(VerifyIssue {
995 line: line_no,
996 kind: VerifyIssueKind::ThisHashMismatch,
997 message: format!("failed to recompute this_hash: {e}"),
998 });
999 row_ok = false;
1000 }
1001 }
1002
1003 // 2. Chain link: `prev_hash` matches anchor (GENESIS on row 1 / after
1004 // a chain restart, prior row's `this_hash` otherwise).
1005 let is_genesis = row.prev_hash == GENESIS_HASH;
1006 match &prev_row {
1007 None => {
1008 // First non-empty row in the file: must declare GENESIS.
1009 if !is_genesis {
1010 report.errors.push(VerifyIssue {
1011 line: line_no,
1012 kind: VerifyIssueKind::PrevHashMismatch,
1013 message: format!(
1014 "first row must have prev_hash=\"GENESIS\", got {:?}",
1015 row.prev_hash
1016 ),
1017 });
1018 row_ok = false;
1019 }
1020 }
1021 Some(prev) => {
1022 if is_genesis {
1023 // Chain restart (rotation per §6) — accepted, no link
1024 // check, and the `ts_seq` monotonicity anchor resets
1025 // (handled below via `is_genesis`).
1026 } else if row.prev_hash != prev.this_hash {
1027 report.errors.push(VerifyIssue {
1028 line: line_no,
1029 kind: VerifyIssueKind::PrevHashMismatch,
1030 message: format!(
1031 "prev_hash mismatch: row stores {}, previous row's this_hash is {}",
1032 row.prev_hash, prev.this_hash
1033 ),
1034 });
1035 row_ok = false;
1036 }
1037 }
1038 }
1039
1040 // 3. ts_seq monotonicity — strictly greater than the previous row's
1041 // `ts_seq`, EXCEPT across a chain restart (where `ts_seq` resets).
1042 if let Some(prev) = &prev_row {
1043 if !is_genesis && row.ts_seq <= prev.ts_seq {
1044 report.errors.push(VerifyIssue {
1045 line: line_no,
1046 kind: VerifyIssueKind::SequenceJump,
1047 message: format!(
1048 "ts_seq did not increase strictly: previous={}, current={}",
1049 prev.ts_seq, row.ts_seq
1050 ),
1051 });
1052 row_ok = false;
1053 }
1054 }
1055
1056 if row_ok {
1057 report.ok_rows += 1;
1058 }
1059
1060 // Advance the anchor to the just-parsed row (whether or not it had
1061 // issues — the on-disk bytes ARE the chain).
1062 prev_row = Some(row);
1063 }
1064
1065 Ok(report)
1066}
1067
1068// ---------------------------------------------------------------------------
1069// v1 → v2 migration (ADR-0024, `docs/PROVENANCE_LOG.md` §"Schema migration").
1070//
1071// v1 rows lack `schema_version` and `canonical_digest`; the v2 binary
1072// fails loudly when asked to read them (see `recover_state` /
1073// `verify`). The migration recovers a v2 log from a v1 file by:
1074//
1075// 1. Parsing every v1 row via the [`V1LogRow`] shadow struct.
1076// 2. Deriving a [`crate::CanonicalRef`] from the v1 `(ref, source)`
1077// pair — `source` becomes `resolver_profile`, `version` is `None`
1078// (ADR-0021 §1 → ADR-0024 migration recipe).
//   3. Re-computing the SHA-256 hash chain across the new row
//      payloads. The v1 chain is invalidated by the schema change; the
//      v2 chain always restarts at the `"GENESIS"` sentinel, regardless
//      of the first input row's stored `prev_hash`.
1083// 4. Writing the new rows to `<log_path>.v2-migrated`, then
1084// atomically renaming it onto `<log_path>` after backing up the
1085// original to `<log_path>.v1-backup`.
1086//
1087// The migration is **idempotent**: running it on an already-v2 log
1088// re-parses every row as v2, recomputes the same hash chain, and
1089// produces a byte-equivalent output.
1090//
1091// The migration is **dry-runnable**: `dry_run = true` returns a
1092// [`MigrationReport`] summarizing what would change without touching
1093// disk.
1094// ---------------------------------------------------------------------------
1095
1096/// Summary of a [`migrate_v1_to_v2`] run.
1097///
1098/// Marked `#[non_exhaustive]` so future fields (e.g. a per-row error
1099/// list, an aborted-row count) can be added without breaking callers
1100/// that pattern-match.
1101///
1102/// `Serialize` enables `provenance migrate --mode json` (#204) — the
1103/// wire form is `{"rows_rewritten": N, "dry_run": bool,
1104/// "first_row_v1_chain_hash": "...", "first_row_v2_chain_hash": "..."}`.
1105///
1106/// # Wire-format stability (post-#208 self-review §1)
1107///
1108/// Once a release ships with the [`Serialize`] derive, the field
1109/// **names** below become part of the public API. Renaming a field is
1110/// then a semver minor bump and warrants a CHANGELOG \[BREAKING\] note;
1111/// new fields are still safe (per `#[non_exhaustive]`).
1112#[derive(Debug, Clone, Serialize)]
1113#[non_exhaustive]
1114pub struct MigrationReport {
1115 /// Number of rows rewritten (or that WOULD be rewritten under
1116 /// `dry_run`).
1117 pub rows_rewritten: u64,
1118 /// Whether this was a dry-run preview (`true`) or a live rewrite
1119 /// (`false`).
1120 pub dry_run: bool,
1121 /// Stored `this_hash` of the first input row (the v1 chain anchor).
1122 /// `"GENESIS"` is reported as the literal `"GENESIS"` when the log
1123 /// was empty.
1124 pub first_row_v1_chain_hash: String,
1125 /// Recomputed `this_hash` of the first migrated row under the v2
1126 /// canonicalization. Equal to [`Self::first_row_v1_chain_hash`]
1127 /// only if the input was already v2 (idempotent case).
1128 pub first_row_v2_chain_hash: String,
1129}
1130
1131/// v1 row shadow struct used ONLY by [`migrate_v1_to_v2`]. The
1132/// non-defaulted v2 fields (`schema_version`, `canonical_digest`) are
1133/// absent here; `deny_unknown_fields` rejects unexpected v2 fields so a
1134/// v2 row on disk fails to parse as v1, letting the migrator detect
1135/// already-v2 input via fallback to the v2 parser.
1136#[derive(Debug, Clone, Deserialize, Serialize)]
1137#[serde(deny_unknown_fields)]
1138struct V1LogRow {
1139 ts: DateTime<Utc>,
1140 ts_seq: u64,
1141 event: LogEvent,
1142 #[serde(rename = "ref")]
1143 ref_: Option<String>,
1144 source: Option<String>,
1145 result: LogResult,
1146 license: Option<String>,
1147 size_bytes: Option<u64>,
1148 store_path: Option<String>,
1149 capability: Capability,
1150 session_id: String,
1151 error_code: Option<String>,
1152 prev_hash: String,
1153 this_hash: String,
1154}
1155
1156/// Minimal in-memory representation a v1 OR v2 row can be promoted to
1157/// before re-hashing.
1158#[derive(Debug, Clone)]
1159struct MigrationRowSeed {
1160 ts: DateTime<Utc>,
1161 ts_seq: u64,
1162 event: LogEvent,
1163 ref_: Option<String>,
1164 source: Option<String>,
1165 result: LogResult,
1166 license: Option<String>,
1167 size_bytes: Option<u64>,
1168 store_path: Option<String>,
1169 capability: Capability,
1170 session_id: String,
1171 error_code: Option<String>,
1172 /// `None` for v1 inputs (the digest is computed during migration);
1173 /// `Some(...)` for already-v2 inputs (carried through verbatim for
1174 /// idempotency).
1175 canonical_digest_in: Option<String>,
1176 /// As stored on disk in the input. Used only for the
1177 /// `first_row_v1_chain_hash` field of [`MigrationReport`].
1178 stored_this_hash: String,
1179}
1180
1181/// Migrate a v1 provenance log to v2 (ADR-0024).
1182///
1183/// Returns a [`MigrationReport`] describing how many rows were (or
1184/// would be) rewritten and the first-row chain-anchor delta. The
1185/// migration is idempotent: running it twice produces byte-equivalent
1186/// output the second time.
1187///
1188/// On a missing log file, returns a no-op report (`rows_rewritten = 0`,
1189/// `first_row_v1_chain_hash = "GENESIS"`, `first_row_v2_chain_hash =
1190/// "GENESIS"`) — there is nothing to migrate.
1191///
1192/// # Errors
1193///
1194/// Returns [`LogError::Io`] on I/O failures and on rows that fail to
1195/// parse as either v1 or v2 (the synthetic message names the line
1196/// number). Returns [`LogError::Serialize`] on canonicalization
1197/// failures.
1198pub fn migrate_v1_to_v2(log_path: &Utf8Path, dry_run: bool) -> Result<MigrationReport, LogError> {
1199 use std::io::BufRead;
1200
1201 // -- 1. Read the input log, parsing each line as v1 OR (idempotent
1202 // fallback) v2. --------------------------------------------------
1203 let file = match File::open(log_path) {
1204 Ok(f) => f,
1205 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1206 return Ok(MigrationReport {
1207 rows_rewritten: 0,
1208 dry_run,
1209 first_row_v1_chain_hash: GENESIS_HASH.to_string(),
1210 first_row_v2_chain_hash: GENESIS_HASH.to_string(),
1211 });
1212 }
1213 Err(e) => return Err(LogError::Io(e)),
1214 };
1215 let reader = BufReader::new(file);
1216 let mut seeds: Vec<MigrationRowSeed> = Vec::new();
1217
1218 for (idx, line_res) in reader.lines().enumerate() {
1219 let line_no = idx + 1;
1220 let line = line_res?;
1221 if line.is_empty() {
1222 continue;
1223 }
1224 // Try v1 first. If it fails, try v2 (idempotency: re-migrating
1225 // a v2 log MUST succeed and produce equivalent output).
1226 let seed = if let Ok(v1) = serde_json::from_str::<V1LogRow>(&line) {
1227 MigrationRowSeed {
1228 ts: v1.ts,
1229 ts_seq: v1.ts_seq,
1230 event: v1.event,
1231 ref_: v1.ref_,
1232 source: v1.source,
1233 result: v1.result,
1234 license: v1.license,
1235 size_bytes: v1.size_bytes,
1236 store_path: v1.store_path,
1237 capability: v1.capability,
1238 session_id: v1.session_id,
1239 error_code: v1.error_code,
1240 canonical_digest_in: None,
1241 stored_this_hash: v1.this_hash,
1242 }
1243 } else {
1244 match serde_json::from_str::<LogRow>(&line) {
1245 Ok(v2) => MigrationRowSeed {
1246 ts: v2.ts,
1247 ts_seq: v2.ts_seq,
1248 event: v2.event,
1249 ref_: v2.ref_,
1250 source: v2.source,
1251 result: v2.result,
1252 license: v2.license,
1253 size_bytes: v2.size_bytes,
1254 store_path: v2.store_path,
1255 capability: v2.capability,
1256 session_id: v2.session_id,
1257 error_code: v2.error_code,
1258 canonical_digest_in: v2.canonical_digest,
1259 stored_this_hash: v2.this_hash,
1260 },
1261 Err(e) => {
1262 return Err(LogError::Io(std::io::Error::new(
1263 std::io::ErrorKind::InvalidData,
1264 format!("migration: line {line_no} is neither v1 nor v2: {e}"),
1265 )));
1266 }
1267 }
1268 };
1269 seeds.push(seed);
1270 }
1271
1272 // -- 2. Derive `canonical_digest` for each seed that lacks one. ------
1273 //
1274 // For v1 rows: build a CanonicalRef from
1275 // - source_type from `event`/`ref` shape (DOI prefix `10.` vs
1276 // arXiv) — we use a heuristic that matches `Ref::parse`'s rule
1277 // (`starts_with "10."` ⇒ DOI; else arXiv).
1278 // - source_id = ref value (verbatim).
1279 // - resolver_profile = source value (verbatim, ADR-0021 §3
1280 // migration recipe).
1281 // - version = None.
1282 //
1283 // Rows without a `ref` (session bookend) keep `canonical_digest =
1284 // None` per the v2 row contract.
1285
1286 fn derive_digest(seed: &MigrationRowSeed) -> Option<String> {
1287 let ref_str = seed.ref_.as_deref()?;
1288 let source_key = seed.source.as_deref().unwrap_or("");
1289 // Heuristic: bare DOIs always start `10.`; everything else is
1290 // treated as an arXiv id. Mirrors `Ref::parse` rule 3/4.
1291 let source_type = if ref_str.starts_with("10.") {
1292 crate::SourceType::Doi
1293 } else {
1294 crate::SourceType::Arxiv
1295 };
1296 let c = crate::CanonicalRef::new(source_type, ref_str, source_key, None);
1297 Some(c.digest_hex())
1298 }
1299
1300 let digests: Vec<Option<String>> = seeds
1301 .iter()
1302 .map(|s| s.canonical_digest_in.clone().or_else(|| derive_digest(s)))
1303 .collect();
1304
1305 // -- 3. Rebuild the hash chain across the v2 payloads. ----------------
1306 let mut out_rows: Vec<LogRow> = Vec::with_capacity(seeds.len());
1307 let mut prev_hash: String = GENESIS_HASH.to_string();
1308
1309 for (seed, digest) in seeds.iter().zip(digests.iter()) {
1310 let rfh = RowForHash {
1311 ts: seed.ts,
1312 ts_seq: seed.ts_seq,
1313 event: seed.event,
1314 ref_: seed.ref_.as_deref(),
1315 source: seed.source.as_deref(),
1316 result: seed.result,
1317 license: seed.license.as_deref(),
1318 size_bytes: seed.size_bytes,
1319 store_path: seed.store_path.as_deref(),
1320 capability: seed.capability,
1321 session_id: &seed.session_id,
1322 error_code: seed.error_code.as_deref(),
1323 schema_version: LOG_SCHEMA_VERSION,
1324 canonical_digest: digest.as_deref(),
1325 prev_hash: &prev_hash,
1326 };
1327 let this_hash = compute_this_hash(&rfh)?;
1328 let row = LogRow {
1329 ts: seed.ts,
1330 ts_seq: seed.ts_seq,
1331 event: seed.event,
1332 ref_: seed.ref_.clone(),
1333 source: seed.source.clone(),
1334 result: seed.result,
1335 license: seed.license.clone(),
1336 size_bytes: seed.size_bytes,
1337 store_path: seed.store_path.clone(),
1338 capability: seed.capability,
1339 session_id: seed.session_id.clone(),
1340 error_code: seed.error_code.clone(),
1341 schema_version: LOG_SCHEMA_VERSION.to_string(),
1342 canonical_digest: digest.clone(),
1343 prev_hash: prev_hash.clone(),
1344 this_hash: this_hash.clone(),
1345 };
1346 prev_hash = this_hash;
1347 out_rows.push(row);
1348 }
1349
1350 // -- 4. Build the report. --------------------------------------------
1351 let first_v1_hash = seeds
1352 .first()
1353 .map(|s| s.stored_this_hash.clone())
1354 .unwrap_or_else(|| GENESIS_HASH.to_string());
1355 let first_v2_hash = out_rows
1356 .first()
1357 .map(|r| r.this_hash.clone())
1358 .unwrap_or_else(|| GENESIS_HASH.to_string());
1359 let report = MigrationReport {
1360 rows_rewritten: out_rows.len() as u64,
1361 dry_run,
1362 first_row_v1_chain_hash: first_v1_hash,
1363 first_row_v2_chain_hash: first_v2_hash,
1364 };
1365
1366 if dry_run {
1367 return Ok(report);
1368 }
1369
1370 // -- 5. Live write: stage to `<log_path>.v2-migrated`, back up the
1371 // v1, then atomically rename. -----------------------------------
1372 let staged_path = with_suffix(log_path, ".v2-migrated");
1373 let backup_path = with_suffix(log_path, ".v1-backup");
1374
1375 {
1376 let staged_file = OpenOptions::new()
1377 .create(true)
1378 .write(true)
1379 .truncate(true)
1380 .open(&staged_path)?;
1381 let mut writer = BufWriter::new(staged_file);
1382 for row in &out_rows {
1383 let mut bytes = serde_json::to_vec(row)?;
1384 bytes.push(b'\n');
1385 writer.write_all(&bytes)?;
1386 }
1387 writer.flush()?;
1388 let file = writer.into_inner().map_err(|e| {
1389 LogError::Io(std::io::Error::other(format!(
1390 "migration buf writer flush failed: {}",
1391 e.error()
1392 )))
1393 })?;
1394 file.sync_all()?;
1395 }
1396
1397 // Sanity-check: the staged file MUST verify clean before we
1398 // commit the swap. If it doesn't, the migration is buggy — abort
1399 // without touching the live log.
1400 let verify_report = verify(&staged_path)?;
1401 if !verify_report.errors.is_empty() {
1402 return Err(LogError::Io(std::io::Error::other(format!(
1403 "migration: staged v2 log failed verify; first issue: {:?}",
1404 verify_report.errors.first()
1405 ))));
1406 }
1407
1408 // Move the original aside as `<log_path>.v1-backup`. Overwriting
1409 // any prior backup is intentional — the user re-running migrate
1410 // expects the most recent original preserved.
1411 if log_path.exists() {
1412 if backup_path.exists() {
1413 std::fs::remove_file(&backup_path)?;
1414 }
1415 std::fs::rename(log_path, &backup_path)?;
1416 }
1417 // Atomically promote the staged file to the live path.
1418 std::fs::rename(&staged_path, log_path)?;
1419
1420 Ok(report)
1421}
1422
1423/// Append a literal suffix to a [`Utf8Path`], producing a sibling path
1424/// in the same directory. Avoids `std::path::PathBuf` per the workspace
1425/// posture rule (`docs/SECURITY.md` §3 — camino-only file paths in
1426/// production code).
1427fn with_suffix(path: &Utf8Path, suffix: &str) -> Utf8PathBuf {
1428 let s = format!("{path}{suffix}");
1429 Utf8PathBuf::from(s)
1430}
1431
1432// ---------------------------------------------------------------------------
1433// Tests
1434// ---------------------------------------------------------------------------
1435
1436#[cfg(test)]
1437#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1438mod tests {
1439 use super::*;
1440 use std::fs;
1441 use std::sync::Arc;
1442 use std::thread;
1443
1444 use tempfile::TempDir;
1445
1446 /// Convert a `TempDir`'s `&std::path::Path` to a `Utf8PathBuf`. Tests
1447 /// always run on UTF-8 temp paths in CI; if the OS returns a non-UTF-8
1448 /// path we panic, which is acceptable for a unit test.
1449 fn tmp_dir_utf8(dir: &TempDir) -> Utf8PathBuf {
1450 Utf8PathBuf::from_path_buf(dir.path().to_path_buf()).expect("temp dir path must be UTF-8")
1451 }
1452
    /// A fixed 26-char ULID-shaped session id shared by the tests in this
    /// module. Real callers generate ids with the `ulid` crate; pinning a
    /// constant here keeps test output reproducible.
    const TEST_SESSION_ID: &str = "01JCKZ7Q0000000000000000AB";
1456
1457 fn open_log(path: &Utf8Path) -> ProvenanceLog {
1458 ProvenanceLog::open(path, TEST_SESSION_ID.to_string()).expect("open")
1459 }
1460
1461 fn empty_input() -> RowInput<'static> {
1462 RowInput {
1463 event: LogEvent::Fetch,
1464 result: LogResult::Ok,
1465 capability: Capability::Oa,
1466 ref_: None,
1467 source: None,
1468 error_code: None,
1469 size_bytes: None,
1470 license: None,
1471 store_path: None,
1472 canonical_digest: None,
1473 }
1474 }
1475
1476 /// Read the on-disk log and parse every line into a `LogRow`.
1477 fn read_rows(path: &Utf8Path) -> Vec<LogRow> {
1478 let raw = fs::read_to_string(path).expect("read log");
1479 raw.lines()
1480 .filter(|l| !l.is_empty())
1481 .map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
1482 .collect()
1483 }
1484
1485 /// Recompute `this_hash` for a stored row and assert it matches the
1486 /// stored value. Walks the same canonicalization rule as
1487 /// [`compute_this_hash`].
1488 fn verify_this_hash(row: &LogRow) {
1489 let rfh = RowForHash {
1490 ts: row.ts,
1491 ts_seq: row.ts_seq,
1492 event: row.event,
1493 ref_: row.ref_.as_deref(),
1494 source: row.source.as_deref(),
1495 result: row.result,
1496 license: row.license.as_deref(),
1497 size_bytes: row.size_bytes,
1498 store_path: row.store_path.as_deref(),
1499 capability: row.capability,
1500 session_id: &row.session_id,
1501 error_code: row.error_code.as_deref(),
1502 schema_version: &row.schema_version,
1503 canonical_digest: row.canonical_digest.as_deref(),
1504 prev_hash: &row.prev_hash,
1505 };
1506 let recomputed = compute_this_hash(&rfh).expect("hash");
1507 assert_eq!(
1508 recomputed, row.this_hash,
1509 "this_hash mismatch on ts_seq {}",
1510 row.ts_seq
1511 );
1512 }
1513
1514 #[test]
1515 fn first_row_uses_genesis_prev_hash() {
1516 let dir = TempDir::new().expect("tmp");
1517 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1518 let log = open_log(&path);
1519 let seq = log.append(empty_input()).expect("append");
1520 assert_eq!(seq, 1);
1521
1522 let rows = read_rows(&path);
1523 assert_eq!(rows.len(), 1);
1524 assert_eq!(rows[0].ts_seq, 1);
1525 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1526 assert_eq!(rows[0].this_hash.len(), 64);
1527 assert_eq!(rows[0].session_id, TEST_SESSION_ID);
1528 verify_this_hash(&rows[0]);
1529 }
1530
1531 #[test]
1532 fn subsequent_rows_chain_correctly() {
1533 let dir = TempDir::new().expect("tmp");
1534 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1535 let log = open_log(&path);
1536
1537 for _ in 0..3 {
1538 log.append(empty_input()).expect("append");
1539 }
1540
1541 let rows = read_rows(&path);
1542 assert_eq!(rows.len(), 3);
1543 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1544 assert_eq!(rows[1].prev_hash, rows[0].this_hash);
1545 assert_eq!(rows[2].prev_hash, rows[1].this_hash);
1546 for r in &rows {
1547 verify_this_hash(r);
1548 }
1549 assert_eq!(rows[0].ts_seq, 1);
1550 assert_eq!(rows[1].ts_seq, 2);
1551 assert_eq!(rows[2].ts_seq, 3);
1552 }
1553
1554 #[test]
1555 fn recovery_after_reopen() {
1556 let dir = TempDir::new().expect("tmp");
1557 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1558
1559 {
1560 let log = open_log(&path);
1561 for _ in 0..3 {
1562 log.append(empty_input()).expect("append");
1563 }
1564 } // drop writer
1565
1566 let log2 = open_log(&path);
1567 let seq = log2.append(empty_input()).expect("append after reopen");
1568 assert_eq!(seq, 4);
1569
1570 let rows = read_rows(&path);
1571 assert_eq!(rows.len(), 4);
1572 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1573 for i in 1..rows.len() {
1574 assert_eq!(
1575 rows[i].prev_hash,
1576 rows[i - 1].this_hash,
1577 "chain break at row {}",
1578 i + 1
1579 );
1580 }
1581 for (i, r) in rows.iter().enumerate() {
1582 assert_eq!(r.ts_seq, (i + 1) as u64);
1583 verify_this_hash(r);
1584 }
1585 }
1586
1587 #[test]
1588 fn concurrent_writers_in_same_process_serialize() {
1589 let dir = TempDir::new().expect("tmp");
1590 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1591 let log = Arc::new(open_log(&path));
1592
1593 let mut handles = Vec::with_capacity(8);
1594 for _ in 0..8 {
1595 let log = Arc::clone(&log);
1596 handles.push(thread::spawn(move || {
1597 log.append(empty_input()).expect("append")
1598 }));
1599 }
1600 let mut returned: Vec<u64> = handles
1601 .into_iter()
1602 .map(|h| h.join().expect("join"))
1603 .collect();
1604 returned.sort_unstable();
1605 assert_eq!(returned, vec![1, 2, 3, 4, 5, 6, 7, 8]);
1606
1607 let rows = read_rows(&path);
1608 assert_eq!(rows.len(), 8);
1609
1610 // The in-process mutex serializes appends, so file order MUST equal
1611 // ts_seq order: row N (0-indexed) on disk has ts_seq = N+1.
1612 for (i, r) in rows.iter().enumerate() {
1613 assert_eq!(r.ts_seq, (i + 1) as u64, "ts_seq gap at file row {}", i + 1);
1614 }
1615 // Hash chain follows file order.
1616 assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1617 for i in 1..rows.len() {
1618 assert_eq!(
1619 rows[i].prev_hash,
1620 rows[i - 1].this_hash,
1621 "chain break at file row {}",
1622 i + 1
1623 );
1624 }
1625 for r in &rows {
1626 verify_this_hash(r);
1627 }
1628 }
1629
1630 #[test]
1631 fn corrupted_existing_log_fails_open() {
1632 let dir = TempDir::new().expect("tmp");
1633 let path = tmp_dir_utf8(&dir).join("log.jsonl");
1634
1635 // JSON but not a valid LogRow: missing required fields, has unknown
1636 // field. `deny_unknown_fields` ensures the parser refuses.
1637 fs::write(&path, "{\"ts_seq\": 1, \"garbage\": true}\n").expect("write");
1638
1639 let err =
1640 ProvenanceLog::open(&path, TEST_SESSION_ID.to_string()).expect_err("must fail open");
1641 match err {
1642 LogError::Io(io) => {
1643 let msg = io.to_string();
1644 assert!(
1645 msg.contains("corrupted log at line 1"),
1646 "expected synthetic corruption message, got: {}",
1647 msg
1648 );
1649 }
1650 other => panic!("expected LogError::Io, got {:?}", other),
1651 }
1652 }
1653
1654 #[test]
1655 fn rejects_non_regular_file() {
1656 // Pointing the log at a directory must fail with NotARegularFile.
1657 let dir = TempDir::new().expect("tmp");
1658 let err = ProvenanceLog::open(tmp_dir_utf8(&dir), TEST_SESSION_ID.to_string())
1659 .expect_err("must fail");
1660 match err {
1661 LogError::NotARegularFile(_) => {}
1662 other => panic!("expected NotARegularFile, got {:?}", other),
1663 }
1664 }
1665
1666 #[test]
1667 fn canonical_json_excludes_this_hash_field() {
1668 // Spec contract: the hashed bytes do not include `this_hash`. If
1669 // this ever regresses, every previously-written log becomes
1670 // unverifiable.
1671 let rfh = RowForHash {
1672 ts: Utc::now(),
1673 ts_seq: 1,
1674 event: LogEvent::Fetch,
1675 ref_: None,
1676 source: None,
1677 result: LogResult::Ok,
1678 license: None,
1679 size_bytes: None,
1680 store_path: None,
1681 capability: Capability::Oa,
1682 session_id: TEST_SESSION_ID,
1683 error_code: None,
1684 schema_version: LOG_SCHEMA_VERSION,
1685 canonical_digest: None,
1686 prev_hash: GENESIS_HASH,
1687 };
1688 let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1689 let s = std::str::from_utf8(&bytes).expect("utf8");
1690 assert!(!s.contains("this_hash"), "this_hash leaked into hash input");
1691 assert!(s.contains("\"prev_hash\":"));
1692 }
1693
#[test]
fn canonical_json_keys_are_lexicographically_sorted() {
    // PROVENANCE_LOG.md §4: canonical JSON uses object keys sorted
    // lexicographically. Build a row with its optional fields populated
    // and check the key order of the canonical bytes.
    let rfh = RowForHash {
        ts: Utc::now(),
        ts_seq: 1,
        event: LogEvent::Fetch,
        ref_: Some("10.1234/example"),
        source: Some("unpaywall"),
        result: LogResult::Ok,
        license: Some("CC-BY-4.0"),
        size_bytes: Some(1234),
        store_path: Some("papers/x.pdf"),
        capability: Capability::Oa,
        session_id: TEST_SESSION_ID,
        error_code: None,
        schema_version: LOG_SCHEMA_VERSION,
        canonical_digest: Some(
            "0000000000000000000000000000000000000000000000000000000000000000",
        ),
        prev_hash: GENESIS_HASH,
    };
    let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
    let s = std::str::from_utf8(&bytes).expect("utf8");

    // v2: the lex-first key is `canonical_digest`, which sorts before
    // `capability` because 'n' < 'p' at byte index 2. Pre-v2 the first key
    // was `capability`.
    assert!(
        s.starts_with("{\"canonical_digest\":"),
        "canonical bytes must start with lex-first v2 key, got: {}",
        s
    );

    // Spot-check a chain of keys whose relative order is unambiguous. Each
    // adjacent pair differs at an early byte, except `ts` / `ts_seq`,
    // where the strict prefix sorts first. Every listed key must appear,
    // and in exactly this order.
    let expected_order = [
        "canonical_digest",
        "capability",
        "event",
        "prev_hash",
        "ts",
        "ts_seq",
    ];
    let positions: Vec<usize> = expected_order
        .iter()
        .map(|key| {
            let needle = format!("\"{key}\":");
            s.find(needle.as_str())
                .unwrap_or_else(|| panic!("{key} key missing from canonical bytes: {s}"))
        })
        .collect();
    for (keys, offsets) in expected_order.windows(2).zip(positions.windows(2)) {
        assert!(
            offsets[0] < offsets[1],
            "{} must precede {}",
            keys[0],
            keys[1]
        );
    }
}
1740
1741 // -----------------------------------------------------------------
1742 // verify() tests — Phase 1 surface for `doiget audit-log --verify`.
1743 // -----------------------------------------------------------------
1744
1745 /// Rewrite a single field's quoted-string value on a specific 1-based
1746 /// line of `path`. Used to simulate tampering. Panics on malformed input
1747 /// — only valid inputs are produced by the test harness.
1748 ///
1749 /// `field_key` is matched as `"field_key":"...old..."` (quoted string
1750 /// JSON value). The new value is the literal string `new_value` (no
1751 /// JSON escaping needed for the test fixtures we use).
1752 fn tamper_string_field(
1753 path: &Utf8Path,
1754 line_no_1based: usize,
1755 field_key: &str,
1756 new_value: &str,
1757 ) {
1758 let raw = fs::read_to_string(path).expect("read log");
1759 let mut lines: Vec<String> = raw.lines().map(str::to_string).collect();
1760 let target = &lines[line_no_1based - 1];
1761 let needle = format!("\"{field_key}\":\"");
1762 let start = target
1763 .find(&needle)
1764 .unwrap_or_else(|| panic!("field {field_key} not found on line {line_no_1based}"))
1765 + needle.len();
1766 let end_rel = target[start..]
1767 .find('"')
1768 .unwrap_or_else(|| panic!("unterminated string for field {field_key}"));
1769 let end = start + end_rel;
1770 let mut new_line = String::with_capacity(target.len());
1771 new_line.push_str(&target[..start]);
1772 new_line.push_str(new_value);
1773 new_line.push_str(&target[end..]);
1774 lines[line_no_1based - 1] = new_line;
1775 let mut out = lines.join("\n");
1776 out.push('\n');
1777 fs::write(path, out).expect("write tampered log");
1778 }
1779
#[test]
fn verify_empty_log_is_ok() {
    // A log file that does not exist counts as a clean log: there are no
    // bytes that could have been tampered with, so `verify` must hand back
    // an empty report instead of an error.
    let tmp = TempDir::new().expect("tmp");
    let absent = tmp_dir_utf8(&tmp).join("nonexistent.jsonl");
    assert!(!absent.exists(), "precondition: file must not exist");

    let report = verify(&absent).expect("verify must not error on missing file");
    assert_eq!(report.total_rows, 0);
    assert_eq!(report.ok_rows, 0);
    assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
}
1793
#[test]
fn verify_well_formed_chain_passes() {
    // A log produced solely by the real writer (three rows) must verify
    // without a single issue.
    let tmp = TempDir::new().expect("tmp");
    let log_path = tmp_dir_utf8(&tmp).join("log.jsonl");
    let writer = open_log(&log_path);
    (0..3).for_each(|_| {
        writer.append(empty_input()).expect("append");
    });

    let report = verify(&log_path).expect("verify must succeed");
    assert_eq!(report.total_rows, 3);
    assert_eq!(report.ok_rows, 3);
    assert!(
        report.errors.is_empty(),
        "expected no issues on a well-formed log; got: {:?}",
        report.errors
    );
}
1813
#[test]
fn verify_detects_tampered_row_hash() {
    // Overwrite the SECOND row's `this_hash` with a well-formed but wrong
    // digest; recomputing the canonical-JSON SHA-256 can no longer match.
    let tmp = TempDir::new().expect("tmp");
    let log_path = tmp_dir_utf8(&tmp).join("log.jsonl");
    let writer = open_log(&log_path);
    writer.append(empty_input()).expect("append 1");
    writer.append(empty_input()).expect("append 2");
    drop(writer);

    // 64 lowercase hex chars, all zeros — passes `LogRow` parse, fails hash check.
    let forged_hash = "0".repeat(64);
    tamper_string_field(&log_path, 2, "this_hash", &forged_hash);

    let report = verify(&log_path).expect("verify must succeed");
    assert_eq!(report.total_rows, 2);

    // Row 2's stored `prev_hash` was left untouched, and there is no row 3
    // whose `prev_hash` would have to match the forged value, so the only
    // this-hash failure expected is the one on line 2.
    let mismatch_lines: Vec<_> = report
        .errors
        .iter()
        .filter(|issue| issue.kind == VerifyIssueKind::ThisHashMismatch)
        .map(|issue| issue.line)
        .collect();
    assert_eq!(
        mismatch_lines.len(),
        1,
        "expected exactly one ThisHashMismatch, got {:?}",
        report.errors
    );
    assert_eq!(mismatch_lines[0], 2);
}
1852
#[test]
fn verify_detects_tampered_prev_hash() {
    // Overwrite the SECOND row's `prev_hash` with a wrong value. That
    // breaks the chain link. Because the row's `this_hash` was computed
    // over the original `prev_hash`, the this-hash check fails as well —
    // but only the prev-hash issue is counted and asserted on here.
    let tmp = TempDir::new().expect("tmp");
    let log_path = tmp_dir_utf8(&tmp).join("log.jsonl");
    let writer = open_log(&log_path);
    writer.append(empty_input()).expect("append 1");
    writer.append(empty_input()).expect("append 2");
    drop(writer);

    // 64 lowercase hex chars, all 'f'.
    let forged_prev = "f".repeat(64);
    tamper_string_field(&log_path, 2, "prev_hash", &forged_prev);

    let report = verify(&log_path).expect("verify must succeed");
    assert_eq!(report.total_rows, 2);

    let broken_link_lines: Vec<_> = report
        .errors
        .iter()
        .filter(|issue| issue.kind == VerifyIssueKind::PrevHashMismatch)
        .map(|issue| issue.line)
        .collect();
    assert_eq!(
        broken_link_lines.len(),
        1,
        "expected exactly one PrevHashMismatch, got {:?}",
        report.errors
    );
    assert_eq!(broken_link_lines[0], 2);
}
1889
#[test]
fn verify_detects_corrupted_json() {
    // One genuine row followed by a literal `{"garbage":true}` line. The
    // garbage line cannot deserialize as a `LogRow` (required fields are
    // missing and `deny_unknown_fields` rejects the stray key), so it must
    // surface as a `ParseError` on line 2.
    let tmp = TempDir::new().expect("tmp");
    let log_path = tmp_dir_utf8(&tmp).join("log.jsonl");
    let writer = open_log(&log_path);
    writer.append(empty_input()).expect("append 1");
    drop(writer);

    // Put the garbage on a line of its own, whether or not the existing
    // content already ends with a newline.
    let original = fs::read_to_string(&log_path).expect("read");
    let separator = if original.ends_with('\n') { "" } else { "\n" };
    let corrupted = format!("{original}{separator}{{\"garbage\":true}}\n");
    fs::write(&log_path, corrupted).expect("write");

    let report = verify(&log_path).expect("verify must succeed");
    // total_rows counts non-empty lines, so both lines are counted.
    assert_eq!(report.total_rows, 2);

    let parse_error_lines: Vec<_> = report
        .errors
        .iter()
        .filter(|issue| issue.kind == VerifyIssueKind::ParseError)
        .map(|issue| issue.line)
        .collect();
    assert_eq!(
        parse_error_lines.len(),
        1,
        "expected exactly one ParseError, got {:?}",
        report.errors
    );
    assert_eq!(parse_error_lines[0], 2);
}
1925
#[test]
fn capability_serializes_kebab_case() {
    // PROVENANCE_LOG.md §3 fixes the wire spelling of every capability:
    // `oa`, `metadata`, `tdm-elsevier`, `tdm-aps`, `tdm-springer`
    // (kebab-case). Each entry pairs a variant with its exact JSON
    // encoding, quotes included.
    let wire_table = [
        (Capability::Oa, "\"oa\""),
        (Capability::Metadata, "\"metadata\""),
        (Capability::TdmElsevier, "\"tdm-elsevier\""),
        (Capability::TdmAps, "\"tdm-aps\""),
        (Capability::TdmSpringer, "\"tdm-springer\""),
    ];
    for (variant, wire) in wire_table {
        let encoded = serde_json::to_string(&variant).expect("serialize");
        assert_eq!(
            encoded, wire,
            "capability wire format mismatch for {variant:?}"
        );
    }
}
1946
1947 // -----------------------------------------------------------------
1948 // #140 — §6 rotation, retention, multi-segment verify.
1949 // -----------------------------------------------------------------
1950
1951 fn gunzip_to_string(gz: &Utf8Path) -> String {
1952 use std::io::Read;
1953 let f = std::fs::File::open(gz.as_std_path()).expect("open gz");
1954 let mut dec = GzDecoder::new(f);
1955 let mut s = String::new();
1956 dec.read_to_string(&mut s).expect("gunzip");
1957 s
1958 }
1959
#[test]
fn rotation_archives_to_gz_and_restarts_genesis_chain() {
    let tmp = TempDir::new().expect("tmp");
    let log_path = tmp_dir_utf8(&tmp).join("access.log");
    // The tiny rotate threshold is injected directly (NOT via a global env
    // var — that raced non-#[serial] tests). Row 1 fits, so it is the
    // SECOND append (size>=50) that rotates before writing. A freshly
    // rotated `.gz` is not retention-aged, so the default prune at open is
    // a no-op.
    let writer =
        ProvenanceLog::open_with_rotate_threshold(&log_path, TEST_SESSION_ID.to_string(), 50)
            .expect("open");
    writer.append(empty_input()).expect("append 1");
    let before_rotation = read_rows(&log_path);
    assert_eq!(before_rotation.len(), 1);
    assert_eq!(before_rotation[0].prev_hash, GENESIS_HASH);

    writer
        .append(empty_input())
        .expect("append 2 (rotates first)");

    // Exactly one rotated segment; it gunzips to the original row 1.
    let segs = rotated_segments(&log_path);
    assert_eq!(segs.len(), 1, "one .gz segment expected; got {segs:?}");
    let archived_text = gunzip_to_string(&segs[0]);
    let archived: Vec<LogRow> = archived_text
        .lines()
        .filter(|line| !line.is_empty())
        .map(|line| serde_json::from_str(line).expect("row"))
        .collect();
    assert_eq!(archived.len(), 1);
    assert_eq!(archived[0].this_hash, before_rotation[0].this_hash);

    // The fresh access.log restarts the chain at GENESIS, ts_seq 1.
    let after_rotation = read_rows(&log_path);
    assert_eq!(
        after_rotation.len(),
        1,
        "fresh segment holds only the post-rotate row"
    );
    assert_eq!(after_rotation[0].prev_hash, GENESIS_HASH);
    assert_eq!(after_rotation[0].ts_seq, 1);

    // verify_all sees both segments, each its own clean chain.
    let reports = verify_all(&log_path).expect("verify_all");
    assert_eq!(reports.len(), 2, "rotated .gz + current");
    for (seg_path, seg_report) in &reports {
        assert!(
            seg_report.errors.is_empty(),
            "segment {seg_path} must verify clean: {seg_report:?}"
        );
    }
}
2001
#[test]
fn rotate_log_is_fail_closed_on_missing_source() {
    // Rotating a file that does not exist must surface an I/O error. The
    // append path propagates that error via `?`, so a failed rotation
    // aborts the fetch (fail-closed) instead of silently continuing.
    let tmp = TempDir::new().expect("tmp");
    let absent = tmp_dir_utf8(&tmp).join("nope.log");
    match rotate_log(&absent).expect_err("missing source must error") {
        LogError::Io(_) => {}
        err => panic!("got {err:?}"),
    }
}
2011
#[test]
#[serial_test::serial]
fn prune_respects_retention_window_and_disable() {
    /// Removes `DOIGET_LOG_RETENTION_DAYS` when dropped. Because the drop
    /// also runs while a failed assertion unwinds, the process-wide
    /// variable cannot leak into tests that run afterwards.
    struct RetentionEnvReset;
    impl Drop for RetentionEnvReset {
        fn drop(&mut self) {
            std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
        }
    }

    let dir = TempDir::new().expect("tmp");
    let base = tmp_dir_utf8(&dir);
    let path = base.join("access.log");
    let old_gz = base.join("access.log.2020-01-01-000000.gz");
    let new_gz = base.join("access.log.2999-01-01-000000.gz");

    // Create an empty stand-in for a rotated segment; when `aged` is set,
    // back-date its mtime so it falls outside the retention window.
    let mk = |p: &Utf8Path, aged: bool| {
        let f = std::fs::File::create(p.as_std_path()).expect("create gz");
        if aged {
            // 100 days ago — older than the 90-day default & a 1-day window.
            let when =
                std::time::SystemTime::now() - std::time::Duration::from_secs(100 * 86_400);
            f.set_modified(when).expect("set mtime");
        }
    };

    // Armed before the first `set_var`, so every exit path cleans up.
    let _env_reset = RetentionEnvReset;

    // (a) days=0 disables pruning entirely. The log is opened only to
    // trigger the prune-at-open pass; the handle itself is not needed.
    mk(&old_gz, true);
    std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
    let _ = open_log(&path);
    assert!(old_gz.exists(), "days=0 must NOT prune");

    // (b) days=1 prunes the aged segment, keeps a fresh one.
    mk(&new_gz, false);
    std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "1");
    let _ = open_log(&path);
    assert!(!old_gz.exists(), "aged segment must be pruned at days=1");
    assert!(new_gz.exists(), "fresh segment must survive");
}
2046
#[test]
#[serial_test::serial]
fn retention_days_env_parsing() {
    /// Removes `DOIGET_LOG_RETENTION_DAYS` when dropped. Because the drop
    /// also runs while a failed assertion unwinds, the process-wide
    /// variable cannot leak into tests that run afterwards.
    struct RetentionEnvReset;
    impl Drop for RetentionEnvReset {
        fn drop(&mut self) {
            std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
        }
    }
    let _env_reset = RetentionEnvReset;

    // (raw env value, expected result): well-formed non-negative integers
    // are taken as-is; anything else falls back to the default.
    let cases = [
        ("0", 0),
        ("30", 30),
        ("garbage", DEFAULT_RETENTION_DAYS),
        ("-5", DEFAULT_RETENTION_DAYS),
    ];
    for (raw, expected) in cases {
        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", raw);
        assert_eq!(
            retention_days(),
            expected,
            "DOIGET_LOG_RETENTION_DAYS={raw:?}"
        );
    }

    // An unset variable also yields the default.
    std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
    assert_eq!(
        retention_days(),
        DEFAULT_RETENTION_DAYS,
        "unset DOIGET_LOG_RETENTION_DAYS must use the default"
    );
}
2061
#[test]
fn verify_all_flags_tampered_segment_independently() {
    let tmp = TempDir::new().expect("tmp");
    let log_path = tmp_dir_utf8(&tmp).join("access.log");
    // Inject the tiny threshold (no global env → no cross-test race).
    let writer =
        ProvenanceLog::open_with_rotate_threshold(&log_path, TEST_SESSION_ID.to_string(), 50)
            .expect("open");
    writer.append(empty_input()).expect("append 1");
    writer.append(empty_input()).expect("append 2 (rotates)");
    drop(writer);

    // Tamper the CURRENT segment's row by replacing `this_hash` with a
    // syntactically-valid 64-hex string of all zeros, which cannot be the
    // SHA-256 of any row. NOTE: an earlier version flipped only the last
    // char to '0', which was a no-op ~1/16 of runs when the real hash
    // already ended in '0' (this_hash depends on `Utc::now()`); that was
    // the flake this fixes — mirrors `verify_detects_tampered_row_hash`.
    let mut forged = read_rows(&log_path).remove(0);
    forged.this_hash = "0".repeat(64);
    let mut payload = serde_json::to_string(&forged).expect("ser");
    payload.push('\n');
    std::fs::write(log_path.as_std_path(), payload).expect("rewrite tampered current");

    let reports = verify_all(&log_path).expect("verify_all");
    assert_eq!(reports.len(), 2);
    // Oldest first = the rotated .gz (clean); current last (tampered).
    let (gz_path, gz_rep) = &reports[0];
    let (cur_path, cur_rep) = &reports[1];

    let rotated_is_clean = gz_path.as_str().ends_with(".gz") && gz_rep.errors.is_empty();
    assert!(
        rotated_is_clean,
        "rotated segment must stay clean: {gz_path} {gz_rep:?}"
    );

    let current_is_flagged =
        cur_path.file_name() == Some("access.log") && !cur_rep.errors.is_empty();
    assert!(
        current_is_flagged,
        "tampered current segment must report issues: {cur_path} {cur_rep:?}"
    );
}
2103}