use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::sync::Mutex;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
/// One line of the JSONL provenance log, as written by `ProvenanceLog::append`
/// and read back by `verify`, `recover_state` and `migrate_v1_to_v2`.
///
/// Declaration order is the key order `serde_json` emits on disk. It does not
/// affect `this_hash`, which is computed over key-sorted JSON of every field
/// except `this_hash` itself (see `RowForHash` / `canonical_json_for_hash`).
#[derive(Debug, Clone, Serialize, Deserialize)]
// Unknown keys are a parse error, so rows with stray or injected fields are rejected.
#[serde(deny_unknown_fields)]
pub struct LogRow {
/// Creation time of the row (`Utc::now()` at append).
pub ts: DateTime<Utc>,
/// Sequence number within the segment; `append` starts at 1 and restarts at 1 after rotation.
pub ts_seq: u64,
pub event: LogEvent,
/// Serialized under the JSON key `"ref"` because `ref` is a Rust keyword.
#[serde(rename = "ref")]
pub ref_: Option<String>,
pub source: Option<String>,
pub result: LogResult,
pub license: Option<String>,
pub size_bytes: Option<u64>,
pub store_path: Option<String>,
pub capability: Capability,
/// Session id of the `ProvenanceLog` that wrote this row.
pub session_id: String,
pub error_code: Option<String>,
/// Schema tag; rows written by this module carry `LOG_SCHEMA_VERSION`.
pub schema_version: String,
/// Canonical digest (v2 field). Supplied by the caller on append; `migrate_v1_to_v2`
/// derives one from `ref`/`source` when it is absent and `ref` is present.
pub canonical_digest: Option<String>,
/// `this_hash` of the preceding row in the segment, or `"GENESIS"` for its first row.
pub prev_hash: String,
/// Hex-encoded SHA-256 of the canonical JSON of all the other fields.
pub this_hash: String,
}
/// Schema tag stamped into every row written by `ProvenanceLog::append` and `migrate_v1_to_v2`.
pub const LOG_SCHEMA_VERSION: &str = "v2";
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum LogEvent {
SessionStart,
CapabilityResolved,
Resolve,
Fetch,
StoreWrite,
SessionEnd,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum LogResult {
Ok,
Err,
Denied,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub enum Capability {
Oa,
Metadata,
TdmElsevier,
TdmAps,
TdmSpringer,
}
/// Errors produced by the provenance log.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum LogError {
/// Filesystem failure, plus synthetic I/O errors this module builds for corrupted
/// rows, a poisoned lock, failed migrations and similar conditions.
#[error("provenance log io error: {0}")]
Io(#[from] std::io::Error),
/// JSON encoding/decoding failure (including canonicalization for hashing).
#[error("provenance log serialization error: {0}")]
Serialize(#[from] serde_json::Error),
/// The configured log path exists but is not a regular file (e.g. a directory).
#[error("provenance log path is not a regular file: {0}")]
NotARegularFile(Utf8PathBuf),
}
/// Append-only, hash-chained JSONL provenance log backed by a single live file,
/// with size-based rotation into gzip archives alongside it.
///
/// Writers in the same process are serialized by the internal `Mutex`. This
/// module takes no cross-process file lock.
#[derive(Debug)]
pub struct ProvenanceLog {
// Live (unrotated) log file.
path: Utf8PathBuf,
// Chain cursor: next `ts_seq` and the `this_hash` the next row must link to.
state: Mutex<LogState>,
// Stamped into every row this instance writes.
session_id: String,
// Size in bytes at/above which `append` rotates first; 0 disables rotation.
rotate_threshold: u64,
}
/// Mutable chain cursor guarded by `ProvenanceLog::state`.
#[derive(Debug)]
struct LogState {
// `ts_seq` to assign to the next appended row.
next_seq: u64,
// `this_hash` of the last row written, or `GENESIS_HASH` at the start of a segment.
last_hash: String,
}
/// `prev_hash` sentinel for the first row of a segment (the root of the chain).
const GENESIS_HASH: &str = "GENESIS";
/// Default rotation threshold (100 MiB); overridable via `DOIGET_LOG_ROTATE_BYTES`.
const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
/// Default retention, in days, for rotated `.gz` segments; overridable via `DOIGET_LOG_RETENTION_DAYS`.
const DEFAULT_RETENTION_DAYS: i64 = 90;
fn rotate_threshold_bytes() -> u64 {
match std::env::var("DOIGET_LOG_ROTATE_BYTES") {
Ok(s) => s.trim().parse::<u64>().unwrap_or(ROTATE_BYTES),
Err(_) => ROTATE_BYTES,
}
}
/// Retention window, in days, for rotated segments, read from `DOIGET_LOG_RETENTION_DAYS`.
///
/// An unset variable yields `DEFAULT_RETENTION_DAYS`. A value that does not parse
/// (after trimming) as a non-negative `i64` logs a warning and also yields the
/// default. `0` is passed through, and `prune_rotated_segments` treats it as
/// "never prune".
fn retention_days() -> i64 {
    let Ok(raw) = std::env::var("DOIGET_LOG_RETENTION_DAYS") else {
        return DEFAULT_RETENTION_DAYS;
    };
    match raw.trim().parse::<i64>().ok().filter(|days| *days >= 0) {
        Some(days) => days,
        None => {
            tracing::warn!(
                value = %raw,
                "DOIGET_LOG_RETENTION_DAYS is not a non-negative integer; \
                 using the {DEFAULT_RETENTION_DAYS}-day default"
            );
            DEFAULT_RETENTION_DAYS
        }
    }
}
fn rotate_log(path: &Utf8Path) -> Result<(), LogError> {
let file_name = path.file_name().ok_or_else(|| {
LogError::Io(std::io::Error::other(
"provenance log path has no file name; cannot rotate",
))
})?;
let ts = Utc::now().format("%Y-%m-%d-%H%M%S");
let gz_name = format!("{file_name}.{ts}.gz");
let dir = path.parent().unwrap_or_else(|| Utf8Path::new("."));
let gz_path = dir.join(&gz_name);
let tmp_path = dir.join(format!("{gz_name}.tmp"));
{
let mut src = File::open(path)?;
let tmp = File::create(&tmp_path)?;
let mut enc = GzEncoder::new(BufWriter::new(tmp), Compression::default());
std::io::copy(&mut src, &mut enc)?;
let bufw = enc.finish()?;
let tmp = bufw.into_inner().map_err(|e| {
LogError::Io(std::io::Error::other(format!(
"gz tmp buf flush failed: {}",
e.error()
)))
})?;
tmp.sync_all()?;
}
std::fs::rename(&tmp_path, &gz_path)?;
std::fs::remove_file(path)?;
Ok(())
}
fn rotated_segments(current: &Utf8Path) -> Vec<Utf8PathBuf> {
let Some(file_name) = current.file_name() else {
return Vec::new();
};
let dir = current.parent().unwrap_or_else(|| Utf8Path::new("."));
let prefix = format!("{file_name}.");
let mut segs: Vec<Utf8PathBuf> = match std::fs::read_dir(dir.as_std_path()) {
Ok(rd) => rd
.filter_map(|e| e.ok())
.filter_map(|e| Utf8PathBuf::from_path_buf(e.path()).ok())
.filter(|p| {
p.file_name()
.map(|n| n.starts_with(&prefix) && n.ends_with(".gz"))
.unwrap_or(false)
})
.collect(),
Err(_) => Vec::new(),
};
segs.sort();
segs
}
/// Delete rotated segments of `current` whose modification time is more than
/// `days` days in the past.
///
/// `days <= 0` disables pruning. A window too large to represent is treated as
/// "nothing is old enough". This covers two cases: the `days * 86_400` seconds
/// product overflows `u64`, or the cutoff falls before the earliest
/// `SystemTime` the platform supports.
///
/// Best-effort: segments whose metadata or mtime cannot be read are kept, and a
/// failed removal is logged and skipped.
fn prune_rotated_segments(current: &Utf8Path, days: i64) {
    let Ok(days) = u64::try_from(days) else {
        return;
    };
    if days == 0 {
        return;
    }
    // Checked arithmetic: a plain `*` would panic in debug builds and wrap in
    // release builds. A wrapped value is a *small* window, which would delete
    // recent archives that should have been kept.
    let Some(cutoff) = days
        .checked_mul(86_400)
        .map(std::time::Duration::from_secs)
        .and_then(|window| std::time::SystemTime::now().checked_sub(window))
    else {
        return;
    };
    for seg in rotated_segments(current) {
        let aged = std::fs::metadata(seg.as_std_path())
            .and_then(|m| m.modified())
            .map(|mt| mt < cutoff)
            .unwrap_or(false);
        if !aged {
            continue;
        }
        match std::fs::remove_file(seg.as_std_path()) {
            Ok(()) => tracing::info!(
                segment = %seg,
                "provenance: pruned rotated segment past retention"
            ),
            Err(e) => tracing::warn!(
                segment = %seg, error = %e,
                "provenance: failed to prune rotated segment (best-effort; continuing)"
            ),
        }
    }
}
/// Verify every rotated segment of `current` (oldest first), then `current` itself.
///
/// Each segment is checked independently with [`verify`], so every segment's
/// chain must start at `GENESIS`. The returned list pairs each segment's path
/// with its report, ending with the live log.
///
/// # Errors
/// `LogError::Io` if an archive cannot be opened or decompressed, if a scratch
/// file cannot be created, or if `verify` itself fails with an I/O error.
/// The first such error aborts the whole run.
pub fn verify_all(current: &Utf8Path) -> Result<Vec<(Utf8PathBuf, VerifyReport)>, LogError> {
    let segments = rotated_segments(current);
    let mut reports = Vec::with_capacity(segments.len() + 1);
    for seg in segments {
        let report = verify_gz_segment(&seg)?;
        reports.push((seg, report));
    }
    let live = verify(current)?;
    reports.push((current.to_path_buf(), live));
    Ok(reports)
}

/// Decompress one rotated segment into a scratch file and run [`verify`] on it.
/// The scratch file lives only for the duration of this call.
fn verify_gz_segment(seg: &Utf8Path) -> Result<VerifyReport, LogError> {
    let mut decoder = GzDecoder::new(File::open(seg.as_std_path())?);
    let scratch = tempfile::NamedTempFile::new().map_err(|e| {
        LogError::Io(std::io::Error::other(format!(
            "verify_all: tempfile for {seg}: {e}"
        )))
    })?;
    {
        let mut sink = File::create(scratch.path())?;
        std::io::copy(&mut decoder, &mut sink)?;
        sink.sync_all()?;
    }
    let scratch_path = Utf8Path::from_path(scratch.path()).ok_or_else(|| {
        LogError::Io(std::io::Error::other("verify_all: non-utf8 tempfile path"))
    })?;
    verify(scratch_path)
}
/// Caller-supplied fields for one `ProvenanceLog::append` call.
///
/// The log fills in `ts`, `ts_seq`, `session_id`, `schema_version`, `prev_hash`
/// and `this_hash` itself. Borrowed strings are copied into the owned `LogRow`
/// when the row is written.
#[derive(Debug, Clone)]
pub struct RowInput<'a> {
pub event: LogEvent,
pub result: LogResult,
pub capability: Capability,
/// Written to the row's `"ref"` key.
pub ref_: Option<&'a str>,
pub source: Option<&'a str>,
pub error_code: Option<&'a str>,
pub size_bytes: Option<u64>,
pub license: Option<&'a str>,
pub store_path: Option<&'a str>,
pub canonical_digest: Option<&'a str>,
}
/// Borrowed view of a `LogRow` without `this_hash`: exactly the fields whose
/// canonical JSON is hashed to produce `this_hash`.
///
/// `append`, `verify` and `migrate_v1_to_v2` all build this same struct, so they
/// agree on the hash input. Keys are sorted before hashing, so field order here
/// does not matter. Adding, removing or renaming a field (or its serde key)
/// would change every computed hash, and existing logs would no longer verify.
#[derive(Serialize)]
struct RowForHash<'a> {
ts: DateTime<Utc>,
ts_seq: u64,
event: LogEvent,
// Must serialize under the same key as `LogRow::ref_`.
#[serde(rename = "ref")]
ref_: Option<&'a str>,
source: Option<&'a str>,
result: LogResult,
license: Option<&'a str>,
size_bytes: Option<u64>,
store_path: Option<&'a str>,
capability: Capability,
session_id: &'a str,
error_code: Option<&'a str>,
schema_version: &'a str,
canonical_digest: Option<&'a str>,
prev_hash: &'a str,
}
/// Canonical byte encoding of `rfh`, used as the SHA-256 input.
///
/// `rfh` is converted to a JSON object and re-emitted with its top-level keys in
/// lexicographic order. The bytes therefore do not depend on struct field order
/// or on the iteration order of `serde_json::Map`.
///
/// # Errors
/// `LogError::Serialize` if serialization fails or does not yield a JSON object.
fn canonical_json_for_hash(rfh: &RowForHash<'_>) -> Result<Vec<u8>, LogError> {
    let serde_json::Value::Object(fields) = serde_json::to_value(rfh)? else {
        return Err(LogError::Serialize(serde::de::Error::custom(
            "RowForHash did not serialize to a JSON object",
        )));
    };
    let ordered = fields
        .into_iter()
        .collect::<BTreeMap<String, serde_json::Value>>();
    serde_json::to_vec(&ordered).map_err(LogError::from)
}
/// Hex-encoded SHA-256 of `rfh`'s canonical JSON: the value stored as `this_hash`.
///
/// # Errors
/// Propagates canonicalization failures from `canonical_json_for_hash`.
fn compute_this_hash(rfh: &RowForHash<'_>) -> Result<String, LogError> {
    let canonical = canonical_json_for_hash(rfh)?;
    Ok(hex::encode(Sha256::digest(&canonical)))
}
impl ProvenanceLog {
pub fn open(path: impl Into<Utf8PathBuf>, session_id: String) -> Result<Self, LogError> {
Self::open_with_rotate_threshold(path, session_id, rotate_threshold_bytes())
}
pub(crate) fn open_with_rotate_threshold(
path: impl Into<Utf8PathBuf>,
session_id: String,
rotate_threshold: u64,
) -> Result<Self, LogError> {
let path: Utf8PathBuf = path.into();
if path.exists() {
let md = std::fs::metadata(&path)?;
if !md.is_file() {
return Err(LogError::NotARegularFile(path));
}
}
let (next_seq, last_hash) = recover_state(&path)?;
prune_rotated_segments(&path, retention_days());
Ok(Self {
path,
state: Mutex::new(LogState {
next_seq,
last_hash,
}),
session_id,
rotate_threshold,
})
}
pub fn append(&self, input: RowInput<'_>) -> Result<u64, LogError> {
let mut state = self
.state
.lock()
.map_err(|_| LogError::Io(std::io::Error::other("provenance log mutex poisoned")))?;
let threshold = self.rotate_threshold;
if threshold > 0 {
let size = match std::fs::metadata(&self.path) {
Ok(m) => m.len(),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
Err(e) => return Err(LogError::Io(e)),
};
if size >= threshold {
rotate_log(&self.path)?;
state.next_seq = 1;
state.last_hash = GENESIS_HASH.to_string();
}
}
let ts_seq = state.next_seq;
let prev_hash = state.last_hash.clone();
let ts = Utc::now();
let rfh = RowForHash {
ts,
ts_seq,
event: input.event,
ref_: input.ref_,
source: input.source,
result: input.result,
license: input.license,
size_bytes: input.size_bytes,
store_path: input.store_path,
capability: input.capability,
session_id: &self.session_id,
error_code: input.error_code,
schema_version: LOG_SCHEMA_VERSION,
canonical_digest: input.canonical_digest,
prev_hash: &prev_hash,
};
let this_hash = compute_this_hash(&rfh)?;
let row = LogRow {
ts,
ts_seq,
event: input.event,
ref_: input.ref_.map(str::to_string),
source: input.source.map(str::to_string),
result: input.result,
license: input.license.map(str::to_string),
size_bytes: input.size_bytes,
store_path: input.store_path.map(str::to_string),
capability: input.capability,
session_id: self.session_id.clone(),
error_code: input.error_code.map(str::to_string),
schema_version: LOG_SCHEMA_VERSION.to_string(),
canonical_digest: input.canonical_digest.map(str::to_string),
prev_hash,
this_hash: this_hash.clone(),
};
let mut bytes = serde_json::to_vec(&row)?;
bytes.push(b'\n');
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
let mut writer = BufWriter::new(file);
writer.write_all(&bytes)?;
writer.flush()?;
let file = writer.into_inner().map_err(|e| {
LogError::Io(std::io::Error::other(format!(
"buf writer flush failed: {}",
e.error()
)))
})?;
file.sync_all()?;
state.next_seq = ts_seq + 1;
state.last_hash = this_hash;
Ok(ts_seq)
}
pub fn path(&self) -> &Utf8Path {
&self.path
}
pub fn session_id(&self) -> &str {
&self.session_id
}
}
/// Read an existing log and return `(next_seq, last_hash)` to resume its chain.
///
/// The result is `(1, "GENESIS")` in three cases:
/// - the file is missing;
/// - it holds no rows (only empty lines);
/// - its last row has `ts_seq == 0`.
///
/// Otherwise it returns the last row's `ts_seq + 1` and its `this_hash`. Rows are
/// only parsed; their hashes are not checked here.
///
/// # Errors
/// `Io` for open/read failures other than not-found. Also `Io` with kind
/// `InvalidData` for any non-empty line that is not a valid `LogRow`.
fn recover_state(path: &Utf8Path) -> Result<(u64, String), LogError> {
    let file = match File::open(path) {
        Ok(file) => file,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok((1, GENESIS_HASH.to_string()));
        }
        Err(e) => return Err(LogError::Io(e)),
    };
    let mut tail: (u64, String) = (0, GENESIS_HASH.to_string());
    for (idx, line) in BufReader::new(file).lines().enumerate() {
        let line = line?;
        if line.is_empty() {
            continue;
        }
        let row = serde_json::from_str::<LogRow>(&line).map_err(|e| {
            LogError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("corrupted log at line {}: {}", idx + 1, e),
            ))
        })?;
        tail = (row.ts_seq, row.this_hash);
    }
    Ok(match tail {
        (0, _) => (1, GENESIS_HASH.to_string()),
        (last_seq, last_hash) => (last_seq + 1, last_hash),
    })
}
/// Result of checking one log file with `verify`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct VerifyReport {
    /// Non-empty lines seen, including ones that failed to parse.
    pub total_rows: usize,
    /// Rows that parsed and passed every check.
    pub ok_rows: usize,
    /// Every problem found, in file order.
    pub errors: Vec<VerifyIssue>,
}

