Skip to main content

doiget_core/
provenance.rs

1//! JSON Lines + SHA-256 hash-chained provenance log.
2//!
3//! Binding spec: `docs/PROVENANCE_LOG.md` (NORMATIVE, §3 row schema, §4 hash
4//! chain). Failure semantics: **fail-closed** — callers MUST abort the fetch
5//! if a log write returns `Err`. See `docs/SECURITY.md` §1.8 and ADR-0006.
6//!
7//! # On-disk format
8//!
9//! - JSON Lines (`.jsonl`): one JSON object per line, terminated by `\n` (LF).
10//! - UTF-8. Timestamps are RFC3339 in UTC.
11//! - Each row is appended via a single `write_all` whose payload always ends
12//!   in `\n`, so a partially-written row is detectable as a missing trailing
13//!   newline rather than a torn JSON record.
14//! - In audit-grade mode (the only mode shipped here), the writer flushes the
15//!   `BufWriter` and `fsync`s the file after every row.
16//!
17//! # Hash chain (PROVENANCE_LOG.md §4)
18//!
19//! Each row carries a `prev_hash` and a `this_hash`. The first row's
20//! `prev_hash` is the literal string `"GENESIS"`. Every subsequent row's
21//! `prev_hash` MUST equal the previous row's `this_hash`.
22//!
23//! When a log file rotates (§6 — not yet implemented in this crate; see TODO
24//! below), the first row of the NEW log file also uses `prev_hash =
25//! "GENESIS"`, restarting the chain.
26//!
27//! `this_hash` is computed as:
28//!
29//! ```text
30//! this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
31//! ```
32//!
//! where `canonical_json` is **compact JSON (no whitespace) with object keys
//! sorted lexicographically** (PROVENANCE_LOG.md §4). For a v2 row with fields
//! `{ts: "...", ts_seq: 1, event: "fetch", ...}`, the canonical bytes begin
//! with `{"canonical_digest":...` because `canonical_digest` is the lex-first
//! top-level key (`"can…"` sorts before `"cap…"`); pre-v2 rows, which had no
//! `canonical_digest` field, began with `{"capability":...`. Downstream
//! `doiget audit-log --verify` (Phase 1+) relies on this exact rule — do not
//! change the canonicalization without bumping the spec.
39//!
40//! # In-process serialization
41//!
42//! `ProvenanceLog` holds a `Mutex<LogState>`. All `append` calls within the
43//! same process serialize on this mutex, satisfying the "process-local mutex
44//! on log appender" requirement of `docs/SECURITY.md` §1.8. Cross-process
45//! coordination (multiple `doiget` invocations) is out of scope here and
46//! handled by the higher-level `flock`-based store layer.
47//!
48//! # Session id
49//!
50//! `session_id` (PROVENANCE_LOG.md §3) is a 26-char ULID generated **once per
51//! process invocation** by the caller and stamped into every row written
52//! through the resulting [`ProvenanceLog`]. This crate does not generate the
53//! ULID itself — see [`ProvenanceLog::open`] for the contract.
54//!
55//! # TODO: log rotation (§6)
56//!
57//! Log rotation is not yet implemented. When it lands, the first row of the
58//! NEW log file MUST use `prev_hash = "GENESIS"` (chain restart), matching
59//! the `GENESIS_HASH` constant below.
60
61use std::collections::BTreeMap;
62use std::fs::{File, OpenOptions};
63use std::io::{BufRead, BufReader, BufWriter, Write};
64use std::sync::Mutex;
65
66use camino::{Utf8Path, Utf8PathBuf};
67use chrono::{DateTime, Utc};
68use serde::{Deserialize, Serialize};
69use sha2::{Digest, Sha256};
70
/// One row of the provenance log (PROVENANCE_LOG.md §3).
///
/// The on-disk wire field names match the spec table; struct-field order is
/// **not** load-bearing for the hash because canonicalization sorts keys
/// lexicographically (see PROVENANCE_LOG.md §4). Field order here does
/// determine the key order of the line written to disk, so do not reorder
/// fields casually.
///
/// **Schema version**: this struct is the **v2** row shape (ADR-0024).
/// Every v2 row carries `schema_version = "v2"` literally; the
/// `canonical_digest` field carries the ADR-0021 §1 audit identity of
/// the fetch on rows where one applies (`Fetch` / `Resolve` /
/// `StoreWrite`) and is `None` on session bookend rows
/// (`SessionStart` / `SessionEnd` / `CapabilityResolved`) that have no
/// ref. v1 rows (pre-Slice-4) lack both fields and MUST be migrated via
/// [`migrate_v1_to_v2`] before the v2 binary can read them — the
/// `deny_unknown_fields` + non-defaulted `schema_version` shape ensures
/// v1 rows fail to parse loudly rather than producing silent hash-chain
/// mismatches.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct LogRow {
    /// RFC3339 UTC timestamp of the append (millisecond precision).
    ///
    /// NOTE(review): [`ProvenanceLog::append`] stores `Utc::now()` without
    /// truncating it, so the serialized value may carry sub-millisecond
    /// digits — confirm whether the spec requires truncation.
    pub ts: DateTime<Utc>,
    /// Per-session monotonic sequence number, starting at 1.
    ///
    /// NOTE(review): [`ProvenanceLog::open`] resumes from the last row's
    /// `ts_seq + 1` found in the file, regardless of that row's
    /// `session_id`, so in practice the counter is per log file — confirm
    /// which of the two the spec intends.
    pub ts_seq: u64,
    /// Event class (see [`LogEvent`]).
    pub event: LogEvent,
    /// Optional reference (DOI / arXiv id). Wire field name is `ref`
    /// (`ref` is a Rust keyword, hence the trailing underscore).
    #[serde(rename = "ref")]
    pub ref_: Option<String>,
    /// Optional source name (e.g. `unpaywall`).
    pub source: Option<String>,
    /// Result (see [`LogResult`]).
    pub result: LogResult,
    /// OA license string (`event=fetch`, `result=ok`); `None` otherwise.
    pub license: Option<String>,
    /// Bytes written / fetched, on success rows.
    pub size_bytes: Option<u64>,
    /// Path to the stored payload, relative to the store root
    /// (`event=fetch`, `result=ok`); `None` otherwise.
    pub store_path: Option<String>,
    /// Capability under which the row was written (REQUIRED, every row).
    pub capability: Capability,
    /// 26-char ULID identifying the process invocation (REQUIRED).
    /// Supplied by the caller of [`ProvenanceLog::open`]; this module does
    /// not validate its shape.
    pub session_id: String,
    /// Stable error code on failure rows.
    pub error_code: Option<String>,
    /// Row schema version. Always [`LOG_SCHEMA_VERSION`] (`"v2"`) for
    /// new rows written by this build (ADR-0024). v1 rows lack this
    /// field; they MUST be migrated via [`migrate_v1_to_v2`] first.
    pub schema_version: String,
    /// Canonical-digest of the fetch's audit identity (ADR-0021 §1) as
    /// 64 lowercase hex chars. Present on rows with a `ref` (`Fetch`,
    /// `Resolve`, `StoreWrite`); `None` on session bookend rows. The
    /// digest is computed from a [`crate::CanonicalRef`] whose
    /// `resolver_profile` matches this row's `source` field for
    /// migrated v1 rows; new v2 rows MAY pass an explicit
    /// `resolver_profile` distinct from `source`.
    pub canonical_digest: Option<String>,
    /// 64 lowercase hex chars, OR the literal string `"GENESIS"` for the
    /// first row of a fresh log file.
    pub prev_hash: String,
    /// 64 lowercase hex chars. SHA-256 of canonical JSON of THIS row with
    /// the `this_hash` field removed. See module docs.
    pub this_hash: String,
}
136
/// Provenance-log row schema version this build writes
/// (`docs/PROVENANCE_LOG.md` §3, ADR-0024).
///
/// [`ProvenanceLog::append`] stamps this value into the `schema_version`
/// field of every row, and it participates in the row hash.
///
/// Bumped from `"v1"` (implicit; pre-Slice-4 rows had no
/// `schema_version` field) to `"v2"` when the `canonical_digest` column
/// landed. The v1→v2 migration is one-shot, idempotent, and dry-runnable
/// via [`migrate_v1_to_v2`].
pub const LOG_SCHEMA_VERSION: &str = "v2";
145
/// Event class for a log row (PROVENANCE_LOG.md §3).
///
/// Note: result-status (`ok`/`err`/`denied`) lives in [`LogResult`], NOT in
/// the event variant. So `Fetch` covers both successful and failed fetch
/// attempts; the row's `result` distinguishes them.
///
/// Serialized in `snake_case`, so the wire values are `session_start`,
/// `capability_resolved`, `resolve`, `fetch`, `store_write` and
/// `session_end`.
///
/// `non_exhaustive` so adding new variants is non-breaking.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum LogEvent {
    /// Process started; first row of a new session. Wire: `session_start`.
    SessionStart,
    /// Capability resolution finished (allowed / denied / which env var).
    /// Wire: `capability_resolved`.
    CapabilityResolved,
    /// Reference resolved to a fetch URL. Wire: `resolve`.
    Resolve,
    /// Fetch attempt (success or failure determined by `result`).
    /// Wire: `fetch`.
    Fetch,
    /// Store write attempt (success or failure determined by `result`).
    /// Wire: `store_write`.
    StoreWrite,
    /// Process ended cleanly. Wire: `session_end`.
    SessionEnd,
}
170
/// Per-row outcome (PROVENANCE_LOG.md §3). Serialized in `snake_case`, so
/// the wire values are `ok`, `err` and `denied`. `non_exhaustive` for
/// forward compatibility.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum LogResult {
    /// The operation succeeded. Wire: `ok`.
    Ok,
    /// The operation failed with an error. Wire: `err`.
    Err,
    /// The operation was denied (e.g. capability gate). Wire: `denied`.
    Denied,
}
184
/// Capability under which a row was written (PROVENANCE_LOG.md §3).
///
/// `kebab-case` serde rename emits `oa`, `metadata`, `tdm-elsevier`,
/// `tdm-aps`, `tdm-springer` exactly as the spec requires. `non_exhaustive`
/// for forward compatibility.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub enum Capability {
    /// Open access tier. Wire: `oa`.
    Oa,
    /// Metadata-only access. Wire: `metadata`.
    Metadata,
    /// Elsevier TDM (Tier 3, opt-in build). Wire: `tdm-elsevier`.
    TdmElsevier,
    /// APS TDM (Tier 3, opt-in build). Wire: `tdm-aps`.
    TdmAps,
    /// Springer TDM (Tier 3, opt-in build). Wire: `tdm-springer`.
    TdmSpringer,
}
205
/// Errors emitted by the provenance log writer. Callers MUST treat any
/// variant as a fail-closed signal and abort the surrounding fetch.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum LogError {
    /// I/O error opening, reading, writing, or syncing the log file. Includes
    /// recovery-time corruption detection where the synthetic message is
    /// `"corrupted log at line N: …"` (kind `InvalidData`), and the
    /// poisoned-mutex case reported by [`ProvenanceLog::append`].
    #[error("provenance log io error: {0}")]
    Io(#[from] std::io::Error),
    /// Serialization of a row to canonical JSON failed.
    #[error("provenance log serialization error: {0}")]
    Serialize(#[from] serde_json::Error),
    /// Path supplied to [`ProvenanceLog::open`] exists but is not a regular
    /// file (e.g. a directory). The check goes through `std::fs::metadata`,
    /// which follows symlinks, so a symlink that resolves to a regular file
    /// is accepted rather than rejected.
    #[error("provenance log path is not a regular file: {0}")]
    NotARegularFile(Utf8PathBuf),
}
224
/// Append-only writer with in-process serialization.
///
/// Created via [`ProvenanceLog::open`]; rows are written with
/// [`ProvenanceLog::append`], which holds `state`'s mutex for the whole
/// write so concurrent appends within one process cannot interleave.
#[derive(Debug)]
pub struct ProvenanceLog {
    /// Location of the log file. The file is re-opened in append mode on
    /// every `append` call rather than kept open.
    path: Utf8PathBuf,
    /// Chain cursor (next `ts_seq` and last `this_hash`), guarded by a mutex.
    state: Mutex<LogState>,
    /// Session id stamped into every row written through this writer.
    session_id: String,
}
232
/// Mutable internal state, guarded by [`ProvenanceLog::state`].
///
/// Seeded from the file's contents when the log is opened, and advanced by
/// `append` only after the row has been written and fsynced successfully.
#[derive(Debug)]
struct LogState {
    /// `ts_seq` of the **next** row to be appended.
    next_seq: u64,
    /// `this_hash` of the most recent row (64 lowercase hex chars), used as
    /// the next row's `prev_hash`; [`GENESIS_HASH`] if the log is empty.
    last_hash: String,
}
241
/// The genesis sentinel used as `prev_hash` for the first row of a log file
/// (PROVENANCE_LOG.md §3, §6). Also written verbatim as the prev-hash of the
/// first row after a log rotation (TODO: rotation not yet implemented).
///
/// Recovery returns this value for a missing or empty log, and the verifier
/// compares each row's `prev_hash` against it to recognise a chain start.
const GENESIS_HASH: &str = "GENESIS";
246
/// Caller-supplied fields for a row. The writer fills in `ts`, `ts_seq`,
/// `session_id`, `prev_hash`, `this_hash`, and the literal
/// `schema_version = "v2"` (`LOG_SCHEMA_VERSION`).
///
/// All string fields are borrowed for `'a`; [`ProvenanceLog::append`]
/// copies them into owned strings when it builds the on-disk row, so the
/// borrows only need to outlive the `append` call.
///
/// Callers SHOULD populate [`Self::canonical_digest`] on rows that have
/// a meaningful audit identity (`Fetch` / `Resolve` / `StoreWrite` rows
/// with a `ref`), leaving it `None` on session bookend rows. The digest
/// is produced by [`crate::CanonicalRef::digest_hex`] from a
/// `(source_type, source_id, resolver_profile, version)` tuple — see
/// ADR-0021 §1 for the algorithm and ADR-0024 for the implementation
/// surface.
#[derive(Debug, Clone)]
pub struct RowInput<'a> {
    /// Event class.
    pub event: LogEvent,
    /// Result.
    pub result: LogResult,
    /// Capability under which the row is written (REQUIRED for every row).
    pub capability: Capability,
    /// Optional DOI / arXiv id. Written to the wire as `ref`.
    pub ref_: Option<&'a str>,
    /// Optional source name.
    pub source: Option<&'a str>,
    /// Optional error code on failure rows.
    pub error_code: Option<&'a str>,
    /// Optional payload size in bytes.
    pub size_bytes: Option<u64>,
    /// Optional OA license string (set on `event=fetch`, `result=ok`).
    pub license: Option<&'a str>,
    /// Optional store path relative to the store root (set on `event=fetch`,
    /// `result=ok`).
    pub store_path: Option<&'a str>,
    /// Optional canonical-digest (ADR-0021 §1) as 64 lowercase hex
    /// chars. `None` for session bookend / capability-resolution rows;
    /// SHOULD be `Some` for `Fetch` / `Resolve` / `StoreWrite` rows
    /// whose `source` field names the resolver. Build via
    /// [`crate::Ref::promote`] + [`crate::CanonicalRef::digest_hex`].
    /// The writer stores the string as given; it is not validated here.
    pub canonical_digest: Option<&'a str>,
}
286
287// ---------------------------------------------------------------------------
288// Canonical-JSON helper (PROVENANCE_LOG.md §4)
289//
290// Hashing rule (CRITICAL — this is the spec contract for `audit-log --verify`):
291//
292//   this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
293//
294// Canonical JSON = **compact (no whitespace), keys sorted lexicographically,
295// no trailing whitespace** (§4). Struct field order is deliberately NOT
296// load-bearing here; the canonicalizer sorts the resulting object keys via
297// `BTreeMap<String, Value>`, which serializes in lex-sorted key order.
298//
299// Worked example: for the row fragment `{ts_seq: 1, ts: "..."}` (input order),
300// the canonical bytes after lex sort are `{"ts":"...","ts_seq":1}` because
301// `"ts"` < `"ts_seq"` lexicographically. In v2 (ADR-0024) the lex-first
302// top-level key is `"canonical_digest"` — `"canonical_digest"` < `"capability"`
303// because 'n'(110) < 'p'(112) at byte index 2 (both share the `"ca"`
304// prefix). The pre-v2 lex-first key was `"capability"`.
305// ---------------------------------------------------------------------------
306
/// Serializable shadow of [`LogRow`] **without** `this_hash`. Used solely as
/// an intermediate to compute the canonical bytes that `this_hash` is the
/// SHA-256 of. The wire key names match [`LogRow`]'s `serde` attributes.
///
/// Every field borrows from the caller, so building one allocates nothing.
/// No field carries `skip_serializing_if`, so `None` values are emitted as
/// JSON `null` and the set of hashed keys is the same for every row.
///
/// This struct MUST stay in lock-step with [`LogRow`]: a field added to one
/// but not the other would silently drop out of (or into) the hash.
///
/// v2 shape (ADR-0024): includes `schema_version` and
/// `canonical_digest`. Both fields participate in the hash chain — a
/// tampered `canonical_digest` is detected by `audit-log --verify`
/// exactly like a tampered `ref` or `source` would be.
#[derive(Serialize)]
struct RowForHash<'a> {
    ts: DateTime<Utc>,
    ts_seq: u64,
    event: LogEvent,
    // Same wire rename as `LogRow::ref_` so both serialize the key as `ref`.
    #[serde(rename = "ref")]
    ref_: Option<&'a str>,
    source: Option<&'a str>,
    result: LogResult,
    license: Option<&'a str>,
    size_bytes: Option<u64>,
    store_path: Option<&'a str>,
    capability: Capability,
    session_id: &'a str,
    error_code: Option<&'a str>,
    schema_version: &'a str,
    canonical_digest: Option<&'a str>,
    prev_hash: &'a str,
}
334
335/// Produce canonical-JSON bytes for a row-without-hash, with object keys
336/// sorted lexicographically per PROVENANCE_LOG.md §4.
337///
338/// Implementation: serialize via `serde_json::to_value` to get a `Value`,
339/// require it be an object, then move its entries into a
340/// `BTreeMap<String, Value>` (which serializes with lex-sorted keys) and
341/// re-serialize compactly. No new dependency required.
342fn canonical_json_for_hash(rfh: &RowForHash<'_>) -> Result<Vec<u8>, LogError> {
343    let value = serde_json::to_value(rfh)?;
344    let map = match value {
345        serde_json::Value::Object(m) => m,
346        // RowForHash is always a struct, so this branch is unreachable in
347        // practice; surface as a serde error if it ever changes.
348        _ => {
349            return Err(LogError::Serialize(serde::de::Error::custom(
350                "RowForHash did not serialize to a JSON object",
351            )));
352        }
353    };
354    let sorted: BTreeMap<String, serde_json::Value> = map.into_iter().collect();
355    Ok(serde_json::to_vec(&sorted)?)
356}
357
358/// Compute `this_hash` for the given row-without-hash. Returns 64 lowercase
359/// hex chars.
360fn compute_this_hash(rfh: &RowForHash<'_>) -> Result<String, LogError> {
361    let bytes = canonical_json_for_hash(rfh)?;
362    let digest = Sha256::digest(&bytes);
363    Ok(hex::encode(digest))
364}
365
366impl ProvenanceLog {
367    /// Open or create the log at `path`, stamping every row with
368    /// `session_id`.
369    ///
370    /// `session_id` MUST be a 26-char ULID generated **once per process**
371    /// invocation by the caller. Re-opening the log within the same process
372    /// reuses the same `session_id`; re-opening in a new process gets a new
373    /// one. This crate intentionally does NOT generate the ULID itself —
374    /// callers are responsible for creating one (e.g. via the `ulid` crate
375    /// already present in the workspace) and threading it through.
376    ///
377    /// If the file exists, scan it once to recover the last `ts_seq` and
378    /// `this_hash`. If the file is missing or empty, the first row will use
379    /// `prev_hash = "GENESIS"` and `ts_seq = 1`.
380    ///
381    /// # Errors
382    ///
383    /// Returns [`LogError::Io`] for I/O failures or if any line fails to
384    /// parse as a [`LogRow`] (synthetic message: `"corrupted log at line N: …"`).
385    /// The writer never silently truncates a corrupt log.
386    ///
387    /// Returns [`LogError::NotARegularFile`] if `path` exists but is not a
388    /// regular file (e.g. a directory).
389    pub fn open(path: impl Into<Utf8PathBuf>, session_id: String) -> Result<Self, LogError> {
390        let path: Utf8PathBuf = path.into();
391
392        // Reject obvious non-files up front so later `OpenOptions::append`
393        // doesn't produce a confusing platform-dependent error.
394        if path.exists() {
395            let md = std::fs::metadata(&path)?;
396            if !md.is_file() {
397                return Err(LogError::NotARegularFile(path));
398            }
399        }
400
401        let (next_seq, last_hash) = recover_state(&path)?;
402
403        Ok(Self {
404            path,
405            state: Mutex::new(LogState {
406                next_seq,
407                last_hash,
408            }),
409            session_id,
410        })
411    }
412
413    /// Append a row. Computes `prev_hash`, `ts_seq`, `ts`, `session_id`, and
414    /// `this_hash`; the caller only supplies the semantic fields via
415    /// [`RowInput`].
416    ///
417    /// Returns the assigned `ts_seq` on success.
418    ///
419    /// # Errors
420    ///
421    /// Returns [`LogError`] on serialization, I/O, or fsync failure. Callers
422    /// MUST treat this as fail-closed and abort the surrounding fetch.
423    pub fn append(&self, input: RowInput<'_>) -> Result<u64, LogError> {
424        // Hold the mutex for the entire append: serialize + write + flush +
425        // fsync + state update. This is the in-process serialization point
426        // promised by `docs/SECURITY.md` §1.8.
427        //
428        // A poisoned mutex only happens if a previous `append` panicked
429        // mid-write. Surface that as an I/O error rather than propagating
430        // a panic.
431        let mut state = self
432            .state
433            .lock()
434            .map_err(|_| LogError::Io(std::io::Error::other("provenance log mutex poisoned")))?;
435
436        let ts_seq = state.next_seq;
437        let prev_hash = state.last_hash.clone();
438        let ts = Utc::now();
439
440        let rfh = RowForHash {
441            ts,
442            ts_seq,
443            event: input.event,
444            ref_: input.ref_,
445            source: input.source,
446            result: input.result,
447            license: input.license,
448            size_bytes: input.size_bytes,
449            store_path: input.store_path,
450            capability: input.capability,
451            session_id: &self.session_id,
452            error_code: input.error_code,
453            schema_version: LOG_SCHEMA_VERSION,
454            canonical_digest: input.canonical_digest,
455            prev_hash: &prev_hash,
456        };
457
458        let this_hash = compute_this_hash(&rfh)?;
459
460        // Build the on-disk row. Owned strings here because `LogRow` does
461        // not borrow.
462        let row = LogRow {
463            ts,
464            ts_seq,
465            event: input.event,
466            ref_: input.ref_.map(str::to_string),
467            source: input.source.map(str::to_string),
468            result: input.result,
469            license: input.license.map(str::to_string),
470            size_bytes: input.size_bytes,
471            store_path: input.store_path.map(str::to_string),
472            capability: input.capability,
473            session_id: self.session_id.clone(),
474            error_code: input.error_code.map(str::to_string),
475            schema_version: LOG_SCHEMA_VERSION.to_string(),
476            canonical_digest: input.canonical_digest.map(str::to_string),
477            prev_hash,
478            this_hash: this_hash.clone(),
479        };
480
481        // Serialize, append `\n`, write_all in one syscall, flush BufWriter,
482        // fsync the underlying file. `\n` is part of the same buffer, so a
483        // crash mid-write leaves at most a partial line (no trailing `\n`),
484        // which is detectable on recovery as a corrupted final line.
485        let mut bytes = serde_json::to_vec(&row)?;
486        bytes.push(b'\n');
487
488        let file = OpenOptions::new()
489            .create(true)
490            .append(true)
491            .open(&self.path)?;
492        let mut writer = BufWriter::new(file);
493        writer.write_all(&bytes)?;
494        writer.flush()?;
495        // `into_inner` to recover the underlying File for `sync_all`.
496        let file = writer.into_inner().map_err(|e| {
497            LogError::Io(std::io::Error::other(format!(
498                "buf writer flush failed: {}",
499                e.error()
500            )))
501        })?;
502        file.sync_all()?;
503
504        // Only after a successful fsync do we advance the in-memory state.
505        // If any of the above fails, the next `append` retries from the
506        // same `(ts_seq, prev_hash)` — at most a torn last line on disk.
507        state.next_seq = ts_seq + 1;
508        state.last_hash = this_hash;
509
510        Ok(ts_seq)
511    }
512
513    /// Returns the path the log was opened at. Useful for tests and audit tooling.
514    pub fn path(&self) -> &Utf8Path {
515        &self.path
516    }
517
518    /// Returns the session id stamped into every row written through this
519    /// writer.
520    pub fn session_id(&self) -> &str {
521        &self.session_id
522    }
523}
524
525/// Scan an existing log to recover `(next_seq, last_hash)`.
526///
527/// Walk every line, parse as [`LogRow`], track the last successfully parsed
528/// row. If parsing fails, return [`LogError::Io`] with a synthetic
529/// `"corrupted log at line N: …"` message — never silently truncate.
530fn recover_state(path: &Utf8Path) -> Result<(u64, String), LogError> {
531    let file = match File::open(path) {
532        Ok(f) => f,
533        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
534            return Ok((1, GENESIS_HASH.to_string()));
535        }
536        Err(e) => return Err(LogError::Io(e)),
537    };
538
539    let reader = BufReader::new(file);
540    let mut last_seq: u64 = 0;
541    let mut last_hash: String = GENESIS_HASH.to_string();
542
543    for (idx, line_res) in reader.lines().enumerate() {
544        let line_no = idx + 1;
545        let line = line_res?;
546        if line.is_empty() {
547            // Tolerate trailing/empty lines silently — they are not data.
548            continue;
549        }
550        let row: LogRow = serde_json::from_str(&line).map_err(|e| {
551            LogError::Io(std::io::Error::new(
552                std::io::ErrorKind::InvalidData,
553                format!("corrupted log at line {}: {}", line_no, e),
554            ))
555        })?;
556        last_seq = row.ts_seq;
557        last_hash = row.this_hash;
558    }
559
560    if last_seq == 0 {
561        Ok((1, GENESIS_HASH.to_string()))
562    } else {
563        Ok((last_seq + 1, last_hash))
564    }
565}
566
567// ---------------------------------------------------------------------------
568// Verification (`doiget audit-log --verify`)
569//
570// The provenance log is a JSON Lines file with a SHA-256 hash chain
571// (PROVENANCE_LOG.md §4). Tampering is detected by recomputing every row's
572// `this_hash` and validating the chain. This module provides the offline
573// verifier; the CLI wrapper lives in `doiget-cli::commands::audit_log`.
574//
575// Failure model: returning `Err` is reserved for I/O failures opening / reading
576// the file. Per-row issues (parse failures, hash/chain mismatches, sequence
577// regressions) are accumulated into [`VerifyReport::errors`] so callers can
578// report them all in one pass — this is the contract Phase 1 ships.
579// ---------------------------------------------------------------------------
580
/// Outcome of [`verify`]: per-row chain status across the entire log.
///
/// The log passed verification when `errors` is empty.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct VerifyReport {
    /// Number of non-empty lines processed, including lines that failed to
    /// parse.
    pub total_rows: usize,
    /// Rows whose hash, chain link, and `ts_seq` all validated.
    pub ok_rows: usize,
    /// Issues encountered, in encounter order. Line numbers are 1-based.
    /// A single row may contribute more than one issue.
    pub errors: Vec<VerifyIssue>,
}
592
593impl VerifyReport {
594    /// An empty, all-clear report — used when the log file is absent.
595    fn empty() -> Self {
596        Self {
597            total_rows: 0,
598            ok_rows: 0,
599            errors: Vec::new(),
600        }
601    }
602}
603
/// A single issue discovered by [`verify`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct VerifyIssue {
    /// 1-based line number where the issue was detected. Counts every line
    /// of the file, including the empty lines that verification skips.
    pub line: usize,
    /// Classification of the issue (see [`VerifyIssueKind`]).
    pub kind: VerifyIssueKind,
    /// Human-readable description (caller may format for stderr/stdout).
    /// Intended for display only; match on `kind` rather than this text.
    pub message: String,
}
615
/// Classification of a [`VerifyIssue`]. `non_exhaustive` for forward
/// compatibility — future kinds may include `SessionIdChange`, etc.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VerifyIssueKind {
    /// Row failed to parse as [`LogRow`] (corrupted JSON, a missing
    /// required field, or an unknown field — `LogRow` denies unknown
    /// fields). The chain anchor does not advance past such a row.
    ParseError,
    /// `prev_hash` did not match the previous row's `this_hash` (or the
    /// genesis sentinel on row 1).
    PrevHashMismatch,
    /// Row's stored `this_hash` did not match the recomputed canonical-JSON
    /// SHA-256. Also used when the recomputation itself fails.
    ThisHashMismatch,
    /// `ts_seq` did not increase strictly monotonically (within a session;
    /// see PROVENANCE_LOG.md §3 + §6 — chain restarts after rotation are
    /// permitted to reset `ts_seq` and are detected via the genesis sentinel).
    SequenceJump,
}
634
635/// Verify the entire log file at `path`.
636///
637/// Returns `Ok(VerifyReport)` regardless of whether the chain validates;
638/// callers inspect `report.errors.is_empty()` to determine pass/fail.
639/// Returns `Err` only when the file itself cannot be opened or read at the
640/// I/O level.
641///
642/// Behavior:
643///
644/// - A missing file is treated as a clean, empty log (no tampering possible
645///   on bytes that don't exist) and returns an empty report after a `warn!`.
646/// - Empty / blank lines are skipped — they are not data per the writer's
647///   on-disk format (PROVENANCE_LOG.md §2).
648/// - On a row that fails to parse as [`LogRow`], a `ParseError` is recorded
649///   and verification continues on the next line. The chain anchor does NOT
650///   advance through an unparsable row, so the next valid row's `prev_hash`
651///   is checked against the last successfully parsed row (or against
652///   `"GENESIS"` if no valid row has been seen yet).
653/// - A `prev_hash == "GENESIS"` sentinel marks a chain restart (first row of
654///   a fresh / rotated log per §6) and resets the `ts_seq` monotonicity
655///   anchor — `ts_seq` is NOT compared to the prior row across a restart.
656pub fn verify(path: &Utf8Path) -> Result<VerifyReport, LogError> {
657    let file = match File::open(path) {
658        Ok(f) => f,
659        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
660            tracing::warn!(
661                path = %path,
662                "audit-log verify: log file does not exist; reporting empty"
663            );
664            return Ok(VerifyReport::empty());
665        }
666        Err(e) => return Err(LogError::Io(e)),
667    };
668
669    let reader = BufReader::new(file);
670    let mut report = VerifyReport::empty();
671
672    // Anchor for the chain check: the LAST SUCCESSFULLY PARSED row. The chain
673    // is anchored to the bytes on disk, not to a hypothetical "should have
674    // been". This matches the spec — tampering at row N must surface both as
675    // a hash mismatch on N and as a chain break on N+1.
676    let mut prev_row: Option<LogRow> = None;
677
678    for (idx, line_res) in reader.lines().enumerate() {
679        let line_no = idx + 1;
680        let line = line_res?;
681        if line.is_empty() {
682            continue;
683        }
684
685        report.total_rows += 1;
686
687        let row: LogRow = match serde_json::from_str(&line) {
688            Ok(r) => r,
689            Err(e) => {
690                report.errors.push(VerifyIssue {
691                    line: line_no,
692                    kind: VerifyIssueKind::ParseError,
693                    message: format!("failed to parse row as LogRow: {e}"),
694                });
695                // Chain anchor cannot advance through an unparsable row;
696                // leave `prev_row` untouched so the next valid row's
697                // `prev_hash` is checked against the last-known anchor (or
698                // GENESIS if we never had one).
699                continue;
700            }
701        };
702
703        let mut row_ok = true;
704
705        // 1. Recompute `this_hash` from canonical JSON (row \ {this_hash}).
706        let rfh = RowForHash {
707            ts: row.ts,
708            ts_seq: row.ts_seq,
709            event: row.event,
710            ref_: row.ref_.as_deref(),
711            source: row.source.as_deref(),
712            result: row.result,
713            license: row.license.as_deref(),
714            size_bytes: row.size_bytes,
715            store_path: row.store_path.as_deref(),
716            capability: row.capability,
717            session_id: &row.session_id,
718            error_code: row.error_code.as_deref(),
719            schema_version: &row.schema_version,
720            canonical_digest: row.canonical_digest.as_deref(),
721            prev_hash: &row.prev_hash,
722        };
723        match compute_this_hash(&rfh) {
724            Ok(recomputed) => {
725                if recomputed != row.this_hash {
726                    report.errors.push(VerifyIssue {
727                        line: line_no,
728                        kind: VerifyIssueKind::ThisHashMismatch,
729                        message: format!(
730                            "this_hash mismatch: stored={}, recomputed={}",
731                            row.this_hash, recomputed
732                        ),
733                    });
734                    row_ok = false;
735                }
736            }
737            Err(e) => {
738                // Canonicalization itself failed — surface as a hash
739                // mismatch with the underlying error in the message.
740                report.errors.push(VerifyIssue {
741                    line: line_no,
742                    kind: VerifyIssueKind::ThisHashMismatch,
743                    message: format!("failed to recompute this_hash: {e}"),
744                });
745                row_ok = false;
746            }
747        }
748
749        // 2. Chain link: `prev_hash` matches anchor (GENESIS on row 1 / after
750        //    a chain restart, prior row's `this_hash` otherwise).
751        let is_genesis = row.prev_hash == GENESIS_HASH;
752        match &prev_row {
753            None => {
754                // First non-empty row in the file: must declare GENESIS.
755                if !is_genesis {
756                    report.errors.push(VerifyIssue {
757                        line: line_no,
758                        kind: VerifyIssueKind::PrevHashMismatch,
759                        message: format!(
760                            "first row must have prev_hash=\"GENESIS\", got {:?}",
761                            row.prev_hash
762                        ),
763                    });
764                    row_ok = false;
765                }
766            }
767            Some(prev) => {
768                if is_genesis {
769                    // Chain restart (rotation per §6) — accepted, no link
770                    // check, and the `ts_seq` monotonicity anchor resets
771                    // (handled below via `is_genesis`).
772                } else if row.prev_hash != prev.this_hash {
773                    report.errors.push(VerifyIssue {
774                        line: line_no,
775                        kind: VerifyIssueKind::PrevHashMismatch,
776                        message: format!(
777                            "prev_hash mismatch: row stores {}, previous row's this_hash is {}",
778                            row.prev_hash, prev.this_hash
779                        ),
780                    });
781                    row_ok = false;
782                }
783            }
784        }
785
786        // 3. ts_seq monotonicity — strictly greater than the previous row's
787        //    `ts_seq`, EXCEPT across a chain restart (where `ts_seq` resets).
788        if let Some(prev) = &prev_row {
789            if !is_genesis && row.ts_seq <= prev.ts_seq {
790                report.errors.push(VerifyIssue {
791                    line: line_no,
792                    kind: VerifyIssueKind::SequenceJump,
793                    message: format!(
794                        "ts_seq did not increase strictly: previous={}, current={}",
795                        prev.ts_seq, row.ts_seq
796                    ),
797                });
798                row_ok = false;
799            }
800        }
801
802        if row_ok {
803            report.ok_rows += 1;
804        }
805
806        // Advance the anchor to the just-parsed row (whether or not it had
807        // issues — the on-disk bytes ARE the chain).
808        prev_row = Some(row);
809    }
810
811    Ok(report)
812}
813
814// ---------------------------------------------------------------------------
815// v1 → v2 migration (ADR-0024, `docs/PROVENANCE_LOG.md` §"Schema migration").
816//
817// v1 rows lack `schema_version` and `canonical_digest`; the v2 binary
818// fails loudly when asked to read them (see `recover_state` /
819// `verify`). The migration recovers a v2 log from a v1 file by:
820//
821//   1. Parsing every v1 row via the [`V1LogRow`] shadow struct.
822//   2. Deriving a [`crate::CanonicalRef`] from the v1 `(ref, source)`
823//      pair — `source` becomes `resolver_profile`, `version` is `None`
824//      (ADR-0021 §1 → ADR-0024 migration recipe).
//   3. Re-computing the SHA-256 hash chain across the new row
//      payloads. The v1 chain is invalidated by the schema change, so
//      the v2 chain is rebuilt from scratch: the first migrated row
//      uses `prev_hash = "GENESIS"` and every later row links to the
//      recomputed `this_hash` of its predecessor.
829//   4. Writing the new rows to `<log_path>.v2-migrated`, then
830//      atomically renaming it onto `<log_path>` after backing up the
831//      original to `<log_path>.v1-backup`.
832//
833// The migration is **idempotent**: running it on an already-v2 log
834// re-parses every row as v2, recomputes the same hash chain, and
835// produces a byte-equivalent output.
836//
837// The migration is **dry-runnable**: `dry_run = true` returns a
838// [`MigrationReport`] summarizing what would change without touching
839// disk.
840// ---------------------------------------------------------------------------
841
/// Summary of a [`migrate_v1_to_v2`] run.
///
/// Marked `#[non_exhaustive]` so future fields (e.g. a per-row error
/// list, an aborted-row count) can be added without breaking callers
/// that pattern-match.
///
/// Derives `PartialEq`/`Eq` so two reports (for example a dry-run
/// preview and the subsequent live run, modulo `dry_run`) can be
/// compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct MigrationReport {
    /// Number of rows rewritten (or that WOULD be rewritten under
    /// `dry_run`).
    pub rows_rewritten: u64,
    /// Whether this was a dry-run preview (`true`) or a live rewrite
    /// (`false`).
    pub dry_run: bool,
    /// `this_hash` of the first input row exactly as it was stored on
    /// disk (the anchor of the pre-migration chain). When the input
    /// had no rows — or the log file did not exist — this is the
    /// literal string `"GENESIS"`.
    pub first_row_v1_chain_hash: String,
    /// Recomputed `this_hash` of the first migrated row under the v2
    /// canonicalization, or the literal `"GENESIS"` when no rows were
    /// migrated. For a non-empty log this is expected to equal
    /// [`Self::first_row_v1_chain_hash`] only when the input was
    /// already v2 (the idempotent case).
    pub first_row_v2_chain_hash: String,
}
865
866/// v1 row shadow struct used ONLY by [`migrate_v1_to_v2`]. The
867/// non-defaulted v2 fields (`schema_version`, `canonical_digest`) are
868/// absent here; `deny_unknown_fields` rejects unexpected v2 fields so a
869/// v2 row on disk fails to parse as v1, letting the migrator detect
870/// already-v2 input via fallback to the v2 parser.
871#[derive(Debug, Clone, Deserialize, Serialize)]
872#[serde(deny_unknown_fields)]
873struct V1LogRow {
874    ts: DateTime<Utc>,
875    ts_seq: u64,
876    event: LogEvent,
877    #[serde(rename = "ref")]
878    ref_: Option<String>,
879    source: Option<String>,
880    result: LogResult,
881    license: Option<String>,
882    size_bytes: Option<u64>,
883    store_path: Option<String>,
884    capability: Capability,
885    session_id: String,
886    error_code: Option<String>,
887    prev_hash: String,
888    this_hash: String,
889}
890
891/// Minimal in-memory representation a v1 OR v2 row can be promoted to
892/// before re-hashing.
893#[derive(Debug, Clone)]
894struct MigrationRowSeed {
895    ts: DateTime<Utc>,
896    ts_seq: u64,
897    event: LogEvent,
898    ref_: Option<String>,
899    source: Option<String>,
900    result: LogResult,
901    license: Option<String>,
902    size_bytes: Option<u64>,
903    store_path: Option<String>,
904    capability: Capability,
905    session_id: String,
906    error_code: Option<String>,
907    /// `None` for v1 inputs (the digest is computed during migration);
908    /// `Some(...)` for already-v2 inputs (carried through verbatim for
909    /// idempotency).
910    canonical_digest_in: Option<String>,
911    /// As stored on disk in the input. Used only for the
912    /// `first_row_v1_chain_hash` field of [`MigrationReport`].
913    stored_this_hash: String,
914}
915
916/// Migrate a v1 provenance log to v2 (ADR-0024).
917///
918/// Returns a [`MigrationReport`] describing how many rows were (or
919/// would be) rewritten and the first-row chain-anchor delta. The
920/// migration is idempotent: running it twice produces byte-equivalent
921/// output the second time.
922///
923/// On a missing log file, returns a no-op report (`rows_rewritten = 0`,
924/// `first_row_v1_chain_hash = "GENESIS"`, `first_row_v2_chain_hash =
925/// "GENESIS"`) — there is nothing to migrate.
926///
927/// # Errors
928///
929/// Returns [`LogError::Io`] on I/O failures and on rows that fail to
930/// parse as either v1 or v2 (the synthetic message names the line
931/// number). Returns [`LogError::Serialize`] on canonicalization
932/// failures.
933pub fn migrate_v1_to_v2(log_path: &Utf8Path, dry_run: bool) -> Result<MigrationReport, LogError> {
934    use std::io::BufRead;
935
936    // -- 1. Read the input log, parsing each line as v1 OR (idempotent
937    //       fallback) v2. --------------------------------------------------
938    let file = match File::open(log_path) {
939        Ok(f) => f,
940        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
941            return Ok(MigrationReport {
942                rows_rewritten: 0,
943                dry_run,
944                first_row_v1_chain_hash: GENESIS_HASH.to_string(),
945                first_row_v2_chain_hash: GENESIS_HASH.to_string(),
946            });
947        }
948        Err(e) => return Err(LogError::Io(e)),
949    };
950    let reader = BufReader::new(file);
951    let mut seeds: Vec<MigrationRowSeed> = Vec::new();
952
953    for (idx, line_res) in reader.lines().enumerate() {
954        let line_no = idx + 1;
955        let line = line_res?;
956        if line.is_empty() {
957            continue;
958        }
959        // Try v1 first. If it fails, try v2 (idempotency: re-migrating
960        // a v2 log MUST succeed and produce equivalent output).
961        let seed = if let Ok(v1) = serde_json::from_str::<V1LogRow>(&line) {
962            MigrationRowSeed {
963                ts: v1.ts,
964                ts_seq: v1.ts_seq,
965                event: v1.event,
966                ref_: v1.ref_,
967                source: v1.source,
968                result: v1.result,
969                license: v1.license,
970                size_bytes: v1.size_bytes,
971                store_path: v1.store_path,
972                capability: v1.capability,
973                session_id: v1.session_id,
974                error_code: v1.error_code,
975                canonical_digest_in: None,
976                stored_this_hash: v1.this_hash,
977            }
978        } else {
979            match serde_json::from_str::<LogRow>(&line) {
980                Ok(v2) => MigrationRowSeed {
981                    ts: v2.ts,
982                    ts_seq: v2.ts_seq,
983                    event: v2.event,
984                    ref_: v2.ref_,
985                    source: v2.source,
986                    result: v2.result,
987                    license: v2.license,
988                    size_bytes: v2.size_bytes,
989                    store_path: v2.store_path,
990                    capability: v2.capability,
991                    session_id: v2.session_id,
992                    error_code: v2.error_code,
993                    canonical_digest_in: v2.canonical_digest,
994                    stored_this_hash: v2.this_hash,
995                },
996                Err(e) => {
997                    return Err(LogError::Io(std::io::Error::new(
998                        std::io::ErrorKind::InvalidData,
999                        format!("migration: line {line_no} is neither v1 nor v2: {e}"),
1000                    )));
1001                }
1002            }
1003        };
1004        seeds.push(seed);
1005    }
1006
1007    // -- 2. Derive `canonical_digest` for each seed that lacks one. ------
1008    //
1009    // For v1 rows: build a CanonicalRef from
1010    //   - source_type from `event`/`ref` shape (DOI prefix `10.` vs
1011    //     arXiv) — we use a heuristic that matches `Ref::parse`'s rule
1012    //     (`starts_with "10."` ⇒ DOI; else arXiv).
1013    //   - source_id = ref value (verbatim).
1014    //   - resolver_profile = source value (verbatim, ADR-0021 §3
1015    //     migration recipe).
1016    //   - version = None.
1017    //
1018    // Rows without a `ref` (session bookend) keep `canonical_digest =
1019    // None` per the v2 row contract.
1020
1021    fn derive_digest(seed: &MigrationRowSeed) -> Option<String> {
1022        let ref_str = seed.ref_.as_deref()?;
1023        let source_key = seed.source.as_deref().unwrap_or("");
1024        // Heuristic: bare DOIs always start `10.`; everything else is
1025        // treated as an arXiv id. Mirrors `Ref::parse` rule 3/4.
1026        let source_type = if ref_str.starts_with("10.") {
1027            crate::SourceType::Doi
1028        } else {
1029            crate::SourceType::Arxiv
1030        };
1031        let c = crate::CanonicalRef::new(source_type, ref_str, source_key, None);
1032        Some(c.digest_hex())
1033    }
1034
1035    let digests: Vec<Option<String>> = seeds
1036        .iter()
1037        .map(|s| s.canonical_digest_in.clone().or_else(|| derive_digest(s)))
1038        .collect();
1039
1040    // -- 3. Rebuild the hash chain across the v2 payloads. ----------------
1041    let mut out_rows: Vec<LogRow> = Vec::with_capacity(seeds.len());
1042    let mut prev_hash: String = GENESIS_HASH.to_string();
1043
1044    for (seed, digest) in seeds.iter().zip(digests.iter()) {
1045        let rfh = RowForHash {
1046            ts: seed.ts,
1047            ts_seq: seed.ts_seq,
1048            event: seed.event,
1049            ref_: seed.ref_.as_deref(),
1050            source: seed.source.as_deref(),
1051            result: seed.result,
1052            license: seed.license.as_deref(),
1053            size_bytes: seed.size_bytes,
1054            store_path: seed.store_path.as_deref(),
1055            capability: seed.capability,
1056            session_id: &seed.session_id,
1057            error_code: seed.error_code.as_deref(),
1058            schema_version: LOG_SCHEMA_VERSION,
1059            canonical_digest: digest.as_deref(),
1060            prev_hash: &prev_hash,
1061        };
1062        let this_hash = compute_this_hash(&rfh)?;
1063        let row = LogRow {
1064            ts: seed.ts,
1065            ts_seq: seed.ts_seq,
1066            event: seed.event,
1067            ref_: seed.ref_.clone(),
1068            source: seed.source.clone(),
1069            result: seed.result,
1070            license: seed.license.clone(),
1071            size_bytes: seed.size_bytes,
1072            store_path: seed.store_path.clone(),
1073            capability: seed.capability,
1074            session_id: seed.session_id.clone(),
1075            error_code: seed.error_code.clone(),
1076            schema_version: LOG_SCHEMA_VERSION.to_string(),
1077            canonical_digest: digest.clone(),
1078            prev_hash: prev_hash.clone(),
1079            this_hash: this_hash.clone(),
1080        };
1081        prev_hash = this_hash;
1082        out_rows.push(row);
1083    }
1084
1085    // -- 4. Build the report. --------------------------------------------
1086    let first_v1_hash = seeds
1087        .first()
1088        .map(|s| s.stored_this_hash.clone())
1089        .unwrap_or_else(|| GENESIS_HASH.to_string());
1090    let first_v2_hash = out_rows
1091        .first()
1092        .map(|r| r.this_hash.clone())
1093        .unwrap_or_else(|| GENESIS_HASH.to_string());
1094    let report = MigrationReport {
1095        rows_rewritten: out_rows.len() as u64,
1096        dry_run,
1097        first_row_v1_chain_hash: first_v1_hash,
1098        first_row_v2_chain_hash: first_v2_hash,
1099    };
1100
1101    if dry_run {
1102        return Ok(report);
1103    }
1104
1105    // -- 5. Live write: stage to `<log_path>.v2-migrated`, back up the
1106    //       v1, then atomically rename. -----------------------------------
1107    let staged_path = with_suffix(log_path, ".v2-migrated");
1108    let backup_path = with_suffix(log_path, ".v1-backup");
1109
1110    {
1111        let staged_file = OpenOptions::new()
1112            .create(true)
1113            .write(true)
1114            .truncate(true)
1115            .open(&staged_path)?;
1116        let mut writer = BufWriter::new(staged_file);
1117        for row in &out_rows {
1118            let mut bytes = serde_json::to_vec(row)?;
1119            bytes.push(b'\n');
1120            writer.write_all(&bytes)?;
1121        }
1122        writer.flush()?;
1123        let file = writer.into_inner().map_err(|e| {
1124            LogError::Io(std::io::Error::other(format!(
1125                "migration buf writer flush failed: {}",
1126                e.error()
1127            )))
1128        })?;
1129        file.sync_all()?;
1130    }
1131
1132    // Sanity-check: the staged file MUST verify clean before we
1133    // commit the swap. If it doesn't, the migration is buggy — abort
1134    // without touching the live log.
1135    let verify_report = verify(&staged_path)?;
1136    if !verify_report.errors.is_empty() {
1137        return Err(LogError::Io(std::io::Error::other(format!(
1138            "migration: staged v2 log failed verify; first issue: {:?}",
1139            verify_report.errors.first()
1140        ))));
1141    }
1142
1143    // Move the original aside as `<log_path>.v1-backup`. Overwriting
1144    // any prior backup is intentional — the user re-running migrate
1145    // expects the most recent original preserved.
1146    if log_path.exists() {
1147        if backup_path.exists() {
1148            std::fs::remove_file(&backup_path)?;
1149        }
1150        std::fs::rename(log_path, &backup_path)?;
1151    }
1152    // Atomically promote the staged file to the live path.
1153    std::fs::rename(&staged_path, log_path)?;
1154
1155    Ok(report)
1156}
1157
1158/// Append a literal suffix to a [`Utf8Path`], producing a sibling path
1159/// in the same directory. Avoids `std::path::PathBuf` per the workspace
1160/// posture rule (`docs/SECURITY.md` §3 — camino-only file paths in
1161/// production code).
1162fn with_suffix(path: &Utf8Path, suffix: &str) -> Utf8PathBuf {
1163    let s = format!("{path}{suffix}");
1164    Utf8PathBuf::from(s)
1165}
1166
1167// ---------------------------------------------------------------------------
1168// Tests
1169// ---------------------------------------------------------------------------
1170
1171#[cfg(test)]
1172#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1173mod tests {
1174    use super::*;
1175    use std::fs;
1176    use std::sync::Arc;
1177    use std::thread;
1178
1179    use tempfile::TempDir;
1180
1181    /// Convert a `TempDir`'s `&std::path::Path` to a `Utf8PathBuf`. Tests
1182    /// always run on UTF-8 temp paths in CI; if the OS returns a non-UTF-8
1183    /// path we panic, which is acceptable for a unit test.
1184    fn tmp_dir_utf8(dir: &TempDir) -> Utf8PathBuf {
1185        Utf8PathBuf::from_path_buf(dir.path().to_path_buf()).expect("temp dir path must be UTF-8")
1186    }
1187
1188    /// A fixed 26-char ULID-shaped string used in tests. Real callers use
1189    /// the `ulid` crate; tests pin a constant so output is reproducible.
1190    const TEST_SESSION_ID: &str = "01JCKZ7Q0000000000000000AB";
1191
1192    fn open_log(path: &Utf8Path) -> ProvenanceLog {
1193        ProvenanceLog::open(path, TEST_SESSION_ID.to_string()).expect("open")
1194    }
1195
1196    fn empty_input() -> RowInput<'static> {
1197        RowInput {
1198            event: LogEvent::Fetch,
1199            result: LogResult::Ok,
1200            capability: Capability::Oa,
1201            ref_: None,
1202            source: None,
1203            error_code: None,
1204            size_bytes: None,
1205            license: None,
1206            store_path: None,
1207            canonical_digest: None,
1208        }
1209    }
1210
1211    /// Read the on-disk log and parse every line into a `LogRow`.
1212    fn read_rows(path: &Utf8Path) -> Vec<LogRow> {
1213        let raw = fs::read_to_string(path).expect("read log");
1214        raw.lines()
1215            .filter(|l| !l.is_empty())
1216            .map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
1217            .collect()
1218    }
1219
1220    /// Recompute `this_hash` for a stored row and assert it matches the
1221    /// stored value. Walks the same canonicalization rule as
1222    /// [`compute_this_hash`].
1223    fn verify_this_hash(row: &LogRow) {
1224        let rfh = RowForHash {
1225            ts: row.ts,
1226            ts_seq: row.ts_seq,
1227            event: row.event,
1228            ref_: row.ref_.as_deref(),
1229            source: row.source.as_deref(),
1230            result: row.result,
1231            license: row.license.as_deref(),
1232            size_bytes: row.size_bytes,
1233            store_path: row.store_path.as_deref(),
1234            capability: row.capability,
1235            session_id: &row.session_id,
1236            error_code: row.error_code.as_deref(),
1237            schema_version: &row.schema_version,
1238            canonical_digest: row.canonical_digest.as_deref(),
1239            prev_hash: &row.prev_hash,
1240        };
1241        let recomputed = compute_this_hash(&rfh).expect("hash");
1242        assert_eq!(
1243            recomputed, row.this_hash,
1244            "this_hash mismatch on ts_seq {}",
1245            row.ts_seq
1246        );
1247    }
1248
1249    #[test]
1250    fn first_row_uses_genesis_prev_hash() {
1251        let dir = TempDir::new().expect("tmp");
1252        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1253        let log = open_log(&path);
1254        let seq = log.append(empty_input()).expect("append");
1255        assert_eq!(seq, 1);
1256
1257        let rows = read_rows(&path);
1258        assert_eq!(rows.len(), 1);
1259        assert_eq!(rows[0].ts_seq, 1);
1260        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1261        assert_eq!(rows[0].this_hash.len(), 64);
1262        assert_eq!(rows[0].session_id, TEST_SESSION_ID);
1263        verify_this_hash(&rows[0]);
1264    }
1265
1266    #[test]
1267    fn subsequent_rows_chain_correctly() {
1268        let dir = TempDir::new().expect("tmp");
1269        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1270        let log = open_log(&path);
1271
1272        for _ in 0..3 {
1273            log.append(empty_input()).expect("append");
1274        }
1275
1276        let rows = read_rows(&path);
1277        assert_eq!(rows.len(), 3);
1278        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1279        assert_eq!(rows[1].prev_hash, rows[0].this_hash);
1280        assert_eq!(rows[2].prev_hash, rows[1].this_hash);
1281        for r in &rows {
1282            verify_this_hash(r);
1283        }
1284        assert_eq!(rows[0].ts_seq, 1);
1285        assert_eq!(rows[1].ts_seq, 2);
1286        assert_eq!(rows[2].ts_seq, 3);
1287    }
1288
1289    #[test]
1290    fn recovery_after_reopen() {
1291        let dir = TempDir::new().expect("tmp");
1292        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1293
1294        {
1295            let log = open_log(&path);
1296            for _ in 0..3 {
1297                log.append(empty_input()).expect("append");
1298            }
1299        } // drop writer
1300
1301        let log2 = open_log(&path);
1302        let seq = log2.append(empty_input()).expect("append after reopen");
1303        assert_eq!(seq, 4);
1304
1305        let rows = read_rows(&path);
1306        assert_eq!(rows.len(), 4);
1307        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1308        for i in 1..rows.len() {
1309            assert_eq!(
1310                rows[i].prev_hash,
1311                rows[i - 1].this_hash,
1312                "chain break at row {}",
1313                i + 1
1314            );
1315        }
1316        for (i, r) in rows.iter().enumerate() {
1317            assert_eq!(r.ts_seq, (i + 1) as u64);
1318            verify_this_hash(r);
1319        }
1320    }
1321
1322    #[test]
1323    fn concurrent_writers_in_same_process_serialize() {
1324        let dir = TempDir::new().expect("tmp");
1325        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1326        let log = Arc::new(open_log(&path));
1327
1328        let mut handles = Vec::with_capacity(8);
1329        for _ in 0..8 {
1330            let log = Arc::clone(&log);
1331            handles.push(thread::spawn(move || {
1332                log.append(empty_input()).expect("append")
1333            }));
1334        }
1335        let mut returned: Vec<u64> = handles
1336            .into_iter()
1337            .map(|h| h.join().expect("join"))
1338            .collect();
1339        returned.sort_unstable();
1340        assert_eq!(returned, vec![1, 2, 3, 4, 5, 6, 7, 8]);
1341
1342        let rows = read_rows(&path);
1343        assert_eq!(rows.len(), 8);
1344
1345        // The in-process mutex serializes appends, so file order MUST equal
1346        // ts_seq order: row N (0-indexed) on disk has ts_seq = N+1.
1347        for (i, r) in rows.iter().enumerate() {
1348            assert_eq!(r.ts_seq, (i + 1) as u64, "ts_seq gap at file row {}", i + 1);
1349        }
1350        // Hash chain follows file order.
1351        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1352        for i in 1..rows.len() {
1353            assert_eq!(
1354                rows[i].prev_hash,
1355                rows[i - 1].this_hash,
1356                "chain break at file row {}",
1357                i + 1
1358            );
1359        }
1360        for r in &rows {
1361            verify_this_hash(r);
1362        }
1363    }
1364
1365    #[test]
1366    fn corrupted_existing_log_fails_open() {
1367        let dir = TempDir::new().expect("tmp");
1368        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1369
1370        // JSON but not a valid LogRow: missing required fields, has unknown
1371        // field. `deny_unknown_fields` ensures the parser refuses.
1372        fs::write(&path, "{\"ts_seq\": 1, \"garbage\": true}\n").expect("write");
1373
1374        let err =
1375            ProvenanceLog::open(&path, TEST_SESSION_ID.to_string()).expect_err("must fail open");
1376        match err {
1377            LogError::Io(io) => {
1378                let msg = io.to_string();
1379                assert!(
1380                    msg.contains("corrupted log at line 1"),
1381                    "expected synthetic corruption message, got: {}",
1382                    msg
1383                );
1384            }
1385            other => panic!("expected LogError::Io, got {:?}", other),
1386        }
1387    }
1388
1389    #[test]
1390    fn rejects_non_regular_file() {
1391        // Pointing the log at a directory must fail with NotARegularFile.
1392        let dir = TempDir::new().expect("tmp");
1393        let err = ProvenanceLog::open(tmp_dir_utf8(&dir), TEST_SESSION_ID.to_string())
1394            .expect_err("must fail");
1395        match err {
1396            LogError::NotARegularFile(_) => {}
1397            other => panic!("expected NotARegularFile, got {:?}", other),
1398        }
1399    }
1400
1401    #[test]
1402    fn canonical_json_excludes_this_hash_field() {
1403        // Spec contract: the hashed bytes do not include `this_hash`. If
1404        // this ever regresses, every previously-written log becomes
1405        // unverifiable.
1406        let rfh = RowForHash {
1407            ts: Utc::now(),
1408            ts_seq: 1,
1409            event: LogEvent::Fetch,
1410            ref_: None,
1411            source: None,
1412            result: LogResult::Ok,
1413            license: None,
1414            size_bytes: None,
1415            store_path: None,
1416            capability: Capability::Oa,
1417            session_id: TEST_SESSION_ID,
1418            error_code: None,
1419            schema_version: LOG_SCHEMA_VERSION,
1420            canonical_digest: None,
1421            prev_hash: GENESIS_HASH,
1422        };
1423        let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1424        let s = std::str::from_utf8(&bytes).expect("utf8");
1425        assert!(!s.contains("this_hash"), "this_hash leaked into hash input");
1426        assert!(s.contains("\"prev_hash\":"));
1427    }
1428
1429    #[test]
1430    fn canonical_json_keys_are_lexicographically_sorted() {
1431        // PROVENANCE_LOG.md §4: canonical JSON uses keys sorted
1432        // lexicographically. The lex-first top-level key of a row is
1433        // `capability` ("c..." < "e..." < ...). Build a row and assert the
1434        // canonical bytes start with that key.
1435        let rfh = RowForHash {
1436            ts: Utc::now(),
1437            ts_seq: 1,
1438            event: LogEvent::Fetch,
1439            ref_: Some("10.1234/example"),
1440            source: Some("unpaywall"),
1441            result: LogResult::Ok,
1442            license: Some("CC-BY-4.0"),
1443            size_bytes: Some(1234),
1444            store_path: Some("papers/x.pdf"),
1445            capability: Capability::Oa,
1446            session_id: TEST_SESSION_ID,
1447            error_code: None,
1448            schema_version: LOG_SCHEMA_VERSION,
1449            canonical_digest: Some(
1450                "0000000000000000000000000000000000000000000000000000000000000000",
1451            ),
1452            prev_hash: GENESIS_HASH,
1453        };
1454        let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1455        let s = std::str::from_utf8(&bytes).expect("utf8");
1456        // v2: lex-first key is `canonical_digest` (< `capability` because
1457        // 'n' < 'p' at byte index 2). Pre-v2 it was `capability`.
1458        assert!(
1459            s.starts_with("{\"canonical_digest\":"),
1460            "canonical bytes must start with lex-first v2 key, got: {}",
1461            s
1462        );
1463        // Spot-check ordering: `prev_hash` (p) must come before `ref` (r),
1464        // which must come before `result` (re...) — wait, "ref" < "result"
1465        // lexicographically because 'f' < 's' in ascii at index 2 vs 'e' at
1466        // index 2 of "result"... let me just check a couple of unambiguous
1467        // pairs: `event` < `prev_hash`, and `ts` < `ts_seq`.
1468        let event_idx = s.find("\"event\":").expect("event key present");
1469        let prev_idx = s.find("\"prev_hash\":").expect("prev_hash key present");
1470        assert!(event_idx < prev_idx, "event must precede prev_hash");
1471        let ts_idx = s.find("\"ts\":").expect("ts key present");
1472        let tsseq_idx = s.find("\"ts_seq\":").expect("ts_seq key present");
1473        assert!(ts_idx < tsseq_idx, "ts must precede ts_seq");
1474    }
1475
1476    // -----------------------------------------------------------------
1477    // verify() tests — Phase 1 surface for `doiget audit-log --verify`.
1478    // -----------------------------------------------------------------
1479
1480    /// Rewrite a single field's quoted-string value on a specific 1-based
1481    /// line of `path`. Used to simulate tampering. Panics on malformed input
1482    /// — only valid inputs are produced by the test harness.
1483    ///
1484    /// `field_key` is matched as `"field_key":"...old..."` (quoted string
1485    /// JSON value). The new value is the literal string `new_value` (no
1486    /// JSON escaping needed for the test fixtures we use).
1487    fn tamper_string_field(
1488        path: &Utf8Path,
1489        line_no_1based: usize,
1490        field_key: &str,
1491        new_value: &str,
1492    ) {
1493        let raw = fs::read_to_string(path).expect("read log");
1494        let mut lines: Vec<String> = raw.lines().map(str::to_string).collect();
1495        let target = &lines[line_no_1based - 1];
1496        let needle = format!("\"{field_key}\":\"");
1497        let start = target
1498            .find(&needle)
1499            .unwrap_or_else(|| panic!("field {field_key} not found on line {line_no_1based}"))
1500            + needle.len();
1501        let end_rel = target[start..]
1502            .find('"')
1503            .unwrap_or_else(|| panic!("unterminated string for field {field_key}"));
1504        let end = start + end_rel;
1505        let mut new_line = String::with_capacity(target.len());
1506        new_line.push_str(&target[..start]);
1507        new_line.push_str(new_value);
1508        new_line.push_str(&target[end..]);
1509        lines[line_no_1based - 1] = new_line;
1510        let mut out = lines.join("\n");
1511        out.push('\n');
1512        fs::write(path, out).expect("write tampered log");
1513    }
1514
1515    #[test]
1516    fn verify_empty_log_is_ok() {
1517        // Missing file is a clean log — no tampering possible on bytes that
1518        // don't exist. `verify` returns an empty report, not an error.
1519        let dir = TempDir::new().expect("tmp");
1520        let path = tmp_dir_utf8(&dir).join("nonexistent.jsonl");
1521        assert!(!path.exists(), "precondition: file must not exist");
1522
1523        let report = verify(&path).expect("verify must not error on missing file");
1524        assert_eq!(report.total_rows, 0);
1525        assert_eq!(report.ok_rows, 0);
1526        assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
1527    }
1528
1529    #[test]
1530    fn verify_well_formed_chain_passes() {
1531        // Three rows written via the real writer must verify clean.
1532        let dir = TempDir::new().expect("tmp");
1533        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1534        let log = open_log(&path);
1535        for _ in 0..3 {
1536            log.append(empty_input()).expect("append");
1537        }
1538
1539        let report = verify(&path).expect("verify must succeed");
1540        assert_eq!(report.total_rows, 3);
1541        assert_eq!(report.ok_rows, 3);
1542        assert!(
1543            report.errors.is_empty(),
1544            "expected no issues on a well-formed log; got: {:?}",
1545            report.errors
1546        );
1547    }
1548
1549    #[test]
1550    fn verify_detects_tampered_row_hash() {
1551        // Mutate the SECOND row's `this_hash` to a syntactically-valid but
1552        // wrong hash. The recomputed canonical-JSON SHA-256 will not match.
1553        let dir = TempDir::new().expect("tmp");
1554        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1555        let log = open_log(&path);
1556        log.append(empty_input()).expect("append 1");
1557        log.append(empty_input()).expect("append 2");
1558        drop(log);
1559
1560        // 64 lowercase hex chars, all zeros — passes `LogRow` parse, fails hash check.
1561        tamper_string_field(
1562            &path,
1563            2,
1564            "this_hash",
1565            "0000000000000000000000000000000000000000000000000000000000000000",
1566        );
1567
1568        let report = verify(&path).expect("verify must succeed");
1569        assert_eq!(report.total_rows, 2);
1570        // Row 2's hash mismatch breaks both the hash check on row 2 AND the
1571        // chain link from row 2's stored `prev_hash` (still correct) into the
1572        // forward direction. There's no row 3 to fail forward, so we expect
1573        // exactly one issue: the this-hash mismatch on line 2.
1574        let hash_issues: Vec<_> = report
1575            .errors
1576            .iter()
1577            .filter(|e| e.kind == VerifyIssueKind::ThisHashMismatch)
1578            .collect();
1579        assert_eq!(
1580            hash_issues.len(),
1581            1,
1582            "expected exactly one ThisHashMismatch, got {:?}",
1583            report.errors
1584        );
1585        assert_eq!(hash_issues[0].line, 2);
1586    }
1587
1588    #[test]
1589    fn verify_detects_tampered_prev_hash() {
1590        // Mutate the SECOND row's `prev_hash` to a wrong value. This
1591        // invalidates the chain link but the row's own `this_hash` was
1592        // computed with the original `prev_hash`, so the this-hash check
1593        // ALSO fails (hash input changed). We assert at least the prev-hash
1594        // issue is reported on line 2.
1595        let dir = TempDir::new().expect("tmp");
1596        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1597        let log = open_log(&path);
1598        log.append(empty_input()).expect("append 1");
1599        log.append(empty_input()).expect("append 2");
1600        drop(log);
1601
1602        tamper_string_field(
1603            &path,
1604            2,
1605            "prev_hash",
1606            "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
1607        );
1608
1609        let report = verify(&path).expect("verify must succeed");
1610        assert_eq!(report.total_rows, 2);
1611        let prev_issues: Vec<_> = report
1612            .errors
1613            .iter()
1614            .filter(|e| e.kind == VerifyIssueKind::PrevHashMismatch)
1615            .collect();
1616        assert_eq!(
1617            prev_issues.len(),
1618            1,
1619            "expected exactly one PrevHashMismatch, got {:?}",
1620            report.errors
1621        );
1622        assert_eq!(prev_issues[0].line, 2);
1623    }
1624
1625    #[test]
1626    fn verify_detects_corrupted_json() {
1627        // One valid row plus a literal `{"garbage":true}` line. The garbage
1628        // line fails `serde_json::from_str::<LogRow>` (missing fields +
1629        // `deny_unknown_fields`) and surfaces as a `ParseError` on line 2.
1630        let dir = TempDir::new().expect("tmp");
1631        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1632        let log = open_log(&path);
1633        log.append(empty_input()).expect("append 1");
1634        drop(log);
1635
1636        // Append a garbage line directly.
1637        let mut existing = fs::read_to_string(&path).expect("read");
1638        if !existing.ends_with('\n') {
1639            existing.push('\n');
1640        }
1641        existing.push_str("{\"garbage\":true}\n");
1642        fs::write(&path, existing).expect("write");
1643
1644        let report = verify(&path).expect("verify must succeed");
1645        // total_rows counts non-empty lines, so both lines are counted.
1646        assert_eq!(report.total_rows, 2);
1647        let parse_issues: Vec<_> = report
1648            .errors
1649            .iter()
1650            .filter(|e| e.kind == VerifyIssueKind::ParseError)
1651            .collect();
1652        assert_eq!(
1653            parse_issues.len(),
1654            1,
1655            "expected exactly one ParseError, got {:?}",
1656            report.errors
1657        );
1658        assert_eq!(parse_issues[0].line, 2);
1659    }
1660
1661    #[test]
1662    fn capability_serializes_kebab_case() {
1663        // PROVENANCE_LOG.md §3 requires `oa`, `metadata`, `tdm-elsevier`,
1664        // `tdm-aps`, `tdm-springer` on the wire (kebab-case).
1665        let cases = [
1666            (Capability::Oa, "\"oa\""),
1667            (Capability::Metadata, "\"metadata\""),
1668            (Capability::TdmElsevier, "\"tdm-elsevier\""),
1669            (Capability::TdmAps, "\"tdm-aps\""),
1670            (Capability::TdmSpringer, "\"tdm-springer\""),
1671        ];
1672        for (cap, expected) in cases {
1673            let got = serde_json::to_string(&cap).expect("serialize");
1674            assert_eq!(
1675                got, expected,
1676                "capability wire format mismatch for {:?}",
1677                cap
1678            );
1679        }
1680    }
1681}