Skip to main content

doiget_core/
provenance.rs

1//! JSON Lines + SHA-256 hash-chained provenance log.
2//!
3//! Binding spec: `docs/PROVENANCE_LOG.md` (NORMATIVE, §3 row schema, §4 hash
4//! chain). Failure semantics: **fail-closed** — callers MUST abort the fetch
5//! if a log write returns `Err`. See `docs/SECURITY.md` §1.8 and ADR-0006.
6//!
7//! # On-disk format
8//!
9//! - JSON Lines (`.jsonl`): one JSON object per line, terminated by `\n` (LF).
10//! - UTF-8. Timestamps are RFC3339 in UTC.
11//! - Each row is appended via a single `write_all` whose payload always ends
12//!   in `\n`, so a partially-written row is detectable as a missing trailing
13//!   newline rather than a torn JSON record.
14//! - In audit-grade mode (the only mode shipped here), the writer flushes the
15//!   `BufWriter` and `fsync`s the file after every row.
16//!
17//! # Hash chain (PROVENANCE_LOG.md §4)
18//!
19//! Each row carries a `prev_hash` and a `this_hash`. The first row's
20//! `prev_hash` is the literal string `"GENESIS"`. Every subsequent row's
21//! `prev_hash` MUST equal the previous row's `this_hash`.
22//!
23//! When a log file rotates (§6 — not yet implemented in this crate; see TODO
24//! below), the first row of the NEW log file also uses `prev_hash =
25//! "GENESIS"`, restarting the chain.
26//!
27//! `this_hash` is computed as:
28//!
29//! ```text
30//! this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
31//! ```
32//!
33//! where `canonical_json` is **compact JSON (no whitespace) with object keys
34//! sorted lexicographically** (PROVENANCE_LOG.md §4). For a row with fields
35//! `{ts: "...", ts_seq: 1, event: "fetch", ...}`, the canonical bytes begin
36//! with `{"capability":...` because `capability` is the lex-first top-level
37//! key. Downstream `doiget audit-log --verify` (Phase 1+) relies on this
38//! exact rule — do not change the canonicalization without bumping the spec.
39//!
40//! # In-process serialization
41//!
42//! `ProvenanceLog` holds a `Mutex<LogState>`. All `append` calls within the
43//! same process serialize on this mutex, satisfying the "process-local mutex
44//! on log appender" requirement of `docs/SECURITY.md` §1.8. Cross-process
45//! coordination (multiple `doiget` invocations) is out of scope here and
46//! handled by the higher-level `flock`-based store layer.
47//!
48//! # Session id
49//!
50//! `session_id` (PROVENANCE_LOG.md §3) is a 26-char ULID generated **once per
51//! process invocation** by the caller and stamped into every row written
52//! through the resulting [`ProvenanceLog`]. This crate does not generate the
53//! ULID itself — see [`ProvenanceLog::open`] for the contract.
54//!
55//! # Log rotation and retention (§6)
56//!
57//! Implemented (PROVENANCE_LOG.md §6): when `access.log` exceeds
58//! `ROTATE_BYTES` (100 MiB) a subsequent [`ProvenanceLog::append`]
59//! gzip-compresses the full file to `access.log.<YYYY-MM-DD-HHMMSS>.gz`,
60//! removes the old `access.log`, and writes the incoming row as the
61//! first row of a fresh file with `prev_hash = "GENESIS"` (the hash
62//! chain **restarts** per segment — segments are NOT linked). Rotation
63//! is fail-closed: any gzip / rename / unlink failure aborts the
64//! `append` (the caller's fetch aborts) so the chain never silently
65//! skips. At [`ProvenanceLog::open`], rotated `.gz` segments older than
66//! the retention window (`DOIGET_LOG_RETENTION_DAYS`, default 90; `0`
67//! disables) are deleted **best-effort** (a prune failure is logged,
68//! not fatal — pruning is housekeeping, not integrity).
69//! [`verify_all`] verifies the current file plus every rotated `.gz`
70//! segment (each its own GENESIS-rooted chain).
71
72use std::collections::BTreeMap;
73use std::fs::{File, OpenOptions};
74use std::io::{BufRead, BufReader, BufWriter, Write};
75use std::sync::Mutex;
76
77use flate2::read::GzDecoder;
78use flate2::write::GzEncoder;
79use flate2::Compression;
80
81use camino::{Utf8Path, Utf8PathBuf};
82use chrono::{DateTime, Utc};
83use serde::{Deserialize, Serialize};
84use sha2::{Digest, Sha256};
85
86/// One row of the provenance log (PROVENANCE_LOG.md §3).
87///
88/// The on-disk wire field names match the spec table; struct-field order is
89/// **not** load-bearing for the hash because canonicalization sorts keys
90/// lexicographically (see PROVENANCE_LOG.md §4).
91///
92/// **Schema version**: this struct is the **v2** row shape (ADR-0024).
93/// Every v2 row carries `schema_version = "v2"` literally; the
94/// `canonical_digest` field carries the ADR-0021 §1 audit identity of
95/// the fetch on rows where one applies (`Fetch` / `Resolve` /
96/// `StoreWrite`) and is `None` on session bookend rows
97/// (`SessionStart` / `SessionEnd` / `CapabilityResolved`) that have no
98/// ref. v1 rows (pre-Slice-4) lack both fields and MUST be migrated via
99/// [`migrate_v1_to_v2`] before the v2 binary can read them — the
100/// `deny_unknown_fields` + non-defaulted `schema_version` shape ensures
101/// v1 rows fail to parse loudly rather than producing silent hash-chain
102/// mismatches.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(deny_unknown_fields)]
105pub struct LogRow {
106    /// RFC3339 UTC timestamp of the append (millisecond precision).
107    pub ts: DateTime<Utc>,
108    /// Per-session monotonic sequence number, starting at 1.
109    pub ts_seq: u64,
110    /// Event class (see [`LogEvent`]).
111    pub event: LogEvent,
112    /// Optional reference (DOI / arXiv id). Wire field name is `ref`.
113    #[serde(rename = "ref")]
114    pub ref_: Option<String>,
115    /// Optional source name (e.g. `unpaywall`).
116    pub source: Option<String>,
117    /// Result (see [`LogResult`]).
118    pub result: LogResult,
119    /// OA license string (`event=fetch`, `result=ok`); `None` otherwise.
120    pub license: Option<String>,
121    /// Bytes written / fetched, on success rows.
122    pub size_bytes: Option<u64>,
123    /// Path to the stored payload, relative to the store root
124    /// (`event=fetch`, `result=ok`); `None` otherwise.
125    pub store_path: Option<String>,
126    /// Capability under which the row was written (REQUIRED, every row).
127    pub capability: Capability,
128    /// 26-char ULID identifying the process invocation (REQUIRED).
129    pub session_id: String,
130    /// Stable error code on failure rows.
131    pub error_code: Option<String>,
132    /// Row schema version. Always [`LOG_SCHEMA_VERSION`] (`"v2"`) for
133    /// new rows written by this build (ADR-0024). v1 rows lack this
134    /// field; they MUST be migrated via [`migrate_v1_to_v2`] first.
135    pub schema_version: String,
136    /// Canonical-digest of the fetch's audit identity (ADR-0021 §1) as
137    /// 64 lowercase hex chars. Present on rows with a `ref` (`Fetch`,
138    /// `Resolve`, `StoreWrite`); `None` on session bookend rows. The
139    /// digest is computed from a [`crate::CanonicalRef`] whose
140    /// `resolver_profile` matches this row's `source` field for
141    /// migrated v1 rows; new v2 rows MAY pass an explicit
142    /// `resolver_profile` distinct from `source`.
143    pub canonical_digest: Option<String>,
144    /// 64 lowercase hex chars, OR the literal string `"GENESIS"` for the
145    /// first row of a fresh log file.
146    pub prev_hash: String,
147    /// 64 lowercase hex chars. SHA-256 of canonical JSON of THIS row with
148    /// the `this_hash` field removed. See module docs.
149    pub this_hash: String,
150}
151
152/// Provenance-log row schema version this build writes
153/// (`docs/PROVENANCE_LOG.md` §3, ADR-0024).
154///
155/// Bumped from `"v1"` (implicit; pre-Slice-4 rows had no
156/// `schema_version` field) to `"v2"` when the `canonical_digest` column
157/// landed. The v1→v2 migration is one-shot, idempotent, and dry-runnable
158/// via [`migrate_v1_to_v2`].
159pub const LOG_SCHEMA_VERSION: &str = "v2";
160
161/// Event class for a log row (PROVENANCE_LOG.md §3).
162///
163/// Note: result-status (`ok`/`err`/`denied`) lives in [`LogResult`], NOT in
164/// the event variant. So `Fetch` covers both successful and failed fetch
165/// attempts; the row's `result` distinguishes them.
166///
167/// `non_exhaustive` so adding new variants is non-breaking.
168#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
169#[serde(rename_all = "snake_case")]
170#[non_exhaustive]
171pub enum LogEvent {
172    /// Process started; first row of a new session.
173    SessionStart,
174    /// Capability resolution finished (allowed / denied / which env var).
175    CapabilityResolved,
176    /// Reference resolved to a fetch URL.
177    Resolve,
178    /// Fetch attempt (success or failure determined by `result`).
179    Fetch,
180    /// Store write attempt (success or failure determined by `result`).
181    StoreWrite,
182    /// Process ended cleanly.
183    SessionEnd,
184}
185
186/// Per-row outcome (PROVENANCE_LOG.md §3). `non_exhaustive` for forward
187/// compatibility.
188#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
189#[serde(rename_all = "snake_case")]
190#[non_exhaustive]
191pub enum LogResult {
192    /// The operation succeeded.
193    Ok,
194    /// The operation failed with an error.
195    Err,
196    /// The operation was denied (e.g. capability gate).
197    Denied,
198}
199
200/// Capability under which a row was written (PROVENANCE_LOG.md §3).
201///
202/// `kebab-case` serde rename emits `oa`, `metadata`, `tdm-elsevier`,
203/// `tdm-aps`, `tdm-springer` exactly as the spec requires. `non_exhaustive`
204/// for forward compatibility.
205#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
206#[serde(rename_all = "kebab-case")]
207#[non_exhaustive]
208pub enum Capability {
209    /// Open access tier.
210    Oa,
211    /// Metadata-only access.
212    Metadata,
213    /// Elsevier TDM (Tier 3, opt-in build).
214    TdmElsevier,
215    /// APS TDM (Tier 3, opt-in build).
216    TdmAps,
217    /// Springer TDM (Tier 3, opt-in build).
218    TdmSpringer,
219}
220
221/// Errors emitted by the provenance log writer. Callers MUST treat any
222/// variant as a fail-closed signal and abort the surrounding fetch.
223#[derive(Debug, thiserror::Error)]
224#[non_exhaustive]
225pub enum LogError {
226    /// I/O error opening, reading, writing, or syncing the log file. Includes
227    /// recovery-time corruption detection where the synthetic message is
228    /// `"corrupted log at line N: …"`.
229    #[error("provenance log io error: {0}")]
230    Io(#[from] std::io::Error),
231    /// Serialization of a row to canonical JSON failed.
232    #[error("provenance log serialization error: {0}")]
233    Serialize(#[from] serde_json::Error),
234    /// Path supplied to [`ProvenanceLog::open`] exists but is not a regular
235    /// file (e.g. a directory or symlink).
236    #[error("provenance log path is not a regular file: {0}")]
237    NotARegularFile(Utf8PathBuf),
238}
239
240/// Append-only writer with in-process serialization.
241#[derive(Debug)]
242pub struct ProvenanceLog {
243    path: Utf8PathBuf,
244    state: Mutex<LogState>,
245    session_id: String,
246    /// §6 rotation threshold, resolved ONCE at [`ProvenanceLog::open`]
247    /// (not per-`append`). Reading `DOIGET_LOG_ROTATE_BYTES` once at
248    /// open — rather than on every append — means a log opened without
249    /// the env set keeps the real 100 MiB threshold for its whole life
250    /// even if another (test) thread later mutates that process-global
251    /// env var; this removes a parallel-test race without serializing
252    /// every multi-append test. `0` = rotation disabled.
253    rotate_threshold: u64,
254}
255
256/// Mutable internal state, guarded by [`ProvenanceLog::state`].
257#[derive(Debug)]
258struct LogState {
259    /// `ts_seq` of the **next** row to be appended.
260    next_seq: u64,
261    /// 64 lowercase hex chars; [`GENESIS_HASH`] if the log is empty.
262    last_hash: String,
263}
264
265/// The genesis sentinel used as `prev_hash` for the first row of a log file
266/// (PROVENANCE_LOG.md §3, §6). Also written verbatim as the prev-hash of the
267/// first row after a log rotation (the chain restarts per segment).
268const GENESIS_HASH: &str = "GENESIS";
269
270/// Rotate `access.log` once it reaches this size (PROVENANCE_LOG.md §6:
271/// "100 MB"). 100 MiB. Overridable via the `DOIGET_LOG_ROTATE_BYTES`
272/// env var — an internal ops/testing knob (NOT a documented public
273/// surface): tests set it tiny to exercise rotation without writing
274/// 100 MiB; a value of `0` disables rotation.
275const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
276
277/// Default rotated-segment retention (PROVENANCE_LOG.md §6: "90 days").
278/// Overridable via `DOIGET_LOG_RETENTION_DAYS`; `0` disables pruning.
279const DEFAULT_RETENTION_DAYS: i64 = 90;
280
281/// Resolve the rotation threshold: `DOIGET_LOG_ROTATE_BYTES` if set and
282/// parseable, else [`ROTATE_BYTES`]. `0` (or unparsable) → returns the
283/// value as-is (`0` means "never rotate").
284fn rotate_threshold_bytes() -> u64 {
285    match std::env::var("DOIGET_LOG_ROTATE_BYTES") {
286        Ok(s) => s.trim().parse::<u64>().unwrap_or(ROTATE_BYTES),
287        Err(_) => ROTATE_BYTES,
288    }
289}
290
291/// Resolve retention days from `DOIGET_LOG_RETENTION_DAYS`
292/// (default [`DEFAULT_RETENTION_DAYS`]). `0` disables pruning. A
293/// negative / unparsable value falls back to the default with a warn.
294fn retention_days() -> i64 {
295    match std::env::var("DOIGET_LOG_RETENTION_DAYS") {
296        Ok(s) => match s.trim().parse::<i64>() {
297            Ok(n) if n >= 0 => n,
298            _ => {
299                tracing::warn!(
300                    value = %s,
301                    "DOIGET_LOG_RETENTION_DAYS is not a non-negative integer; \
302                     using the {DEFAULT_RETENTION_DAYS}-day default"
303                );
304                DEFAULT_RETENTION_DAYS
305            }
306        },
307        Err(_) => DEFAULT_RETENTION_DAYS,
308    }
309}
310
311/// gzip-compress `path` to `<file_name>.<YYYY-MM-DD-HHMMSS>.gz` (in the
312/// same directory) and unlink `path` (PROVENANCE_LOG.md §6).
313///
314/// Atomic & fail-closed: the gzip is written to a `.tmp`, fsynced, then
315/// `rename`d into place (so a partial `.gz` is never observable), and
316/// only then is the original removed. Every step propagates its error
317/// to the caller (`ProvenanceLog::append`), which is fail-closed — a
318/// rotation failure aborts the surrounding fetch. Crash safety: a crash
319/// after the rename but before the unlink leaves both the full `.gz`
320/// and the (over-size) `access.log`; the next `append` simply rotates
321/// again, producing a second independently-valid segment — wasteful but
322/// never lossy or corrupt.
323fn rotate_log(path: &Utf8Path) -> Result<(), LogError> {
324    let file_name = path.file_name().ok_or_else(|| {
325        LogError::Io(std::io::Error::other(
326            "provenance log path has no file name; cannot rotate",
327        ))
328    })?;
329    let ts = Utc::now().format("%Y-%m-%d-%H%M%S");
330    let gz_name = format!("{file_name}.{ts}.gz");
331    let dir = path.parent().unwrap_or_else(|| Utf8Path::new("."));
332    let gz_path = dir.join(&gz_name);
333    let tmp_path = dir.join(format!("{gz_name}.tmp"));
334
335    {
336        let mut src = File::open(path)?;
337        let tmp = File::create(&tmp_path)?;
338        let mut enc = GzEncoder::new(BufWriter::new(tmp), Compression::default());
339        std::io::copy(&mut src, &mut enc)?;
340        let bufw = enc.finish()?;
341        let tmp = bufw.into_inner().map_err(|e| {
342            LogError::Io(std::io::Error::other(format!(
343                "gz tmp buf flush failed: {}",
344                e.error()
345            )))
346        })?;
347        tmp.sync_all()?;
348    }
349    std::fs::rename(&tmp_path, &gz_path)?;
350    std::fs::remove_file(path)?;
351    Ok(())
352}
353
354/// Rotated `.gz` segments siblings of `current`, sorted ascending. The
355/// embedded `YYYY-MM-DD-HHMMSS` timestamp makes lexicographic order ==
356/// chronological order.
357fn rotated_segments(current: &Utf8Path) -> Vec<Utf8PathBuf> {
358    let Some(file_name) = current.file_name() else {
359        return Vec::new();
360    };
361    let dir = current.parent().unwrap_or_else(|| Utf8Path::new("."));
362    let prefix = format!("{file_name}.");
363    let mut segs: Vec<Utf8PathBuf> = match std::fs::read_dir(dir.as_std_path()) {
364        Ok(rd) => rd
365            .filter_map(|e| e.ok())
366            .filter_map(|e| Utf8PathBuf::from_path_buf(e.path()).ok())
367            .filter(|p| {
368                p.file_name()
369                    .map(|n| n.starts_with(&prefix) && n.ends_with(".gz"))
370                    .unwrap_or(false)
371            })
372            .collect(),
373        Err(_) => Vec::new(),
374    };
375    segs.sort();
376    segs
377}
378
379/// Delete rotated `.gz` segments older than `days` (PROVENANCE_LOG.md
380/// §6 retention). `days <= 0` is a no-op (disabled). **Best-effort**:
381/// pruning is housekeeping, not integrity, so any failure is logged and
382/// skipped — `ProvenanceLog::open` still succeeds.
383fn prune_rotated_segments(current: &Utf8Path, days: i64) {
384    if days <= 0 {
385        return;
386    }
387    let Some(cutoff) = std::time::SystemTime::now()
388        .checked_sub(std::time::Duration::from_secs(days as u64 * 86_400))
389    else {
390        return;
391    };
392    for seg in rotated_segments(current) {
393        let aged = std::fs::metadata(seg.as_std_path())
394            .and_then(|m| m.modified())
395            .map(|mt| mt < cutoff)
396            .unwrap_or(false);
397        if !aged {
398            continue;
399        }
400        match std::fs::remove_file(seg.as_std_path()) {
401            Ok(()) => tracing::info!(
402                segment = %seg,
403                "provenance: pruned rotated segment past retention"
404            ),
405            Err(e) => tracing::warn!(
406                segment = %seg, error = %e,
407                "provenance: failed to prune rotated segment (best-effort; continuing)"
408            ),
409        }
410    }
411}
412
413/// Verify the full provenance history: every rotated `.gz` segment
414/// (oldest→newest) followed by the current `access.log`. Each segment
415/// is its own GENESIS-rooted hash chain (segments are deliberately NOT
416/// linked across a rotation, PROVENANCE_LOG.md §6), so they are
417/// verified independently and reported per-segment.
418///
419/// The audited [`verify`] function itself is unchanged; this only
420/// orchestrates it over the segment set (gunzipping each `.gz` to a
421/// tempfile first).
422///
423/// # Errors
424///
425/// [`LogError::Io`] on a gunzip / tempfile failure. A missing current
426/// `access.log` is not an error ([`verify`] reports it empty).
427pub fn verify_all(current: &Utf8Path) -> Result<Vec<(Utf8PathBuf, VerifyReport)>, LogError> {
428    let mut out = Vec::new();
429    for seg in rotated_segments(current) {
430        let gz = File::open(seg.as_std_path())?;
431        let mut dec = GzDecoder::new(gz);
432        let tmp = tempfile::NamedTempFile::new().map_err(|e| {
433            LogError::Io(std::io::Error::other(format!(
434                "verify_all: tempfile for {seg}: {e}"
435            )))
436        })?;
437        {
438            let mut w = File::create(tmp.path())?;
439            std::io::copy(&mut dec, &mut w)?;
440            w.sync_all()?;
441        }
442        let tmp_utf8 = Utf8Path::from_path(tmp.path()).ok_or_else(|| {
443            LogError::Io(std::io::Error::other("verify_all: non-utf8 tempfile path"))
444        })?;
445        let report = verify(tmp_utf8)?;
446        out.push((seg, report));
447        // `tmp` (and the gunzipped file) drop here, after verify.
448    }
449    let report = verify(current)?;
450    out.push((current.to_path_buf(), report));
451    Ok(out)
452}
453
454/// Caller-supplied fields for a row. The writer fills in `ts`, `ts_seq`,
455/// `session_id`, `prev_hash`, `this_hash`, and the literal
456/// `schema_version = "v2"` (`LOG_SCHEMA_VERSION`).
457///
458/// Callers SHOULD populate [`Self::canonical_digest`] on rows that have
459/// a meaningful audit identity (`Fetch` / `Resolve` / `StoreWrite` rows
460/// with a `ref`), leaving it `None` on session bookend rows. The digest
461/// is produced by [`crate::CanonicalRef::digest_hex`] from a
462/// `(source_type, source_id, resolver_profile, version)` tuple — see
463/// ADR-0021 §1 for the algorithm and ADR-0024 for the implementation
464/// surface.
465#[derive(Debug, Clone)]
466pub struct RowInput<'a> {
467    /// Event class.
468    pub event: LogEvent,
469    /// Result.
470    pub result: LogResult,
471    /// Capability under which the row is written (REQUIRED for every row).
472    pub capability: Capability,
473    /// Optional DOI / arXiv id.
474    pub ref_: Option<&'a str>,
475    /// Optional source name.
476    pub source: Option<&'a str>,
477    /// Optional error code on failure rows.
478    pub error_code: Option<&'a str>,
479    /// Optional payload size in bytes.
480    pub size_bytes: Option<u64>,
481    /// Optional OA license string (set on `event=fetch`, `result=ok`).
482    pub license: Option<&'a str>,
483    /// Optional store path relative to the store root (set on `event=fetch`,
484    /// `result=ok`).
485    pub store_path: Option<&'a str>,
486    /// Optional canonical-digest (ADR-0021 §1) as 64 lowercase hex
487    /// chars. `None` for session bookend / capability-resolution rows;
488    /// SHOULD be `Some` for `Fetch` / `Resolve` / `StoreWrite` rows
489    /// whose `source` field names the resolver. Build via
490    /// [`crate::Ref::promote`] + [`crate::CanonicalRef::digest_hex`].
491    pub canonical_digest: Option<&'a str>,
492}
493
494// ---------------------------------------------------------------------------
495// Canonical-JSON helper (PROVENANCE_LOG.md §4)
496//
497// Hashing rule (CRITICAL — this is the spec contract for `audit-log --verify`):
498//
499//   this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
500//
501// Canonical JSON = **compact (no whitespace), keys sorted lexicographically,
502// no trailing whitespace** (§4). Struct field order is deliberately NOT
503// load-bearing here; the canonicalizer sorts the resulting object keys via
504// `BTreeMap<String, Value>`, which serializes in lex-sorted key order.
505//
506// Worked example: for the row fragment `{ts_seq: 1, ts: "..."}` (input order),
507// the canonical bytes after lex sort are `{"ts":"...","ts_seq":1}` because
508// `"ts"` < `"ts_seq"` lexicographically. In v2 (ADR-0024) the lex-first
509// top-level key is `"canonical_digest"` — `"canonical_digest"` < `"capability"`
510// because 'n'(110) < 'p'(112) at byte index 2 (both share the `"ca"`
511// prefix). The pre-v2 lex-first key was `"capability"`.
512// ---------------------------------------------------------------------------
513
514/// Serializable shadow of [`LogRow`] **without** `this_hash`. Used solely as
515/// an intermediate to compute the canonical bytes that `this_hash` is the
516/// SHA-256 of. The wire key names match [`LogRow`]'s `serde` attributes.
517///
518/// v2 shape (ADR-0024): includes `schema_version` and
519/// `canonical_digest`. Both fields participate in the hash chain — a
520/// tampered `canonical_digest` is detected by `audit-log --verify`
521/// exactly like a tampered `ref` or `source` would be.
522#[derive(Serialize)]
523struct RowForHash<'a> {
524    ts: DateTime<Utc>,
525    ts_seq: u64,
526    event: LogEvent,
527    #[serde(rename = "ref")]
528    ref_: Option<&'a str>,
529    source: Option<&'a str>,
530    result: LogResult,
531    license: Option<&'a str>,
532    size_bytes: Option<u64>,
533    store_path: Option<&'a str>,
534    capability: Capability,
535    session_id: &'a str,
536    error_code: Option<&'a str>,
537    schema_version: &'a str,
538    canonical_digest: Option<&'a str>,
539    prev_hash: &'a str,
540}
541
542/// Produce canonical-JSON bytes for a row-without-hash, with object keys
543/// sorted lexicographically per PROVENANCE_LOG.md §4.
544///
545/// Implementation: serialize via `serde_json::to_value` to get a `Value`,
546/// require it be an object, then move its entries into a
547/// `BTreeMap<String, Value>` (which serializes with lex-sorted keys) and
548/// re-serialize compactly. No new dependency required.
549fn canonical_json_for_hash(rfh: &RowForHash<'_>) -> Result<Vec<u8>, LogError> {
550    let value = serde_json::to_value(rfh)?;
551    let map = match value {
552        serde_json::Value::Object(m) => m,
553        // RowForHash is always a struct, so this branch is unreachable in
554        // practice; surface as a serde error if it ever changes.
555        _ => {
556            return Err(LogError::Serialize(serde::de::Error::custom(
557                "RowForHash did not serialize to a JSON object",
558            )));
559        }
560    };
561    let sorted: BTreeMap<String, serde_json::Value> = map.into_iter().collect();
562    Ok(serde_json::to_vec(&sorted)?)
563}
564
565/// Compute `this_hash` for the given row-without-hash. Returns 64 lowercase
566/// hex chars.
567fn compute_this_hash(rfh: &RowForHash<'_>) -> Result<String, LogError> {
568    let bytes = canonical_json_for_hash(rfh)?;
569    let digest = Sha256::digest(&bytes);
570    Ok(hex::encode(digest))
571}
572
573impl ProvenanceLog {
574    /// Open or create the log at `path`, stamping every row with
575    /// `session_id`.
576    ///
577    /// `session_id` MUST be a 26-char ULID generated **once per process**
578    /// invocation by the caller. Re-opening the log within the same process
579    /// reuses the same `session_id`; re-opening in a new process gets a new
580    /// one. This crate intentionally does NOT generate the ULID itself —
581    /// callers are responsible for creating one (e.g. via the `ulid` crate
582    /// already present in the workspace) and threading it through.
583    ///
584    /// If the file exists, scan it once to recover the last `ts_seq` and
585    /// `this_hash`. If the file is missing or empty, the first row will use
586    /// `prev_hash = "GENESIS"` and `ts_seq = 1`.
587    ///
588    /// # Errors
589    ///
590    /// Returns [`LogError::Io`] for I/O failures or if any line fails to
591    /// parse as a [`LogRow`] (synthetic message: `"corrupted log at line N: …"`).
592    /// The writer never silently truncates a corrupt log.
593    ///
594    /// Returns [`LogError::NotARegularFile`] if `path` exists but is not a
595    /// regular file (e.g. a directory).
596    pub fn open(path: impl Into<Utf8PathBuf>, session_id: String) -> Result<Self, LogError> {
597        // Production path: the §6 threshold comes from
598        // `DOIGET_LOG_ROTATE_BYTES` (default 100 MiB), resolved ONCE here.
599        Self::open_with_rotate_threshold(path, session_id, rotate_threshold_bytes())
600    }
601
602    /// [`open`](Self::open) with an explicit rotation threshold instead
603    /// of reading `DOIGET_LOG_ROTATE_BYTES`.
604    ///
605    /// This exists so the rotation tests inject a tiny threshold WITHOUT
606    /// mutating the process-global env var: a global env knob raced
607    /// non-`#[serial]` tests (a concurrent test's `open` would cache the
608    /// tiny threshold and spuriously rotate). `#[serial]` only
609    /// serializes `#[serial]` tests, so injection — not serialization —
610    /// is the robust fix. `0` disables rotation.
611    pub(crate) fn open_with_rotate_threshold(
612        path: impl Into<Utf8PathBuf>,
613        session_id: String,
614        rotate_threshold: u64,
615    ) -> Result<Self, LogError> {
616        let path: Utf8PathBuf = path.into();
617
618        // Ensure the parent directory exists. The provenance log defaults to
619        // `<config>/doiget/access.jsonl`, and on a fresh machine (e.g. a CI
620        // runner where `~/.config/doiget` was never created) neither the
621        // recover-state read nor the first append can open the file — the
622        // append fails with ENOENT and `verify` fail-closes on the LogError.
623        // `create_dir_all` is idempotent; a genuine permission failure still
624        // surfaces as a `LogError` (the correct fail-closed signal).
625        if let Some(parent) = path.parent() {
626            if !parent.as_str().is_empty() {
627                std::fs::create_dir_all(parent.as_std_path())?;
628            }
629        }
630
631        // Reject obvious non-files up front so later `OpenOptions::append`
632        // doesn't produce a confusing platform-dependent error.
633        if path.exists() {
634            let md = std::fs::metadata(&path)?;
635            if !md.is_file() {
636                return Err(LogError::NotARegularFile(path));
637            }
638        }
639
640        let (next_seq, last_hash) = recover_state(&path)?;
641
642        // §6 retention: prune rotated `.gz` segments older than the
643        // window. Best-effort — pruning is housekeeping, not integrity,
644        // so a failure is logged and `open` still succeeds (unlike
645        // rotation, which is fail-closed).
646        prune_rotated_segments(&path, retention_days());
647
648        Ok(Self {
649            path,
650            state: Mutex::new(LogState {
651                next_seq,
652                last_hash,
653            }),
654            session_id,
655            rotate_threshold,
656        })
657    }
658
659    /// Append a row. Computes `prev_hash`, `ts_seq`, `ts`, `session_id`, and
660    /// `this_hash`; the caller only supplies the semantic fields via
661    /// [`RowInput`].
662    ///
663    /// Returns the assigned `ts_seq` on success.
664    ///
665    /// # Errors
666    ///
667    /// Returns [`LogError`] on serialization, I/O, or fsync failure. Callers
668    /// MUST treat this as fail-closed and abort the surrounding fetch.
669    pub fn append(&self, input: RowInput<'_>) -> Result<u64, LogError> {
670        // Hold the mutex for the entire append: serialize + write + flush +
671        // fsync + state update. This is the in-process serialization point
672        // promised by `docs/SECURITY.md` §1.8.
673        //
674        // A poisoned mutex only happens if a previous `append` panicked
675        // mid-write. Surface that as an I/O error rather than propagating
676        // a panic.
677        let mut state = self
678            .state
679            .lock()
680            .map_err(|_| LogError::Io(std::io::Error::other("provenance log mutex poisoned")))?;
681
682        // §6 rotation, BEFORE this row is written. If `access.log` has
683        // reached the threshold, gzip+rename it and reset the in-memory
684        // chain state so this row becomes the GENESIS-rooted first row of
685        // a fresh file. Fail-closed: a rotation error aborts the append
686        // (the `?`), so the caller's fetch aborts and the chain never
687        // silently continues in an over-size or half-rotated file. The
688        // `state` mutex is held, so rotation is serialized with appends.
689        let threshold = self.rotate_threshold;
690        if threshold > 0 {
691            let size = match std::fs::metadata(&self.path) {
692                Ok(m) => m.len(),
693                Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
694                Err(e) => return Err(LogError::Io(e)),
695            };
696            if size >= threshold {
697                rotate_log(&self.path)?;
698                state.next_seq = 1;
699                state.last_hash = GENESIS_HASH.to_string();
700            }
701        }
702
703        let ts_seq = state.next_seq;
704        let prev_hash = state.last_hash.clone();
705        let ts = Utc::now();
706
707        let rfh = RowForHash {
708            ts,
709            ts_seq,
710            event: input.event,
711            ref_: input.ref_,
712            source: input.source,
713            result: input.result,
714            license: input.license,
715            size_bytes: input.size_bytes,
716            store_path: input.store_path,
717            capability: input.capability,
718            session_id: &self.session_id,
719            error_code: input.error_code,
720            schema_version: LOG_SCHEMA_VERSION,
721            canonical_digest: input.canonical_digest,
722            prev_hash: &prev_hash,
723        };
724
725        let this_hash = compute_this_hash(&rfh)?;
726
727        // Build the on-disk row. Owned strings here because `LogRow` does
728        // not borrow.
729        let row = LogRow {
730            ts,
731            ts_seq,
732            event: input.event,
733            ref_: input.ref_.map(str::to_string),
734            source: input.source.map(str::to_string),
735            result: input.result,
736            license: input.license.map(str::to_string),
737            size_bytes: input.size_bytes,
738            store_path: input.store_path.map(str::to_string),
739            capability: input.capability,
740            session_id: self.session_id.clone(),
741            error_code: input.error_code.map(str::to_string),
742            schema_version: LOG_SCHEMA_VERSION.to_string(),
743            canonical_digest: input.canonical_digest.map(str::to_string),
744            prev_hash,
745            this_hash: this_hash.clone(),
746        };
747
748        // Serialize, append `\n`, write_all in one syscall, flush BufWriter,
749        // fsync the underlying file. `\n` is part of the same buffer, so a
750        // crash mid-write leaves at most a partial line (no trailing `\n`),
751        // which is detectable on recovery as a corrupted final line.
752        let mut bytes = serde_json::to_vec(&row)?;
753        bytes.push(b'\n');
754
755        let file = OpenOptions::new()
756            .create(true)
757            .append(true)
758            .open(&self.path)?;
759        let mut writer = BufWriter::new(file);
760        writer.write_all(&bytes)?;
761        writer.flush()?;
762        // `into_inner` to recover the underlying File for `sync_all`.
763        let file = writer.into_inner().map_err(|e| {
764            LogError::Io(std::io::Error::other(format!(
765                "buf writer flush failed: {}",
766                e.error()
767            )))
768        })?;
769        file.sync_all()?;
770
771        // Only after a successful fsync do we advance the in-memory state.
772        // If any of the above fails, the next `append` retries from the
773        // same `(ts_seq, prev_hash)` — at most a torn last line on disk.
774        state.next_seq = ts_seq + 1;
775        state.last_hash = this_hash;
776
777        Ok(ts_seq)
778    }
779
780    /// Returns the path the log was opened at. Useful for tests and audit tooling.
781    pub fn path(&self) -> &Utf8Path {
782        &self.path
783    }
784
785    /// Returns the session id stamped into every row written through this
786    /// writer.
787    pub fn session_id(&self) -> &str {
788        &self.session_id
789    }
790}
791
792/// Scan an existing log to recover `(next_seq, last_hash)`.
793///
794/// Walk every line, parse as [`LogRow`], track the last successfully parsed
795/// row. If parsing fails, return [`LogError::Io`] with a synthetic
796/// `"corrupted log at line N: …"` message — never silently truncate.
797fn recover_state(path: &Utf8Path) -> Result<(u64, String), LogError> {
798    let file = match File::open(path) {
799        Ok(f) => f,
800        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
801            return Ok((1, GENESIS_HASH.to_string()));
802        }
803        Err(e) => return Err(LogError::Io(e)),
804    };
805
806    let reader = BufReader::new(file);
807    let mut last_seq: u64 = 0;
808    let mut last_hash: String = GENESIS_HASH.to_string();
809
810    for (idx, line_res) in reader.lines().enumerate() {
811        let line_no = idx + 1;
812        let line = line_res?;
813        if line.is_empty() {
814            // Tolerate trailing/empty lines silently — they are not data.
815            continue;
816        }
817        let row: LogRow = serde_json::from_str(&line).map_err(|e| {
818            LogError::Io(std::io::Error::new(
819                std::io::ErrorKind::InvalidData,
820                format!("corrupted log at line {}: {}", line_no, e),
821            ))
822        })?;
823        last_seq = row.ts_seq;
824        last_hash = row.this_hash;
825    }
826
827    if last_seq == 0 {
828        Ok((1, GENESIS_HASH.to_string()))
829    } else {
830        Ok((last_seq + 1, last_hash))
831    }
832}
833
834// ---------------------------------------------------------------------------
835// Verification (`doiget audit-log --verify`)
836//
837// The provenance log is a JSON Lines file with a SHA-256 hash chain
838// (PROVENANCE_LOG.md §4). Tampering is detected by recomputing every row's
839// `this_hash` and validating the chain. This module provides the offline
840// verifier; the CLI wrapper lives in `doiget-cli::commands::audit_log`.
841//
842// Failure model: returning `Err` is reserved for I/O failures opening / reading
843// the file. Per-row issues (parse failures, hash/chain mismatches, sequence
844// regressions) are accumulated into [`VerifyReport::errors`] so callers can
845// report them all in one pass — this is the contract Phase 1 ships.
846// ---------------------------------------------------------------------------
847
848/// Outcome of [`verify`]: per-row chain status across the entire log.
849#[derive(Debug, Clone)]
850#[non_exhaustive]
851pub struct VerifyReport {
852    /// Total non-empty lines processed (1-based count).
853    pub total_rows: usize,
854    /// Rows whose hash, chain link, and `ts_seq` all validated.
855    pub ok_rows: usize,
856    /// Issues encountered, in encounter order. Line numbers are 1-based.
857    pub errors: Vec<VerifyIssue>,
858}
859
860impl VerifyReport {
861    /// An empty, all-clear report — used when the log file is absent.
862    fn empty() -> Self {
863        Self {
864            total_rows: 0,
865            ok_rows: 0,
866            errors: Vec::new(),
867        }
868    }
869}
870
871/// A single issue discovered by [`verify`].
872#[derive(Debug, Clone)]
873#[non_exhaustive]
874pub struct VerifyIssue {
875    /// 1-based line number where the issue was detected.
876    pub line: usize,
877    /// Classification of the issue (see [`VerifyIssueKind`]).
878    pub kind: VerifyIssueKind,
879    /// Human-readable description (caller may format for stderr/stdout).
880    pub message: String,
881}
882
883/// Classification of a [`VerifyIssue`]. `non_exhaustive` for forward
884/// compatibility — future kinds may include `SessionIdChange`, etc.
885#[derive(Debug, Clone, Copy, PartialEq, Eq)]
886#[non_exhaustive]
887pub enum VerifyIssueKind {
888    /// Row failed to parse as [`LogRow`] (corrupted JSON or unknown field).
889    ParseError,
890    /// `prev_hash` did not match the previous row's `this_hash` (or the
891    /// genesis sentinel on row 1).
892    PrevHashMismatch,
893    /// Row's stored `this_hash` did not match the recomputed canonical-JSON
894    /// SHA-256.
895    ThisHashMismatch,
896    /// `ts_seq` did not increase strictly monotonically (within a session;
897    /// see PROVENANCE_LOG.md §3 + §6 — chain restarts after rotation are
898    /// permitted to reset `ts_seq` and are detected via the genesis sentinel).
899    SequenceJump,
900}
901
902/// Verify the entire log file at `path`.
903///
904/// Returns `Ok(VerifyReport)` regardless of whether the chain validates;
905/// callers inspect `report.errors.is_empty()` to determine pass/fail.
906/// Returns `Err` only when the file itself cannot be opened or read at the
907/// I/O level.
908///
909/// Behavior:
910///
911/// - A missing file is treated as a clean, empty log (no tampering possible
912///   on bytes that don't exist) and returns an empty report after a `warn!`.
913/// - Empty / blank lines are skipped — they are not data per the writer's
914///   on-disk format (PROVENANCE_LOG.md §2).
915/// - On a row that fails to parse as [`LogRow`], a `ParseError` is recorded
916///   and verification continues on the next line. The chain anchor does NOT
917///   advance through an unparsable row, so the next valid row's `prev_hash`
918///   is checked against the last successfully parsed row (or against
919///   `"GENESIS"` if no valid row has been seen yet).
920/// - A `prev_hash == "GENESIS"` sentinel marks a chain restart (first row of
921///   a fresh / rotated log per §6) and resets the `ts_seq` monotonicity
922///   anchor — `ts_seq` is NOT compared to the prior row across a restart.
923pub fn verify(path: &Utf8Path) -> Result<VerifyReport, LogError> {
924    let file = match File::open(path) {
925        Ok(f) => f,
926        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
927            tracing::warn!(
928                path = %path,
929                "audit-log verify: log file does not exist; reporting empty"
930            );
931            return Ok(VerifyReport::empty());
932        }
933        Err(e) => return Err(LogError::Io(e)),
934    };
935
936    let reader = BufReader::new(file);
937    let mut report = VerifyReport::empty();
938
939    // Anchor for the chain check: the LAST SUCCESSFULLY PARSED row. The chain
940    // is anchored to the bytes on disk, not to a hypothetical "should have
941    // been". This matches the spec — tampering at row N must surface both as
942    // a hash mismatch on N and as a chain break on N+1.
943    let mut prev_row: Option<LogRow> = None;
944
945    for (idx, line_res) in reader.lines().enumerate() {
946        let line_no = idx + 1;
947        let line = line_res?;
948        if line.is_empty() {
949            continue;
950        }
951
952        report.total_rows += 1;
953
954        let row: LogRow = match serde_json::from_str(&line) {
955            Ok(r) => r,
956            Err(e) => {
957                report.errors.push(VerifyIssue {
958                    line: line_no,
959                    kind: VerifyIssueKind::ParseError,
960                    message: format!("failed to parse row as LogRow: {e}"),
961                });
962                // Chain anchor cannot advance through an unparsable row;
963                // leave `prev_row` untouched so the next valid row's
964                // `prev_hash` is checked against the last-known anchor (or
965                // GENESIS if we never had one).
966                continue;
967            }
968        };
969
970        let mut row_ok = true;
971
972        // 1. Recompute `this_hash` from canonical JSON (row \ {this_hash}).
973        let rfh = RowForHash {
974            ts: row.ts,
975            ts_seq: row.ts_seq,
976            event: row.event,
977            ref_: row.ref_.as_deref(),
978            source: row.source.as_deref(),
979            result: row.result,
980            license: row.license.as_deref(),
981            size_bytes: row.size_bytes,
982            store_path: row.store_path.as_deref(),
983            capability: row.capability,
984            session_id: &row.session_id,
985            error_code: row.error_code.as_deref(),
986            schema_version: &row.schema_version,
987            canonical_digest: row.canonical_digest.as_deref(),
988            prev_hash: &row.prev_hash,
989        };
990        match compute_this_hash(&rfh) {
991            Ok(recomputed) => {
992                if recomputed != row.this_hash {
993                    report.errors.push(VerifyIssue {
994                        line: line_no,
995                        kind: VerifyIssueKind::ThisHashMismatch,
996                        message: format!(
997                            "this_hash mismatch: stored={}, recomputed={}",
998                            row.this_hash, recomputed
999                        ),
1000                    });
1001                    row_ok = false;
1002                }
1003            }
1004            Err(e) => {
1005                // Canonicalization itself failed — surface as a hash
1006                // mismatch with the underlying error in the message.
1007                report.errors.push(VerifyIssue {
1008                    line: line_no,
1009                    kind: VerifyIssueKind::ThisHashMismatch,
1010                    message: format!("failed to recompute this_hash: {e}"),
1011                });
1012                row_ok = false;
1013            }
1014        }
1015
1016        // 2. Chain link: `prev_hash` matches anchor (GENESIS on row 1 / after
1017        //    a chain restart, prior row's `this_hash` otherwise).
1018        let is_genesis = row.prev_hash == GENESIS_HASH;
1019        match &prev_row {
1020            None => {
1021                // First non-empty row in the file: must declare GENESIS.
1022                if !is_genesis {
1023                    report.errors.push(VerifyIssue {
1024                        line: line_no,
1025                        kind: VerifyIssueKind::PrevHashMismatch,
1026                        message: format!(
1027                            "first row must have prev_hash=\"GENESIS\", got {:?}",
1028                            row.prev_hash
1029                        ),
1030                    });
1031                    row_ok = false;
1032                }
1033            }
1034            Some(prev) => {
1035                if is_genesis {
1036                    // Chain restart (rotation per §6) — accepted, no link
1037                    // check, and the `ts_seq` monotonicity anchor resets
1038                    // (handled below via `is_genesis`).
1039                } else if row.prev_hash != prev.this_hash {
1040                    report.errors.push(VerifyIssue {
1041                        line: line_no,
1042                        kind: VerifyIssueKind::PrevHashMismatch,
1043                        message: format!(
1044                            "prev_hash mismatch: row stores {}, previous row's this_hash is {}",
1045                            row.prev_hash, prev.this_hash
1046                        ),
1047                    });
1048                    row_ok = false;
1049                }
1050            }
1051        }
1052
1053        // 3. ts_seq monotonicity — strictly greater than the previous row's
1054        //    `ts_seq`, EXCEPT across a chain restart (where `ts_seq` resets).
1055        if let Some(prev) = &prev_row {
1056            if !is_genesis && row.ts_seq <= prev.ts_seq {
1057                report.errors.push(VerifyIssue {
1058                    line: line_no,
1059                    kind: VerifyIssueKind::SequenceJump,
1060                    message: format!(
1061                        "ts_seq did not increase strictly: previous={}, current={}",
1062                        prev.ts_seq, row.ts_seq
1063                    ),
1064                });
1065                row_ok = false;
1066            }
1067        }
1068
1069        if row_ok {
1070            report.ok_rows += 1;
1071        }
1072
1073        // Advance the anchor to the just-parsed row (whether or not it had
1074        // issues — the on-disk bytes ARE the chain).
1075        prev_row = Some(row);
1076    }
1077
1078    Ok(report)
1079}
1080
1081// ---------------------------------------------------------------------------
1082// v1 → v2 migration (ADR-0024, `docs/PROVENANCE_LOG.md` §"Schema migration").
1083//
1084// v1 rows lack `schema_version` and `canonical_digest`; the v2 binary
1085// fails loudly when asked to read them (see `recover_state` /
1086// `verify`). The migration recovers a v2 log from a v1 file by:
1087//
1088//   1. Parsing every v1 row via the [`V1LogRow`] shadow struct.
1089//   2. Deriving a [`crate::CanonicalRef`] from the v1 `(ref, source)`
1090//      pair — `source` becomes `resolver_profile`, `version` is `None`
1091//      (ADR-0021 §1 → ADR-0024 migration recipe).
1092//   3. Re-computing the SHA-256 hash chain across the new row
1093//      payloads. The v1 chain is invalidated by the schema change; the
1094//      v2 chain restarts at the first row's stored `prev_hash` (which
1095//      is `"GENESIS"` on a fresh log).
1096//   4. Writing the new rows to `<log_path>.v2-migrated`, then
1097//      atomically renaming it onto `<log_path>` after backing up the
1098//      original to `<log_path>.v1-backup`.
1099//
1100// The migration is **idempotent**: running it on an already-v2 log
1101// re-parses every row as v2, recomputes the same hash chain, and
1102// produces a byte-equivalent output.
1103//
1104// The migration is **dry-runnable**: `dry_run = true` returns a
1105// [`MigrationReport`] summarizing what would change without touching
1106// disk.
1107// ---------------------------------------------------------------------------
1108
1109/// Summary of a [`migrate_v1_to_v2`] run.
1110///
1111/// Marked `#[non_exhaustive]` so future fields (e.g. a per-row error
1112/// list, an aborted-row count) can be added without breaking callers
1113/// that pattern-match.
1114///
1115/// `Serialize` enables `provenance migrate --mode json` (#204) — the
1116/// wire form is `{"rows_rewritten": N, "dry_run": bool,
1117/// "first_row_v1_chain_hash": "...", "first_row_v2_chain_hash": "..."}`.
1118///
1119/// # Wire-format stability (post-#208 self-review §1)
1120///
1121/// Once a release ships with the [`Serialize`] derive, the field
1122/// **names** below become part of the public API. Renaming a field is
1123/// then a semver minor bump and warrants a CHANGELOG \[BREAKING\] note;
1124/// new fields are still safe (per `#[non_exhaustive]`).
1125#[derive(Debug, Clone, Serialize)]
1126#[non_exhaustive]
1127pub struct MigrationReport {
1128    /// Number of rows rewritten (or that WOULD be rewritten under
1129    /// `dry_run`).
1130    pub rows_rewritten: u64,
1131    /// Whether this was a dry-run preview (`true`) or a live rewrite
1132    /// (`false`).
1133    pub dry_run: bool,
1134    /// Stored `this_hash` of the first input row (the v1 chain anchor).
1135    /// `"GENESIS"` is reported as the literal `"GENESIS"` when the log
1136    /// was empty.
1137    pub first_row_v1_chain_hash: String,
1138    /// Recomputed `this_hash` of the first migrated row under the v2
1139    /// canonicalization. Equal to [`Self::first_row_v1_chain_hash`]
1140    /// only if the input was already v2 (idempotent case).
1141    pub first_row_v2_chain_hash: String,
1142}
1143
1144/// v1 row shadow struct used ONLY by [`migrate_v1_to_v2`]. The
1145/// non-defaulted v2 fields (`schema_version`, `canonical_digest`) are
1146/// absent here; `deny_unknown_fields` rejects unexpected v2 fields so a
1147/// v2 row on disk fails to parse as v1, letting the migrator detect
1148/// already-v2 input via fallback to the v2 parser.
1149#[derive(Debug, Clone, Deserialize, Serialize)]
1150#[serde(deny_unknown_fields)]
1151struct V1LogRow {
1152    ts: DateTime<Utc>,
1153    ts_seq: u64,
1154    event: LogEvent,
1155    #[serde(rename = "ref")]
1156    ref_: Option<String>,
1157    source: Option<String>,
1158    result: LogResult,
1159    license: Option<String>,
1160    size_bytes: Option<u64>,
1161    store_path: Option<String>,
1162    capability: Capability,
1163    session_id: String,
1164    error_code: Option<String>,
1165    prev_hash: String,
1166    this_hash: String,
1167}
1168
1169/// Minimal in-memory representation a v1 OR v2 row can be promoted to
1170/// before re-hashing.
1171#[derive(Debug, Clone)]
1172struct MigrationRowSeed {
1173    ts: DateTime<Utc>,
1174    ts_seq: u64,
1175    event: LogEvent,
1176    ref_: Option<String>,
1177    source: Option<String>,
1178    result: LogResult,
1179    license: Option<String>,
1180    size_bytes: Option<u64>,
1181    store_path: Option<String>,
1182    capability: Capability,
1183    session_id: String,
1184    error_code: Option<String>,
1185    /// `None` for v1 inputs (the digest is computed during migration);
1186    /// `Some(...)` for already-v2 inputs (carried through verbatim for
1187    /// idempotency).
1188    canonical_digest_in: Option<String>,
1189    /// As stored on disk in the input. Used only for the
1190    /// `first_row_v1_chain_hash` field of [`MigrationReport`].
1191    stored_this_hash: String,
1192}
1193
1194/// Migrate a v1 provenance log to v2 (ADR-0024).
1195///
1196/// Returns a [`MigrationReport`] describing how many rows were (or
1197/// would be) rewritten and the first-row chain-anchor delta. The
1198/// migration is idempotent: running it twice produces byte-equivalent
1199/// output the second time.
1200///
1201/// On a missing log file, returns a no-op report (`rows_rewritten = 0`,
1202/// `first_row_v1_chain_hash = "GENESIS"`, `first_row_v2_chain_hash =
1203/// "GENESIS"`) — there is nothing to migrate.
1204///
1205/// # Errors
1206///
1207/// Returns [`LogError::Io`] on I/O failures and on rows that fail to
1208/// parse as either v1 or v2 (the synthetic message names the line
1209/// number). Returns [`LogError::Serialize`] on canonicalization
1210/// failures.
1211pub fn migrate_v1_to_v2(log_path: &Utf8Path, dry_run: bool) -> Result<MigrationReport, LogError> {
1212    use std::io::BufRead;
1213
1214    // -- 1. Read the input log, parsing each line as v1 OR (idempotent
1215    //       fallback) v2. --------------------------------------------------
1216    let file = match File::open(log_path) {
1217        Ok(f) => f,
1218        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1219            return Ok(MigrationReport {
1220                rows_rewritten: 0,
1221                dry_run,
1222                first_row_v1_chain_hash: GENESIS_HASH.to_string(),
1223                first_row_v2_chain_hash: GENESIS_HASH.to_string(),
1224            });
1225        }
1226        Err(e) => return Err(LogError::Io(e)),
1227    };
1228    let reader = BufReader::new(file);
1229    let mut seeds: Vec<MigrationRowSeed> = Vec::new();
1230
1231    for (idx, line_res) in reader.lines().enumerate() {
1232        let line_no = idx + 1;
1233        let line = line_res?;
1234        if line.is_empty() {
1235            continue;
1236        }
1237        // Try v1 first. If it fails, try v2 (idempotency: re-migrating
1238        // a v2 log MUST succeed and produce equivalent output).
1239        let seed = if let Ok(v1) = serde_json::from_str::<V1LogRow>(&line) {
1240            MigrationRowSeed {
1241                ts: v1.ts,
1242                ts_seq: v1.ts_seq,
1243                event: v1.event,
1244                ref_: v1.ref_,
1245                source: v1.source,
1246                result: v1.result,
1247                license: v1.license,
1248                size_bytes: v1.size_bytes,
1249                store_path: v1.store_path,
1250                capability: v1.capability,
1251                session_id: v1.session_id,
1252                error_code: v1.error_code,
1253                canonical_digest_in: None,
1254                stored_this_hash: v1.this_hash,
1255            }
1256        } else {
1257            match serde_json::from_str::<LogRow>(&line) {
1258                Ok(v2) => MigrationRowSeed {
1259                    ts: v2.ts,
1260                    ts_seq: v2.ts_seq,
1261                    event: v2.event,
1262                    ref_: v2.ref_,
1263                    source: v2.source,
1264                    result: v2.result,
1265                    license: v2.license,
1266                    size_bytes: v2.size_bytes,
1267                    store_path: v2.store_path,
1268                    capability: v2.capability,
1269                    session_id: v2.session_id,
1270                    error_code: v2.error_code,
1271                    canonical_digest_in: v2.canonical_digest,
1272                    stored_this_hash: v2.this_hash,
1273                },
1274                Err(e) => {
1275                    return Err(LogError::Io(std::io::Error::new(
1276                        std::io::ErrorKind::InvalidData,
1277                        format!("migration: line {line_no} is neither v1 nor v2: {e}"),
1278                    )));
1279                }
1280            }
1281        };
1282        seeds.push(seed);
1283    }
1284
1285    // -- 2. Derive `canonical_digest` for each seed that lacks one. ------
1286    //
1287    // For v1 rows: build a CanonicalRef from
1288    //   - source_type from `event`/`ref` shape (DOI prefix `10.` vs
1289    //     arXiv) — we use a heuristic that matches `Ref::parse`'s rule
1290    //     (`starts_with "10."` ⇒ DOI; else arXiv).
1291    //   - source_id = ref value (verbatim).
1292    //   - resolver_profile = source value (verbatim, ADR-0021 §3
1293    //     migration recipe).
1294    //   - version = None.
1295    //
1296    // Rows without a `ref` (session bookend) keep `canonical_digest =
1297    // None` per the v2 row contract.
1298
1299    fn derive_digest(seed: &MigrationRowSeed) -> Option<String> {
1300        let ref_str = seed.ref_.as_deref()?;
1301        let source_key = seed.source.as_deref().unwrap_or("");
1302        // Heuristic: bare DOIs always start `10.`; everything else is
1303        // treated as an arXiv id. Mirrors `Ref::parse` rule 3/4.
1304        let source_type = if ref_str.starts_with("10.") {
1305            crate::SourceType::Doi
1306        } else {
1307            crate::SourceType::Arxiv
1308        };
1309        let c = crate::CanonicalRef::new(source_type, ref_str, source_key, None);
1310        Some(c.digest_hex())
1311    }
1312
1313    let digests: Vec<Option<String>> = seeds
1314        .iter()
1315        .map(|s| s.canonical_digest_in.clone().or_else(|| derive_digest(s)))
1316        .collect();
1317
1318    // -- 3. Rebuild the hash chain across the v2 payloads. ----------------
1319    let mut out_rows: Vec<LogRow> = Vec::with_capacity(seeds.len());
1320    let mut prev_hash: String = GENESIS_HASH.to_string();
1321
1322    for (seed, digest) in seeds.iter().zip(digests.iter()) {
1323        let rfh = RowForHash {
1324            ts: seed.ts,
1325            ts_seq: seed.ts_seq,
1326            event: seed.event,
1327            ref_: seed.ref_.as_deref(),
1328            source: seed.source.as_deref(),
1329            result: seed.result,
1330            license: seed.license.as_deref(),
1331            size_bytes: seed.size_bytes,
1332            store_path: seed.store_path.as_deref(),
1333            capability: seed.capability,
1334            session_id: &seed.session_id,
1335            error_code: seed.error_code.as_deref(),
1336            schema_version: LOG_SCHEMA_VERSION,
1337            canonical_digest: digest.as_deref(),
1338            prev_hash: &prev_hash,
1339        };
1340        let this_hash = compute_this_hash(&rfh)?;
1341        let row = LogRow {
1342            ts: seed.ts,
1343            ts_seq: seed.ts_seq,
1344            event: seed.event,
1345            ref_: seed.ref_.clone(),
1346            source: seed.source.clone(),
1347            result: seed.result,
1348            license: seed.license.clone(),
1349            size_bytes: seed.size_bytes,
1350            store_path: seed.store_path.clone(),
1351            capability: seed.capability,
1352            session_id: seed.session_id.clone(),
1353            error_code: seed.error_code.clone(),
1354            schema_version: LOG_SCHEMA_VERSION.to_string(),
1355            canonical_digest: digest.clone(),
1356            prev_hash: prev_hash.clone(),
1357            this_hash: this_hash.clone(),
1358        };
1359        prev_hash = this_hash;
1360        out_rows.push(row);
1361    }
1362
1363    // -- 4. Build the report. --------------------------------------------
1364    let first_v1_hash = seeds
1365        .first()
1366        .map(|s| s.stored_this_hash.clone())
1367        .unwrap_or_else(|| GENESIS_HASH.to_string());
1368    let first_v2_hash = out_rows
1369        .first()
1370        .map(|r| r.this_hash.clone())
1371        .unwrap_or_else(|| GENESIS_HASH.to_string());
1372    let report = MigrationReport {
1373        rows_rewritten: out_rows.len() as u64,
1374        dry_run,
1375        first_row_v1_chain_hash: first_v1_hash,
1376        first_row_v2_chain_hash: first_v2_hash,
1377    };
1378
1379    if dry_run {
1380        return Ok(report);
1381    }
1382
1383    // -- 5. Live write: stage to `<log_path>.v2-migrated`, back up the
1384    //       v1, then atomically rename. -----------------------------------
1385    let staged_path = with_suffix(log_path, ".v2-migrated");
1386    let backup_path = with_suffix(log_path, ".v1-backup");
1387
1388    {
1389        let staged_file = OpenOptions::new()
1390            .create(true)
1391            .write(true)
1392            .truncate(true)
1393            .open(&staged_path)?;
1394        let mut writer = BufWriter::new(staged_file);
1395        for row in &out_rows {
1396            let mut bytes = serde_json::to_vec(row)?;
1397            bytes.push(b'\n');
1398            writer.write_all(&bytes)?;
1399        }
1400        writer.flush()?;
1401        let file = writer.into_inner().map_err(|e| {
1402            LogError::Io(std::io::Error::other(format!(
1403                "migration buf writer flush failed: {}",
1404                e.error()
1405            )))
1406        })?;
1407        file.sync_all()?;
1408    }
1409
1410    // Sanity-check: the staged file MUST verify clean before we
1411    // commit the swap. If it doesn't, the migration is buggy — abort
1412    // without touching the live log.
1413    let verify_report = verify(&staged_path)?;
1414    if !verify_report.errors.is_empty() {
1415        return Err(LogError::Io(std::io::Error::other(format!(
1416            "migration: staged v2 log failed verify; first issue: {:?}",
1417            verify_report.errors.first()
1418        ))));
1419    }
1420
1421    // Move the original aside as `<log_path>.v1-backup`. Overwriting
1422    // any prior backup is intentional — the user re-running migrate
1423    // expects the most recent original preserved.
1424    if log_path.exists() {
1425        if backup_path.exists() {
1426            std::fs::remove_file(&backup_path)?;
1427        }
1428        std::fs::rename(log_path, &backup_path)?;
1429    }
1430    // Atomically promote the staged file to the live path.
1431    std::fs::rename(&staged_path, log_path)?;
1432
1433    Ok(report)
1434}
1435
1436/// Append a literal suffix to a [`Utf8Path`], producing a sibling path
1437/// in the same directory. Avoids `std::path::PathBuf` per the workspace
1438/// posture rule (`docs/SECURITY.md` §3 — camino-only file paths in
1439/// production code).
1440fn with_suffix(path: &Utf8Path, suffix: &str) -> Utf8PathBuf {
1441    let s = format!("{path}{suffix}");
1442    Utf8PathBuf::from(s)
1443}
1444
1445// ---------------------------------------------------------------------------
1446// Tests
1447// ---------------------------------------------------------------------------
1448
1449#[cfg(test)]
1450#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1451mod tests {
1452    use super::*;
1453    use std::fs;
1454    use std::sync::Arc;
1455    use std::thread;
1456
1457    use tempfile::TempDir;
1458
1459    /// Convert a `TempDir`'s `&std::path::Path` to a `Utf8PathBuf`. Tests
1460    /// always run on UTF-8 temp paths in CI; if the OS returns a non-UTF-8
1461    /// path we panic, which is acceptable for a unit test.
1462    fn tmp_dir_utf8(dir: &TempDir) -> Utf8PathBuf {
1463        Utf8PathBuf::from_path_buf(dir.path().to_path_buf()).expect("temp dir path must be UTF-8")
1464    }
1465
1466    /// A fixed 26-char ULID-shaped string used in tests. Real callers use
1467    /// the `ulid` crate; tests pin a constant so output is reproducible.
1468    const TEST_SESSION_ID: &str = "01JCKZ7Q0000000000000000AB";
1469
1470    fn open_log(path: &Utf8Path) -> ProvenanceLog {
1471        ProvenanceLog::open(path, TEST_SESSION_ID.to_string()).expect("open")
1472    }
1473
1474    #[test]
1475    fn open_creates_missing_parent_dir() {
1476        // Regression: opening a log whose parent dir does not yet exist must
1477        // create the dir and succeed (then a row appends cleanly), not abort
1478        // with ENOENT. This is the `doiget verify` failure on a fresh CI
1479        // runner where `<config>/doiget/` was never created.
1480        let dir = TempDir::new().expect("tempdir");
1481        let path = tmp_dir_utf8(&dir)
1482            .join("nested")
1483            .join("doiget")
1484            .join("access.jsonl");
1485        assert!(
1486            !path.parent().expect("has parent").exists(),
1487            "parent dir must not pre-exist for this test to be meaningful"
1488        );
1489        let log = ProvenanceLog::open(&path, TEST_SESSION_ID.to_string())
1490            .expect("open must create the parent dir and succeed");
1491        log.append(empty_input())
1492            .expect("append after auto-created dir");
1493        assert!(path.exists(), "log file written under the auto-created dir");
1494        // End-to-end: the row the verify path would write is actually
1495        // readable back (exercises the full OpenOptions/flush/sync write,
1496        // not just that the file exists).
1497        let rows = read_rows(&path);
1498        assert_eq!(rows.len(), 1, "exactly one row in the auto-created log");
1499    }
1500
1501    fn empty_input() -> RowInput<'static> {
1502        RowInput {
1503            event: LogEvent::Fetch,
1504            result: LogResult::Ok,
1505            capability: Capability::Oa,
1506            ref_: None,
1507            source: None,
1508            error_code: None,
1509            size_bytes: None,
1510            license: None,
1511            store_path: None,
1512            canonical_digest: None,
1513        }
1514    }
1515
1516    /// Read the on-disk log and parse every line into a `LogRow`.
1517    fn read_rows(path: &Utf8Path) -> Vec<LogRow> {
1518        let raw = fs::read_to_string(path).expect("read log");
1519        raw.lines()
1520            .filter(|l| !l.is_empty())
1521            .map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
1522            .collect()
1523    }
1524
1525    /// Recompute `this_hash` for a stored row and assert it matches the
1526    /// stored value. Walks the same canonicalization rule as
1527    /// [`compute_this_hash`].
1528    fn verify_this_hash(row: &LogRow) {
1529        let rfh = RowForHash {
1530            ts: row.ts,
1531            ts_seq: row.ts_seq,
1532            event: row.event,
1533            ref_: row.ref_.as_deref(),
1534            source: row.source.as_deref(),
1535            result: row.result,
1536            license: row.license.as_deref(),
1537            size_bytes: row.size_bytes,
1538            store_path: row.store_path.as_deref(),
1539            capability: row.capability,
1540            session_id: &row.session_id,
1541            error_code: row.error_code.as_deref(),
1542            schema_version: &row.schema_version,
1543            canonical_digest: row.canonical_digest.as_deref(),
1544            prev_hash: &row.prev_hash,
1545        };
1546        let recomputed = compute_this_hash(&rfh).expect("hash");
1547        assert_eq!(
1548            recomputed, row.this_hash,
1549            "this_hash mismatch on ts_seq {}",
1550            row.ts_seq
1551        );
1552    }
1553
1554    #[test]
1555    fn first_row_uses_genesis_prev_hash() {
1556        let dir = TempDir::new().expect("tmp");
1557        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1558        let log = open_log(&path);
1559        let seq = log.append(empty_input()).expect("append");
1560        assert_eq!(seq, 1);
1561
1562        let rows = read_rows(&path);
1563        assert_eq!(rows.len(), 1);
1564        assert_eq!(rows[0].ts_seq, 1);
1565        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1566        assert_eq!(rows[0].this_hash.len(), 64);
1567        assert_eq!(rows[0].session_id, TEST_SESSION_ID);
1568        verify_this_hash(&rows[0]);
1569    }
1570
1571    #[test]
1572    fn subsequent_rows_chain_correctly() {
1573        let dir = TempDir::new().expect("tmp");
1574        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1575        let log = open_log(&path);
1576
1577        for _ in 0..3 {
1578            log.append(empty_input()).expect("append");
1579        }
1580
1581        let rows = read_rows(&path);
1582        assert_eq!(rows.len(), 3);
1583        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1584        assert_eq!(rows[1].prev_hash, rows[0].this_hash);
1585        assert_eq!(rows[2].prev_hash, rows[1].this_hash);
1586        for r in &rows {
1587            verify_this_hash(r);
1588        }
1589        assert_eq!(rows[0].ts_seq, 1);
1590        assert_eq!(rows[1].ts_seq, 2);
1591        assert_eq!(rows[2].ts_seq, 3);
1592    }
1593
1594    #[test]
1595    fn recovery_after_reopen() {
1596        let dir = TempDir::new().expect("tmp");
1597        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1598
1599        {
1600            let log = open_log(&path);
1601            for _ in 0..3 {
1602                log.append(empty_input()).expect("append");
1603            }
1604        } // drop writer
1605
1606        let log2 = open_log(&path);
1607        let seq = log2.append(empty_input()).expect("append after reopen");
1608        assert_eq!(seq, 4);
1609
1610        let rows = read_rows(&path);
1611        assert_eq!(rows.len(), 4);
1612        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1613        for i in 1..rows.len() {
1614            assert_eq!(
1615                rows[i].prev_hash,
1616                rows[i - 1].this_hash,
1617                "chain break at row {}",
1618                i + 1
1619            );
1620        }
1621        for (i, r) in rows.iter().enumerate() {
1622            assert_eq!(r.ts_seq, (i + 1) as u64);
1623            verify_this_hash(r);
1624        }
1625    }
1626
1627    #[test]
1628    fn concurrent_writers_in_same_process_serialize() {
1629        let dir = TempDir::new().expect("tmp");
1630        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1631        let log = Arc::new(open_log(&path));
1632
1633        let mut handles = Vec::with_capacity(8);
1634        for _ in 0..8 {
1635            let log = Arc::clone(&log);
1636            handles.push(thread::spawn(move || {
1637                log.append(empty_input()).expect("append")
1638            }));
1639        }
1640        let mut returned: Vec<u64> = handles
1641            .into_iter()
1642            .map(|h| h.join().expect("join"))
1643            .collect();
1644        returned.sort_unstable();
1645        assert_eq!(returned, vec![1, 2, 3, 4, 5, 6, 7, 8]);
1646
1647        let rows = read_rows(&path);
1648        assert_eq!(rows.len(), 8);
1649
1650        // The in-process mutex serializes appends, so file order MUST equal
1651        // ts_seq order: row N (0-indexed) on disk has ts_seq = N+1.
1652        for (i, r) in rows.iter().enumerate() {
1653            assert_eq!(r.ts_seq, (i + 1) as u64, "ts_seq gap at file row {}", i + 1);
1654        }
1655        // Hash chain follows file order.
1656        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1657        for i in 1..rows.len() {
1658            assert_eq!(
1659                rows[i].prev_hash,
1660                rows[i - 1].this_hash,
1661                "chain break at file row {}",
1662                i + 1
1663            );
1664        }
1665        for r in &rows {
1666            verify_this_hash(r);
1667        }
1668    }
1669
1670    #[test]
1671    fn corrupted_existing_log_fails_open() {
1672        let dir = TempDir::new().expect("tmp");
1673        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1674
1675        // JSON but not a valid LogRow: missing required fields, has unknown
1676        // field. `deny_unknown_fields` ensures the parser refuses.
1677        fs::write(&path, "{\"ts_seq\": 1, \"garbage\": true}\n").expect("write");
1678
1679        let err =
1680            ProvenanceLog::open(&path, TEST_SESSION_ID.to_string()).expect_err("must fail open");
1681        match err {
1682            LogError::Io(io) => {
1683                let msg = io.to_string();
1684                assert!(
1685                    msg.contains("corrupted log at line 1"),
1686                    "expected synthetic corruption message, got: {}",
1687                    msg
1688                );
1689            }
1690            other => panic!("expected LogError::Io, got {:?}", other),
1691        }
1692    }
1693
1694    #[test]
1695    fn rejects_non_regular_file() {
1696        // Pointing the log at a directory must fail with NotARegularFile.
1697        let dir = TempDir::new().expect("tmp");
1698        let err = ProvenanceLog::open(tmp_dir_utf8(&dir), TEST_SESSION_ID.to_string())
1699            .expect_err("must fail");
1700        match err {
1701            LogError::NotARegularFile(_) => {}
1702            other => panic!("expected NotARegularFile, got {:?}", other),
1703        }
1704    }
1705
1706    #[test]
1707    fn canonical_json_excludes_this_hash_field() {
1708        // Spec contract: the hashed bytes do not include `this_hash`. If
1709        // this ever regresses, every previously-written log becomes
1710        // unverifiable.
1711        let rfh = RowForHash {
1712            ts: Utc::now(),
1713            ts_seq: 1,
1714            event: LogEvent::Fetch,
1715            ref_: None,
1716            source: None,
1717            result: LogResult::Ok,
1718            license: None,
1719            size_bytes: None,
1720            store_path: None,
1721            capability: Capability::Oa,
1722            session_id: TEST_SESSION_ID,
1723            error_code: None,
1724            schema_version: LOG_SCHEMA_VERSION,
1725            canonical_digest: None,
1726            prev_hash: GENESIS_HASH,
1727        };
1728        let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1729        let s = std::str::from_utf8(&bytes).expect("utf8");
1730        assert!(!s.contains("this_hash"), "this_hash leaked into hash input");
1731        assert!(s.contains("\"prev_hash\":"));
1732    }
1733
1734    #[test]
1735    fn canonical_json_keys_are_lexicographically_sorted() {
1736        // PROVENANCE_LOG.md §4: canonical JSON uses keys sorted
1737        // lexicographically. The lex-first top-level key of a row is
1738        // `capability` ("c..." < "e..." < ...). Build a row and assert the
1739        // canonical bytes start with that key.
1740        let rfh = RowForHash {
1741            ts: Utc::now(),
1742            ts_seq: 1,
1743            event: LogEvent::Fetch,
1744            ref_: Some("10.1234/example"),
1745            source: Some("unpaywall"),
1746            result: LogResult::Ok,
1747            license: Some("CC-BY-4.0"),
1748            size_bytes: Some(1234),
1749            store_path: Some("papers/x.pdf"),
1750            capability: Capability::Oa,
1751            session_id: TEST_SESSION_ID,
1752            error_code: None,
1753            schema_version: LOG_SCHEMA_VERSION,
1754            canonical_digest: Some(
1755                "0000000000000000000000000000000000000000000000000000000000000000",
1756            ),
1757            prev_hash: GENESIS_HASH,
1758        };
1759        let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1760        let s = std::str::from_utf8(&bytes).expect("utf8");
1761        // v2: lex-first key is `canonical_digest` (< `capability` because
1762        // 'n' < 'p' at byte index 2). Pre-v2 it was `capability`.
1763        assert!(
1764            s.starts_with("{\"canonical_digest\":"),
1765            "canonical bytes must start with lex-first v2 key, got: {}",
1766            s
1767        );
1768        // Spot-check ordering: `prev_hash` (p) must come before `ref` (r),
1769        // which must come before `result` (re...) — wait, "ref" < "result"
1770        // lexicographically because 'f' < 's' in ascii at index 2 vs 'e' at
1771        // index 2 of "result"... let me just check a couple of unambiguous
1772        // pairs: `event` < `prev_hash`, and `ts` < `ts_seq`.
1773        let event_idx = s.find("\"event\":").expect("event key present");
1774        let prev_idx = s.find("\"prev_hash\":").expect("prev_hash key present");
1775        assert!(event_idx < prev_idx, "event must precede prev_hash");
1776        let ts_idx = s.find("\"ts\":").expect("ts key present");
1777        let tsseq_idx = s.find("\"ts_seq\":").expect("ts_seq key present");
1778        assert!(ts_idx < tsseq_idx, "ts must precede ts_seq");
1779    }
1780
1781    // -----------------------------------------------------------------
1782    // verify() tests — Phase 1 surface for `doiget audit-log --verify`.
1783    // -----------------------------------------------------------------
1784
1785    /// Rewrite a single field's quoted-string value on a specific 1-based
1786    /// line of `path`. Used to simulate tampering. Panics on malformed input
1787    /// — only valid inputs are produced by the test harness.
1788    ///
1789    /// `field_key` is matched as `"field_key":"...old..."` (quoted string
1790    /// JSON value). The new value is the literal string `new_value` (no
1791    /// JSON escaping needed for the test fixtures we use).
1792    fn tamper_string_field(
1793        path: &Utf8Path,
1794        line_no_1based: usize,
1795        field_key: &str,
1796        new_value: &str,
1797    ) {
1798        let raw = fs::read_to_string(path).expect("read log");
1799        let mut lines: Vec<String> = raw.lines().map(str::to_string).collect();
1800        let target = &lines[line_no_1based - 1];
1801        let needle = format!("\"{field_key}\":\"");
1802        let start = target
1803            .find(&needle)
1804            .unwrap_or_else(|| panic!("field {field_key} not found on line {line_no_1based}"))
1805            + needle.len();
1806        let end_rel = target[start..]
1807            .find('"')
1808            .unwrap_or_else(|| panic!("unterminated string for field {field_key}"));
1809        let end = start + end_rel;
1810        let mut new_line = String::with_capacity(target.len());
1811        new_line.push_str(&target[..start]);
1812        new_line.push_str(new_value);
1813        new_line.push_str(&target[end..]);
1814        lines[line_no_1based - 1] = new_line;
1815        let mut out = lines.join("\n");
1816        out.push('\n');
1817        fs::write(path, out).expect("write tampered log");
1818    }
1819
1820    #[test]
1821    fn verify_empty_log_is_ok() {
1822        // Missing file is a clean log — no tampering possible on bytes that
1823        // don't exist. `verify` returns an empty report, not an error.
1824        let dir = TempDir::new().expect("tmp");
1825        let path = tmp_dir_utf8(&dir).join("nonexistent.jsonl");
1826        assert!(!path.exists(), "precondition: file must not exist");
1827
1828        let report = verify(&path).expect("verify must not error on missing file");
1829        assert_eq!(report.total_rows, 0);
1830        assert_eq!(report.ok_rows, 0);
1831        assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
1832    }
1833
1834    #[test]
1835    fn verify_well_formed_chain_passes() {
1836        // Three rows written via the real writer must verify clean.
1837        let dir = TempDir::new().expect("tmp");
1838        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1839        let log = open_log(&path);
1840        for _ in 0..3 {
1841            log.append(empty_input()).expect("append");
1842        }
1843
1844        let report = verify(&path).expect("verify must succeed");
1845        assert_eq!(report.total_rows, 3);
1846        assert_eq!(report.ok_rows, 3);
1847        assert!(
1848            report.errors.is_empty(),
1849            "expected no issues on a well-formed log; got: {:?}",
1850            report.errors
1851        );
1852    }
1853
1854    #[test]
1855    fn verify_detects_tampered_row_hash() {
1856        // Mutate the SECOND row's `this_hash` to a syntactically-valid but
1857        // wrong hash. The recomputed canonical-JSON SHA-256 will not match.
1858        let dir = TempDir::new().expect("tmp");
1859        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1860        let log = open_log(&path);
1861        log.append(empty_input()).expect("append 1");
1862        log.append(empty_input()).expect("append 2");
1863        drop(log);
1864
1865        // 64 lowercase hex chars, all zeros — passes `LogRow` parse, fails hash check.
1866        tamper_string_field(
1867            &path,
1868            2,
1869            "this_hash",
1870            "0000000000000000000000000000000000000000000000000000000000000000",
1871        );
1872
1873        let report = verify(&path).expect("verify must succeed");
1874        assert_eq!(report.total_rows, 2);
1875        // Row 2's hash mismatch breaks both the hash check on row 2 AND the
1876        // chain link from row 2's stored `prev_hash` (still correct) into the
1877        // forward direction. There's no row 3 to fail forward, so we expect
1878        // exactly one issue: the this-hash mismatch on line 2.
1879        let hash_issues: Vec<_> = report
1880            .errors
1881            .iter()
1882            .filter(|e| e.kind == VerifyIssueKind::ThisHashMismatch)
1883            .collect();
1884        assert_eq!(
1885            hash_issues.len(),
1886            1,
1887            "expected exactly one ThisHashMismatch, got {:?}",
1888            report.errors
1889        );
1890        assert_eq!(hash_issues[0].line, 2);
1891    }
1892
1893    #[test]
1894    fn verify_detects_tampered_prev_hash() {
1895        // Mutate the SECOND row's `prev_hash` to a wrong value. This
1896        // invalidates the chain link but the row's own `this_hash` was
1897        // computed with the original `prev_hash`, so the this-hash check
1898        // ALSO fails (hash input changed). We assert at least the prev-hash
1899        // issue is reported on line 2.
1900        let dir = TempDir::new().expect("tmp");
1901        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1902        let log = open_log(&path);
1903        log.append(empty_input()).expect("append 1");
1904        log.append(empty_input()).expect("append 2");
1905        drop(log);
1906
1907        tamper_string_field(
1908            &path,
1909            2,
1910            "prev_hash",
1911            "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
1912        );
1913
1914        let report = verify(&path).expect("verify must succeed");
1915        assert_eq!(report.total_rows, 2);
1916        let prev_issues: Vec<_> = report
1917            .errors
1918            .iter()
1919            .filter(|e| e.kind == VerifyIssueKind::PrevHashMismatch)
1920            .collect();
1921        assert_eq!(
1922            prev_issues.len(),
1923            1,
1924            "expected exactly one PrevHashMismatch, got {:?}",
1925            report.errors
1926        );
1927        assert_eq!(prev_issues[0].line, 2);
1928    }
1929
1930    #[test]
1931    fn verify_detects_corrupted_json() {
1932        // One valid row plus a literal `{"garbage":true}` line. The garbage
1933        // line fails `serde_json::from_str::<LogRow>` (missing fields +
1934        // `deny_unknown_fields`) and surfaces as a `ParseError` on line 2.
1935        let dir = TempDir::new().expect("tmp");
1936        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1937        let log = open_log(&path);
1938        log.append(empty_input()).expect("append 1");
1939        drop(log);
1940
1941        // Append a garbage line directly.
1942        let mut existing = fs::read_to_string(&path).expect("read");
1943        if !existing.ends_with('\n') {
1944            existing.push('\n');
1945        }
1946        existing.push_str("{\"garbage\":true}\n");
1947        fs::write(&path, existing).expect("write");
1948
1949        let report = verify(&path).expect("verify must succeed");
1950        // total_rows counts non-empty lines, so both lines are counted.
1951        assert_eq!(report.total_rows, 2);
1952        let parse_issues: Vec<_> = report
1953            .errors
1954            .iter()
1955            .filter(|e| e.kind == VerifyIssueKind::ParseError)
1956            .collect();
1957        assert_eq!(
1958            parse_issues.len(),
1959            1,
1960            "expected exactly one ParseError, got {:?}",
1961            report.errors
1962        );
1963        assert_eq!(parse_issues[0].line, 2);
1964    }
1965
1966    #[test]
1967    fn capability_serializes_kebab_case() {
1968        // PROVENANCE_LOG.md §3 requires `oa`, `metadata`, `tdm-elsevier`,
1969        // `tdm-aps`, `tdm-springer` on the wire (kebab-case).
1970        let cases = [
1971            (Capability::Oa, "\"oa\""),
1972            (Capability::Metadata, "\"metadata\""),
1973            (Capability::TdmElsevier, "\"tdm-elsevier\""),
1974            (Capability::TdmAps, "\"tdm-aps\""),
1975            (Capability::TdmSpringer, "\"tdm-springer\""),
1976        ];
1977        for (cap, expected) in cases {
1978            let got = serde_json::to_string(&cap).expect("serialize");
1979            assert_eq!(
1980                got, expected,
1981                "capability wire format mismatch for {:?}",
1982                cap
1983            );
1984        }
1985    }
1986
1987    // -----------------------------------------------------------------
1988    // #140 — §6 rotation, retention, multi-segment verify.
1989    // -----------------------------------------------------------------
1990
1991    fn gunzip_to_string(gz: &Utf8Path) -> String {
1992        use std::io::Read;
1993        let f = std::fs::File::open(gz.as_std_path()).expect("open gz");
1994        let mut dec = GzDecoder::new(f);
1995        let mut s = String::new();
1996        dec.read_to_string(&mut s).expect("gunzip");
1997        s
1998    }
1999
2000    #[test]
2001    fn rotation_archives_to_gz_and_restarts_genesis_chain() {
2002        let dir = TempDir::new().expect("tmp");
2003        let path = tmp_dir_utf8(&dir).join("access.log");
2004        // Inject a tiny threshold (NOT a global env var — that raced
2005        // non-#[serial] tests): row 1 fits, so the SECOND append
2006        // (size>=50) rotates before it writes. A freshly rotated `.gz`
2007        // is not retention-aged, so the default prune at open is a no-op.
2008        let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
2009            .expect("open");
2010        log.append(empty_input()).expect("append 1");
2011        let row1 = read_rows(&path);
2012        assert_eq!(row1.len(), 1);
2013        assert_eq!(row1[0].prev_hash, GENESIS_HASH);
2014
2015        log.append(empty_input()).expect("append 2 (rotates first)");
2016
2017        // Exactly one rotated segment; it gunzips to the original row 1.
2018        let segs = rotated_segments(&path);
2019        assert_eq!(segs.len(), 1, "one .gz segment expected; got {segs:?}");
2020        let archived: Vec<LogRow> = gunzip_to_string(&segs[0])
2021            .lines()
2022            .filter(|l| !l.is_empty())
2023            .map(|l| serde_json::from_str(l).expect("row"))
2024            .collect();
2025        assert_eq!(archived.len(), 1);
2026        assert_eq!(archived[0].this_hash, row1[0].this_hash);
2027
2028        // The fresh access.log restarts the chain at GENESIS, ts_seq 1.
2029        let cur = read_rows(&path);
2030        assert_eq!(cur.len(), 1, "fresh segment holds only the post-rotate row");
2031        assert_eq!(cur[0].prev_hash, GENESIS_HASH);
2032        assert_eq!(cur[0].ts_seq, 1);
2033
2034        // verify_all sees both segments, each its own clean chain.
2035        let reports = verify_all(&path).expect("verify_all");
2036        assert_eq!(reports.len(), 2, "rotated .gz + current");
2037        for (p, r) in &reports {
2038            assert!(r.errors.is_empty(), "segment {p} must verify clean: {r:?}");
2039        }
2040    }
2041
2042    #[test]
2043    fn rotate_log_is_fail_closed_on_missing_source() {
2044        // The append path propagates this via `?`, so a rotation failure
2045        // aborts the fetch (fail-closed) rather than silently continuing.
2046        let dir = TempDir::new().expect("tmp");
2047        let missing = tmp_dir_utf8(&dir).join("nope.log");
2048        let err = rotate_log(&missing).expect_err("missing source must error");
2049        assert!(matches!(err, LogError::Io(_)), "got {err:?}");
2050    }
2051
2052    #[test]
2053    #[serial_test::serial]
2054    fn prune_respects_retention_window_and_disable() {
2055        let dir = TempDir::new().expect("tmp");
2056        let base = tmp_dir_utf8(&dir);
2057        let path = base.join("access.log");
2058        let old_gz = base.join("access.log.2020-01-01-000000.gz");
2059        let new_gz = base.join("access.log.2999-01-01-000000.gz");
2060
2061        let mk = |p: &Utf8Path, aged: bool| {
2062            let f = std::fs::File::create(p.as_std_path()).expect("create gz");
2063            if aged {
2064                // 100 days ago — older than the 90-day default & a 1-day window.
2065                let when =
2066                    std::time::SystemTime::now() - std::time::Duration::from_secs(100 * 86_400);
2067                f.set_modified(when).expect("set mtime");
2068            }
2069        };
2070
2071        // (a) days=0 disables pruning entirely.
2072        mk(&old_gz, true);
2073        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
2074        let _ = open_log(&path);
2075        assert!(old_gz.exists(), "days=0 must NOT prune");
2076
2077        // (b) days=1 prunes the aged segment, keeps a fresh one.
2078        mk(&new_gz, false);
2079        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "1");
2080        let _ = open_log(&path);
2081        assert!(!old_gz.exists(), "aged segment must be pruned at days=1");
2082        assert!(new_gz.exists(), "fresh segment must survive");
2083
2084        std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
2085    }
2086
2087    #[test]
2088    #[serial_test::serial]
2089    fn retention_days_env_parsing() {
2090        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
2091        assert_eq!(retention_days(), 0);
2092        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "30");
2093        assert_eq!(retention_days(), 30);
2094        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "garbage");
2095        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2096        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "-5");
2097        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2098        std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
2099        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2100    }
2101
2102    #[test]
2103    fn verify_all_flags_tampered_segment_independently() {
2104        let dir = TempDir::new().expect("tmp");
2105        let path = tmp_dir_utf8(&dir).join("access.log");
2106        // Inject the tiny threshold (no global env → no cross-test race).
2107        let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
2108            .expect("open");
2109        log.append(empty_input()).expect("append 1");
2110        log.append(empty_input()).expect("append 2 (rotates)");
2111        drop(log);
2112
2113        // Tamper the CURRENT segment's row: set this_hash to a
2114        // syntactically-valid 64-hex string that cannot be the SHA-256
2115        // of any row (all zeros). NOTE: the previous "flip the last char
2116        // to '0'" was a no-op ~1/16 of runs when the real hash already
2117        // ended in '0' (this_hash depends on `Utc::now()`), which is the
2118        // flake this fixes — mirrors `verify_detects_tampered_row_hash`.
2119        let mut cur = read_rows(&path);
2120        let mut bad = cur.remove(0);
2121        bad.this_hash =
2122            "0000000000000000000000000000000000000000000000000000000000000000".to_string();
2123        std::fs::write(
2124            path.as_std_path(),
2125            format!("{}\n", serde_json::to_string(&bad).expect("ser")),
2126        )
2127        .expect("rewrite tampered current");
2128
2129        let reports = verify_all(&path).expect("verify_all");
2130        assert_eq!(reports.len(), 2);
2131        // Oldest first = the rotated .gz (clean); current last (tampered).
2132        let (gz_path, gz_rep) = &reports[0];
2133        let (cur_path, cur_rep) = &reports[1];
2134        assert!(
2135            gz_path.as_str().ends_with(".gz") && gz_rep.errors.is_empty(),
2136            "rotated segment must stay clean: {gz_path} {gz_rep:?}"
2137        );
2138        assert!(
2139            cur_path.file_name() == Some("access.log") && !cur_rep.errors.is_empty(),
2140            "tampered current segment must report issues: {cur_path} {cur_rep:?}"
2141        );
2142    }
2143}