impl VerifyReport {
    /// Report for a file with no rows: all counters zero, no issues.
    fn empty() -> Self {
        Self {
            errors: Vec::new(),
            ok_rows: 0,
            total_rows: 0,
        }
    }
}
/// One problem found by `verify`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct VerifyIssue {
/// 1-based physical line number in the file (empty lines are counted).
pub line: usize,
pub kind: VerifyIssueKind,
/// Human-readable detail, e.g. the stored vs. recomputed hash.
pub message: String,
}
/// Category of a `VerifyIssue`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VerifyIssueKind {
/// The line is not a valid `LogRow`.
ParseError,
/// One of two cases: the first parsed row's `prev_hash` is not `"GENESIS"`,
/// or a later non-GENESIS row does not link to the previous parsed row's `this_hash`.
PrevHashMismatch,
/// The stored `this_hash` differs from a recomputation (or recomputation failed).
ThisHashMismatch,
/// `ts_seq` did not strictly increase over the previous parsed row (not checked for GENESIS rows).
SequenceJump,
}
pub fn verify(path: &Utf8Path) -> Result<VerifyReport, LogError> {
let file = match File::open(path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::warn!(
path = %path,
"audit-log verify: log file does not exist; reporting empty"
);
return Ok(VerifyReport::empty());
}
Err(e) => return Err(LogError::Io(e)),
};
let reader = BufReader::new(file);
let mut report = VerifyReport::empty();
let mut prev_row: Option<LogRow> = None;
for (idx, line_res) in reader.lines().enumerate() {
let line_no = idx + 1;
let line = line_res?;
if line.is_empty() {
continue;
}
report.total_rows += 1;
let row: LogRow = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
report.errors.push(VerifyIssue {
line: line_no,
kind: VerifyIssueKind::ParseError,
message: format!("failed to parse row as LogRow: {e}"),
});
continue;
}
};
let mut row_ok = true;
let rfh = RowForHash {
ts: row.ts,
ts_seq: row.ts_seq,
event: row.event,
ref_: row.ref_.as_deref(),
source: row.source.as_deref(),
result: row.result,
license: row.license.as_deref(),
size_bytes: row.size_bytes,
store_path: row.store_path.as_deref(),
capability: row.capability,
session_id: &row.session_id,
error_code: row.error_code.as_deref(),
schema_version: &row.schema_version,
canonical_digest: row.canonical_digest.as_deref(),
prev_hash: &row.prev_hash,
};
match compute_this_hash(&rfh) {
Ok(recomputed) => {
if recomputed != row.this_hash {
report.errors.push(VerifyIssue {
line: line_no,
kind: VerifyIssueKind::ThisHashMismatch,
message: format!(
"this_hash mismatch: stored={}, recomputed={}",
row.this_hash, recomputed
),
});
row_ok = false;
}
}
Err(e) => {
report.errors.push(VerifyIssue {
line: line_no,
kind: VerifyIssueKind::ThisHashMismatch,
message: format!("failed to recompute this_hash: {e}"),
});
row_ok = false;
}
}
let is_genesis = row.prev_hash == GENESIS_HASH;
match &prev_row {
None => {
if !is_genesis {
report.errors.push(VerifyIssue {
line: line_no,
kind: VerifyIssueKind::PrevHashMismatch,
message: format!(
"first row must have prev_hash=\"GENESIS\", got {:?}",
row.prev_hash
),
});
row_ok = false;
}
}
Some(prev) => {
if is_genesis {
} else if row.prev_hash != prev.this_hash {
report.errors.push(VerifyIssue {
line: line_no,
kind: VerifyIssueKind::PrevHashMismatch,
message: format!(
"prev_hash mismatch: row stores {}, previous row's this_hash is {}",
row.prev_hash, prev.this_hash
),
});
row_ok = false;
}
}
}
if let Some(prev) = &prev_row {
if !is_genesis && row.ts_seq <= prev.ts_seq {
report.errors.push(VerifyIssue {
line: line_no,
kind: VerifyIssueKind::SequenceJump,
message: format!(
"ts_seq did not increase strictly: previous={}, current={}",
prev.ts_seq, row.ts_seq
),
});
row_ok = false;
}
}
if row_ok {
report.ok_rows += 1;
}
prev_row = Some(row);
}
Ok(report)
}
/// Summary returned by `migrate_v1_to_v2`.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct MigrationReport {
/// Rows in the migrated output (one per non-empty input line).
pub rows_rewritten: u64,
/// Echo of the `dry_run` argument; when true nothing was written.
pub dry_run: bool,
/// `this_hash` stored on the first input row before migration (`"GENESIS"` if none).
/// Despite the name, that row may already have been a v2 row.
pub first_row_v1_chain_hash: String,
/// `this_hash` of the first row after re-chaining (`"GENESIS"` if none).
pub first_row_v2_chain_hash: String,
}
/// Row layout of schema v1: `LogRow` without `schema_version` and `canonical_digest`.
#[derive(Debug, Clone, Deserialize, Serialize)]
// Strict, so a v2 row (which has the extra keys) fails to parse as v1 and falls
// through to the `LogRow` parse in `migrate_v1_to_v2`.
#[serde(deny_unknown_fields)]
struct V1LogRow {
ts: DateTime<Utc>,
ts_seq: u64,
event: LogEvent,
#[serde(rename = "ref")]
ref_: Option<String>,
source: Option<String>,
result: LogResult,
license: Option<String>,
size_bytes: Option<u64>,
store_path: Option<String>,
capability: Capability,
session_id: String,
error_code: Option<String>,
prev_hash: String,
this_hash: String,
}
/// Schema-independent content of one input row (v1 or v2), collected before re-chaining.
#[derive(Debug, Clone)]
struct MigrationRowSeed {
ts: DateTime<Utc>,
ts_seq: u64,
event: LogEvent,
ref_: Option<String>,
source: Option<String>,
result: LogResult,
license: Option<String>,
size_bytes: Option<u64>,
store_path: Option<String>,
capability: Capability,
session_id: String,
error_code: Option<String>,
// `canonical_digest` carried by a v2 input row; always `None` for v1 rows.
canonical_digest_in: Option<String>,
// The row's original `this_hash`, reported in `MigrationReport`.
stored_this_hash: String,
}
/// Rewrite the log at `log_path` into schema v2 with a freshly computed hash chain.
///
/// Each non-empty line is parsed as a strict v1 row or, failing that, as a v2
/// [`LogRow`]. Rows are re-emitted in file order. `ts`, `ts_seq` and all content
/// fields are preserved, and the following are set:
/// - `schema_version` = [`LOG_SCHEMA_VERSION`];
/// - `canonical_digest`, taken from a v2 row or derived from `ref`/`source` when absent;
/// - a single hash chain rooted at `GENESIS`.
///
/// With `dry_run`, nothing is written. Otherwise:
/// 1. The new log is written to `<log>.v2-migrated`, fsynced, and run through [`verify`].
/// 2. Only if it verifies clean is the current log moved to a backup and the staged
///    file renamed over `<log>`.
///
/// The backup is `<log>.v1-backup`. If that already exists (e.g. on a re-run), the
/// first free `<log>.v1-backup.<N>` is used instead. An earlier backup, possibly
/// the only copy of the original v1 data, is never deleted.
///
/// A missing log is not an error: it yields an empty report whose chain hashes
/// are both `"GENESIS"`.
///
/// # Errors
/// - `Io` (kind `InvalidData`) if a line is neither v1 nor v2.
/// - `Io` if the staged log fails verification; the staged file is then removed.
/// - `Io` on any other filesystem failure.
/// - `Serialize` if a row cannot be encoded.
pub fn migrate_v1_to_v2(log_path: &Utf8Path, dry_run: bool) -> Result<MigrationReport, LogError> {
    let file = match File::open(log_path) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok(MigrationReport {
                rows_rewritten: 0,
                dry_run,
                first_row_v1_chain_hash: GENESIS_HASH.to_string(),
                first_row_v2_chain_hash: GENESIS_HASH.to_string(),
            });
        }
        Err(e) => return Err(LogError::Io(e)),
    };
    let mut seeds: Vec<MigrationRowSeed> = Vec::new();
    for (idx, line_res) in BufReader::new(file).lines().enumerate() {
        let line = line_res?;
        if line.is_empty() {
            continue;
        }
        seeds.push(migration_seed_from_line(&line, idx + 1)?);
    }
    let out_rows = migration_rechain(&seeds)?;
    let report = MigrationReport {
        rows_rewritten: out_rows.len() as u64,
        dry_run,
        first_row_v1_chain_hash: seeds
            .first()
            .map_or_else(|| GENESIS_HASH.to_string(), |s| s.stored_this_hash.clone()),
        first_row_v2_chain_hash: out_rows
            .first()
            .map_or_else(|| GENESIS_HASH.to_string(), |r| r.this_hash.clone()),
    };
    if dry_run {
        return Ok(report);
    }

    let staged_path = with_suffix(log_path, ".v2-migrated");
    migration_write_staged(&staged_path, &out_rows)?;
    let verify_report = verify(&staged_path)?;
    if !verify_report.errors.is_empty() {
        // Best-effort cleanup so a staged log known to be bad is not left around
        // looking like a migration result. The error returned below is what matters.
        let _ = std::fs::remove_file(&staged_path);
        return Err(LogError::Io(std::io::Error::other(format!(
            "migration: staged v2 log failed verify; first issue: {:?}",
            verify_report.errors.first()
        ))));
    }
    if log_path.exists() {
        std::fs::rename(log_path, migration_backup_path(log_path))?;
    }
    std::fs::rename(&staged_path, log_path)?;
    Ok(report)
}

/// Parse one non-empty log line (1-based `line_no`) as a v1 row, else as a v2 row.
fn migration_seed_from_line(line: &str, line_no: usize) -> Result<MigrationRowSeed, LogError> {
    if let Ok(v1) = serde_json::from_str::<V1LogRow>(line) {
        return Ok(MigrationRowSeed {
            ts: v1.ts,
            ts_seq: v1.ts_seq,
            event: v1.event,
            ref_: v1.ref_,
            source: v1.source,
            result: v1.result,
            license: v1.license,
            size_bytes: v1.size_bytes,
            store_path: v1.store_path,
            capability: v1.capability,
            session_id: v1.session_id,
            error_code: v1.error_code,
            canonical_digest_in: None,
            stored_this_hash: v1.this_hash,
        });
    }
    let v2 = serde_json::from_str::<LogRow>(line).map_err(|e| {
        LogError::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("migration: line {line_no} is neither v1 nor v2: {e}"),
        ))
    })?;
    Ok(MigrationRowSeed {
        ts: v2.ts,
        ts_seq: v2.ts_seq,
        event: v2.event,
        ref_: v2.ref_,
        source: v2.source,
        result: v2.result,
        license: v2.license,
        size_bytes: v2.size_bytes,
        store_path: v2.store_path,
        capability: v2.capability,
        session_id: v2.session_id,
        error_code: v2.error_code,
        canonical_digest_in: v2.canonical_digest,
        stored_this_hash: v2.this_hash,
    })
}

/// Derive a canonical digest from a seed's `ref`/`source`; `None` when there is no `ref`.
///
/// NOTE(review): the DOI-vs-arXiv choice is a heuristic on the `"10."` prefix.
fn migration_derive_digest(seed: &MigrationRowSeed) -> Option<String> {
    let ref_str = seed.ref_.as_deref()?;
    let source_key = seed.source.as_deref().unwrap_or("");
    let source_type = if ref_str.starts_with("10.") {
        crate::SourceType::Doi
    } else {
        crate::SourceType::Arxiv
    };
    let c = crate::CanonicalRef::new(source_type, ref_str, source_key, None);
    Some(c.digest_hex())
}

/// Re-hash `seeds`, in order, into v2 rows forming one chain rooted at `GENESIS`.
fn migration_rechain(seeds: &[MigrationRowSeed]) -> Result<Vec<LogRow>, LogError> {
    let digests: Vec<Option<String>> = seeds
        .iter()
        .map(|s| {
            s.canonical_digest_in
                .clone()
                .or_else(|| migration_derive_digest(s))
        })
        .collect();
    let mut rows: Vec<LogRow> = Vec::with_capacity(seeds.len());
    let mut prev_hash: String = GENESIS_HASH.to_string();
    for (seed, digest) in seeds.iter().zip(digests) {
        let rfh = RowForHash {
            ts: seed.ts,
            ts_seq: seed.ts_seq,
            event: seed.event,
            ref_: seed.ref_.as_deref(),
            source: seed.source.as_deref(),
            result: seed.result,
            license: seed.license.as_deref(),
            size_bytes: seed.size_bytes,
            store_path: seed.store_path.as_deref(),
            capability: seed.capability,
            session_id: &seed.session_id,
            error_code: seed.error_code.as_deref(),
            schema_version: LOG_SCHEMA_VERSION,
            canonical_digest: digest.as_deref(),
            prev_hash: &prev_hash,
        };
        let this_hash = compute_this_hash(&rfh)?;
        let row = LogRow {
            ts: seed.ts,
            ts_seq: seed.ts_seq,
            event: seed.event,
            ref_: seed.ref_.clone(),
            source: seed.source.clone(),
            result: seed.result,
            license: seed.license.clone(),
            size_bytes: seed.size_bytes,
            store_path: seed.store_path.clone(),
            capability: seed.capability,
            session_id: seed.session_id.clone(),
            error_code: seed.error_code.clone(),
            schema_version: LOG_SCHEMA_VERSION.to_string(),
            canonical_digest: digest,
            prev_hash: prev_hash.clone(),
            this_hash: this_hash.clone(),
        };
        prev_hash = this_hash;
        rows.push(row);
    }
    Ok(rows)
}

/// Write `rows` as JSONL to `staged_path` (creating or truncating it), then flush and fsync.
fn migration_write_staged(staged_path: &Utf8Path, rows: &[LogRow]) -> Result<(), LogError> {
    let staged_file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(staged_path)?;
    let mut writer = BufWriter::new(staged_file);
    for row in rows {
        let mut bytes = serde_json::to_vec(row)?;
        bytes.push(b'\n');
        writer.write_all(&bytes)?;
    }
    writer.flush()?;
    let file = writer.into_inner().map_err(|e| {
        LogError::Io(std::io::Error::other(format!(
            "migration buf writer flush failed: {}",
            e.error()
        )))
    })?;
    file.sync_all()?;
    Ok(())
}

/// `<log>.v1-backup`, or, if that is taken, the first free `<log>.v1-backup.<N>` (N >= 1).
fn migration_backup_path(log_path: &Utf8Path) -> Utf8PathBuf {
    let primary = with_suffix(log_path, ".v1-backup");
    if !primary.exists() {
        return primary;
    }
    let mut n: u64 = 1;
    loop {
        let candidate = with_suffix(log_path, &format!(".v1-backup.{n}"));
        if !candidate.exists() {
            return candidate;
        }
        n += 1;
    }
}
/// `path` with `suffix` appended verbatim to its string form
/// (e.g. `log.jsonl` + `.v1-backup` -> `log.jsonl.v1-backup`).
fn with_suffix(path: &Utf8Path, suffix: &str) -> Utf8PathBuf {
    let mut joined = String::with_capacity(path.as_str().len() + suffix.len());
    joined.push_str(path.as_str());
    joined.push_str(suffix);
    Utf8PathBuf::from(joined)
}
// Unit tests covering:
// - hash chaining and recovery after reopen;
// - in-process concurrency;
// - `verify` tamper detection;
// - rotation and retention;
// - the capability wire format.
// Tests that mutate process-wide env vars are marked `#[serial_test::serial]`.
#[cfg(test)]
// Panicking on failure is the point of a test; allow the lints that forbid it elsewhere.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use std::fs;
use std::sync::Arc;
use std::thread;
use tempfile::TempDir;
// UTF-8 view of a temp dir path (camino APIs require UTF-8).
fn tmp_dir_utf8(dir: &TempDir) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(dir.path().to_path_buf()).expect("temp dir path must be UTF-8")
}
const TEST_SESSION_ID: &str = "01JCKZ7Q0000000000000000AB";
fn open_log(path: &Utf8Path) -> ProvenanceLog {
ProvenanceLog::open(path, TEST_SESSION_ID.to_string()).expect("open")
}
// Minimal valid input: a successful OA fetch with no optional metadata.
fn empty_input() -> RowInput<'static> {
RowInput {
event: LogEvent::Fetch,
result: LogResult::Ok,
capability: Capability::Oa,
ref_: None,
source: None,
error_code: None,
size_bytes: None,
license: None,
store_path: None,
canonical_digest: None,
}
}
// Parse every non-empty line of the log as a `LogRow`, panicking on any bad line.
fn read_rows(path: &Utf8Path) -> Vec<LogRow> {
let raw = fs::read_to_string(path).expect("read log");
raw.lines()
.filter(|l| !l.is_empty())
.map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
.collect()
}
// Recompute `this_hash` from the row's other fields and assert it matches the stored value.
fn verify_this_hash(row: &LogRow) {
let rfh = RowForHash {
ts: row.ts,
ts_seq: row.ts_seq,
event: row.event,
ref_: row.ref_.as_deref(),
source: row.source.as_deref(),
result: row.result,
license: row.license.as_deref(),
size_bytes: row.size_bytes,
store_path: row.store_path.as_deref(),
capability: row.capability,
session_id: &row.session_id,
error_code: row.error_code.as_deref(),
schema_version: &row.schema_version,
canonical_digest: row.canonical_digest.as_deref(),
prev_hash: &row.prev_hash,
};
let recomputed = compute_this_hash(&rfh).expect("hash");
assert_eq!(
recomputed, row.this_hash,
"this_hash mismatch on ts_seq {}",
row.ts_seq
);
}
// The first row of a fresh log starts the chain at GENESIS with ts_seq 1.
#[test]
fn first_row_uses_genesis_prev_hash() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = open_log(&path);
let seq = log.append(empty_input()).expect("append");
assert_eq!(seq, 1);
let rows = read_rows(&path);
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].ts_seq, 1);
assert_eq!(rows[0].prev_hash, GENESIS_HASH);
assert_eq!(rows[0].this_hash.len(), 64);
assert_eq!(rows[0].session_id, TEST_SESSION_ID);
verify_this_hash(&rows[0]);
}
// Each row links to its predecessor's this_hash and ts_seq increments by one.
#[test]
fn subsequent_rows_chain_correctly() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = open_log(&path);
for _ in 0..3 {
log.append(empty_input()).expect("append");
}
let rows = read_rows(&path);
assert_eq!(rows.len(), 3);
assert_eq!(rows[0].prev_hash, GENESIS_HASH);
assert_eq!(rows[1].prev_hash, rows[0].this_hash);
assert_eq!(rows[2].prev_hash, rows[1].this_hash);
for r in &rows {
verify_this_hash(r);
}
assert_eq!(rows[0].ts_seq, 1);
assert_eq!(rows[1].ts_seq, 2);
assert_eq!(rows[2].ts_seq, 3);
}
// Reopening resumes the sequence and chain from the last row on disk.
#[test]
fn recovery_after_reopen() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
{
let log = open_log(&path);
for _ in 0..3 {
log.append(empty_input()).expect("append");
}
}
let log2 = open_log(&path);
let seq = log2.append(empty_input()).expect("append after reopen");
assert_eq!(seq, 4);
let rows = read_rows(&path);
assert_eq!(rows.len(), 4);
assert_eq!(rows[0].prev_hash, GENESIS_HASH);
for i in 1..rows.len() {
assert_eq!(
rows[i].prev_hash,
rows[i - 1].this_hash,
"chain break at row {}",
i + 1
);
}
for (i, r) in rows.iter().enumerate() {
assert_eq!(r.ts_seq, (i + 1) as u64);
verify_this_hash(r);
}
}
// Eight threads appending at once must still yield a gap-free sequence and an unbroken chain.
#[test]
fn concurrent_writers_in_same_process_serialize() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = Arc::new(open_log(&path));
let mut handles = Vec::with_capacity(8);
for _ in 0..8 {
let log = Arc::clone(&log);
handles.push(thread::spawn(move || {
log.append(empty_input()).expect("append")
}));
}
let mut returned: Vec<u64> = handles
.into_iter()
.map(|h| h.join().expect("join"))
.collect();
returned.sort_unstable();
assert_eq!(returned, vec![1, 2, 3, 4, 5, 6, 7, 8]);
let rows = read_rows(&path);
assert_eq!(rows.len(), 8);
for (i, r) in rows.iter().enumerate() {
assert_eq!(r.ts_seq, (i + 1) as u64, "ts_seq gap at file row {}", i + 1);
}
assert_eq!(rows[0].prev_hash, GENESIS_HASH);
for i in 1..rows.len() {
assert_eq!(
rows[i].prev_hash,
rows[i - 1].this_hash,
"chain break at file row {}",
i + 1
);
}
for r in &rows {
verify_this_hash(r);
}
}
// Opening a log whose rows don't parse must fail closed with a line-numbered error.
#[test]
fn corrupted_existing_log_fails_open() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
fs::write(&path, "{\"ts_seq\": 1, \"garbage\": true}\n").expect("write");
let err =
ProvenanceLog::open(&path, TEST_SESSION_ID.to_string()).expect_err("must fail open");
match err {
LogError::Io(io) => {
let msg = io.to_string();
assert!(
msg.contains("corrupted log at line 1"),
"expected synthetic corruption message, got: {}",
msg
);
}
other => panic!("expected LogError::Io, got {:?}", other),
}
}
// A directory at the log path is rejected.
#[test]
fn rejects_non_regular_file() {
let dir = TempDir::new().expect("tmp");
let err = ProvenanceLog::open(tmp_dir_utf8(&dir), TEST_SESSION_ID.to_string())
.expect_err("must fail");
match err {
LogError::NotARegularFile(_) => {}
other => panic!("expected NotARegularFile, got {:?}", other),
}
}
// The hash input covers prev_hash but never this_hash itself.
#[test]
fn canonical_json_excludes_this_hash_field() {
let rfh = RowForHash {
ts: Utc::now(),
ts_seq: 1,
event: LogEvent::Fetch,
ref_: None,
source: None,
result: LogResult::Ok,
license: None,
size_bytes: None,
store_path: None,
capability: Capability::Oa,
session_id: TEST_SESSION_ID,
error_code: None,
schema_version: LOG_SCHEMA_VERSION,
canonical_digest: None,
prev_hash: GENESIS_HASH,
};
let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
let s = std::str::from_utf8(&bytes).expect("utf8");
assert!(!s.contains("this_hash"), "this_hash leaked into hash input");
assert!(s.contains("\"prev_hash\":"));
}
// Canonical bytes list keys in lexicographic order regardless of struct field order.
#[test]
fn canonical_json_keys_are_lexicographically_sorted() {
let rfh = RowForHash {
ts: Utc::now(),
ts_seq: 1,
event: LogEvent::Fetch,
ref_: Some("10.1234/example"),
source: Some("unpaywall"),
result: LogResult::Ok,
license: Some("CC-BY-4.0"),
size_bytes: Some(1234),
store_path: Some("papers/x.pdf"),
capability: Capability::Oa,
session_id: TEST_SESSION_ID,
error_code: None,
schema_version: LOG_SCHEMA_VERSION,
canonical_digest: Some(
"0000000000000000000000000000000000000000000000000000000000000000",
),
prev_hash: GENESIS_HASH,
};
let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
let s = std::str::from_utf8(&bytes).expect("utf8");
assert!(
s.starts_with("{\"canonical_digest\":"),
"canonical bytes must start with lex-first v2 key, got: {}",
s
);
let event_idx = s.find("\"event\":").expect("event key present");
let prev_idx = s.find("\"prev_hash\":").expect("prev_hash key present");
assert!(event_idx < prev_idx, "event must precede prev_hash");
let ts_idx = s.find("\"ts\":").expect("ts key present");
let tsseq_idx = s.find("\"ts_seq\":").expect("ts_seq key present");
assert!(ts_idx < tsseq_idx, "ts must precede ts_seq");
}
// Overwrite the string value of `field_key` on one line of the log in place
// (simple text splice; assumes the value contains no escaped quotes).
fn tamper_string_field(
path: &Utf8Path,
line_no_1based: usize,
field_key: &str,
new_value: &str,
) {
let raw = fs::read_to_string(path).expect("read log");
let mut lines: Vec<String> = raw.lines().map(str::to_string).collect();
let target = &lines[line_no_1based - 1];
let needle = format!("\"{field_key}\":\"");
let start = target
.find(&needle)
.unwrap_or_else(|| panic!("field {field_key} not found on line {line_no_1based}"))
+ needle.len();
let end_rel = target[start..]
.find('"')
.unwrap_or_else(|| panic!("unterminated string for field {field_key}"));
let end = start + end_rel;
let mut new_line = String::with_capacity(target.len());
new_line.push_str(&target[..start]);
new_line.push_str(new_value);
new_line.push_str(&target[end..]);
lines[line_no_1based - 1] = new_line;
let mut out = lines.join("\n");
out.push('\n');
fs::write(path, out).expect("write tampered log");
}
// A missing log verifies as an empty, clean report rather than an error.
#[test]
fn verify_empty_log_is_ok() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("nonexistent.jsonl");
assert!(!path.exists(), "precondition: file must not exist");
let report = verify(&path).expect("verify must not error on missing file");
assert_eq!(report.total_rows, 0);
assert_eq!(report.ok_rows, 0);
assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
}
// A log produced only by `append` verifies with no issues.
#[test]
fn verify_well_formed_chain_passes() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = open_log(&path);
for _ in 0..3 {
log.append(empty_input()).expect("append");
}
let report = verify(&path).expect("verify must succeed");
assert_eq!(report.total_rows, 3);
assert_eq!(report.ok_rows, 3);
assert!(
report.errors.is_empty(),
"expected no issues on a well-formed log; got: {:?}",
report.errors
);
}
// Replacing a stored this_hash is reported as exactly one ThisHashMismatch on that line.
#[test]
fn verify_detects_tampered_row_hash() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = open_log(&path);
log.append(empty_input()).expect("append 1");
log.append(empty_input()).expect("append 2");
drop(log);
tamper_string_field(
&path,
2,
"this_hash",
"0000000000000000000000000000000000000000000000000000000000000000",
);
let report = verify(&path).expect("verify must succeed");
assert_eq!(report.total_rows, 2);
let hash_issues: Vec<_> = report
.errors
.iter()
.filter(|e| e.kind == VerifyIssueKind::ThisHashMismatch)
.collect();
assert_eq!(
hash_issues.len(),
1,
"expected exactly one ThisHashMismatch, got {:?}",
report.errors
);
assert_eq!(hash_issues[0].line, 2);
}
// Replacing a stored prev_hash is reported as exactly one PrevHashMismatch on that line.
#[test]
fn verify_detects_tampered_prev_hash() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = open_log(&path);
log.append(empty_input()).expect("append 1");
log.append(empty_input()).expect("append 2");
drop(log);
tamper_string_field(
&path,
2,
"prev_hash",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
);
let report = verify(&path).expect("verify must succeed");
assert_eq!(report.total_rows, 2);
let prev_issues: Vec<_> = report
.errors
.iter()
.filter(|e| e.kind == VerifyIssueKind::PrevHashMismatch)
.collect();
assert_eq!(
prev_issues.len(),
1,
"expected exactly one PrevHashMismatch, got {:?}",
report.errors
);
assert_eq!(prev_issues[0].line, 2);
}
// An appended non-LogRow line is counted and reported as a ParseError.
#[test]
fn verify_detects_corrupted_json() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("log.jsonl");
let log = open_log(&path);
log.append(empty_input()).expect("append 1");
drop(log);
let mut existing = fs::read_to_string(&path).expect("read");
if !existing.ends_with('\n') {
existing.push('\n');
}
existing.push_str("{\"garbage\":true}\n");
fs::write(&path, existing).expect("write");
let report = verify(&path).expect("verify must succeed");
assert_eq!(report.total_rows, 2);
let parse_issues: Vec<_> = report
.errors
.iter()
.filter(|e| e.kind == VerifyIssueKind::ParseError)
.collect();
assert_eq!(
parse_issues.len(),
1,
"expected exactly one ParseError, got {:?}",
report.errors
);
assert_eq!(parse_issues[0].line, 2);
}
// Pins the on-disk spelling of every Capability variant.
#[test]
fn capability_serializes_kebab_case() {
let cases = [
(Capability::Oa, "\"oa\""),
(Capability::Metadata, "\"metadata\""),
(Capability::TdmElsevier, "\"tdm-elsevier\""),
(Capability::TdmAps, "\"tdm-aps\""),
(Capability::TdmSpringer, "\"tdm-springer\""),
];
for (cap, expected) in cases {
let got = serde_json::to_string(&cap).expect("serialize");
assert_eq!(
got, expected,
"capability wire format mismatch for {:?}",
cap
);
}
}
// Decompress a .gz file fully into a String.
fn gunzip_to_string(gz: &Utf8Path) -> String {
use std::io::Read;
let f = std::fs::File::open(gz.as_std_path()).expect("open gz");
let mut dec = GzDecoder::new(f);
let mut s = String::new();
dec.read_to_string(&mut s).expect("gunzip");
s
}
// With a 50-byte threshold the second append rotates first: the archive holds
// row 1, and the fresh live file restarts at GENESIS / ts_seq 1.
#[test]
fn rotation_archives_to_gz_and_restarts_genesis_chain() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("access.log");
let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
.expect("open");
log.append(empty_input()).expect("append 1");
let row1 = read_rows(&path);
assert_eq!(row1.len(), 1);
assert_eq!(row1[0].prev_hash, GENESIS_HASH);
log.append(empty_input()).expect("append 2 (rotates first)");
let segs = rotated_segments(&path);
assert_eq!(segs.len(), 1, "one .gz segment expected; got {segs:?}");
let archived: Vec<LogRow> = gunzip_to_string(&segs[0])
.lines()
.filter(|l| !l.is_empty())
.map(|l| serde_json::from_str(l).expect("row"))
.collect();
assert_eq!(archived.len(), 1);
assert_eq!(archived[0].this_hash, row1[0].this_hash);
let cur = read_rows(&path);
assert_eq!(cur.len(), 1, "fresh segment holds only the post-rotate row");
assert_eq!(cur[0].prev_hash, GENESIS_HASH);
assert_eq!(cur[0].ts_seq, 1);
let reports = verify_all(&path).expect("verify_all");
assert_eq!(reports.len(), 2, "rotated .gz + current");
for (p, r) in &reports {
assert!(r.errors.is_empty(), "segment {p} must verify clean: {r:?}");
}
}
// Rotating a nonexistent file is an I/O error, not a silent no-op.
#[test]
fn rotate_log_is_fail_closed_on_missing_source() {
let dir = TempDir::new().expect("tmp");
let missing = tmp_dir_utf8(&dir).join("nope.log");
let err = rotate_log(&missing).expect_err("missing source must error");
assert!(matches!(err, LogError::Io(_)), "got {err:?}");
}
// Retention is applied at open: days=0 keeps everything; days=1 removes only the
// segment whose mtime is 100 days old. Serialized because it mutates env vars.
#[test]
#[serial_test::serial]
fn prune_respects_retention_window_and_disable() {
let dir = TempDir::new().expect("tmp");
let base = tmp_dir_utf8(&dir);
let path = base.join("access.log");
let old_gz = base.join("access.log.2020-01-01-000000.gz");
let new_gz = base.join("access.log.2999-01-01-000000.gz");
let mk = |p: &Utf8Path, aged: bool| {
let f = std::fs::File::create(p.as_std_path()).expect("create gz");
if aged {
let when =
std::time::SystemTime::now() - std::time::Duration::from_secs(100 * 86_400);
f.set_modified(when).expect("set mtime");
}
};
mk(&old_gz, true);
std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
let _ = open_log(&path);
assert!(old_gz.exists(), "days=0 must NOT prune");
mk(&new_gz, false);
std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "1");
let _ = open_log(&path);
assert!(!old_gz.exists(), "aged segment must be pruned at days=1");
assert!(new_gz.exists(), "fresh segment must survive");
std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
}
// Valid non-negative values pass through; garbage, negatives and unset fall back to the default.
#[test]
#[serial_test::serial]
fn retention_days_env_parsing() {
std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
assert_eq!(retention_days(), 0);
std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "30");
assert_eq!(retention_days(), 30);
std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "garbage");
assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "-5");
assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
}
// Tampering with the live segment is reported there while the archived segment stays clean.
#[test]
fn verify_all_flags_tampered_segment_independently() {
let dir = TempDir::new().expect("tmp");
let path = tmp_dir_utf8(&dir).join("access.log");
let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
.expect("open");
log.append(empty_input()).expect("append 1");
log.append(empty_input()).expect("append 2 (rotates)");
drop(log);
let mut cur = read_rows(&path);
let mut bad = cur.remove(0);
bad.this_hash =
"0000000000000000000000000000000000000000000000000000000000000000".to_string();
std::fs::write(
path.as_std_path(),
format!("{}\n", serde_json::to_string(&bad).expect("ser")),
)
.expect("rewrite tampered current");
let reports = verify_all(&path).expect("verify_all");
assert_eq!(reports.len(), 2);
let (gz_path, gz_rep) = &reports[0];
let (cur_path, cur_rep) = &reports[1];
assert!(
gz_path.as_str().ends_with(".gz") && gz_rep.errors.is_empty(),
"rotated segment must stay clean: {gz_path} {gz_rep:?}"
);
assert!(
cur_path.file_name() == Some("access.log") && !cur_rep.errors.is_empty(),
"tampered current segment must report issues: {cur_path} {cur_rep:?}"
);
}
}