// atomr_persistence_sql/worm.rs

//! FR-9 — Write-Once-Read-Many (WORM) storage with a tamper-evident hash
//! chain.
//!
//! Two independent guarantees, each opt-in via [`WormConfig`]:
//!
//! * **`deny_update_delete`** installs dialect DDL that physically rejects
//!   `UPDATE`/`DELETE` on `event_journal` (SQLite: `BEFORE UPDATE` and
//!   `BEFORE DELETE` triggers that abort via `RAISE(ABORT, ...)`). Once on,
//!   the only legal mutation is `INSERT` — the journal is append-only.
//!
//! * **`hash_chain`** maintains a per-`persistence_id` Merkle-style chain:
//!   each row stores `prev_hash` (the prior row's `row_hash`, empty for the
//!   first row) and
//!   `row_hash = SHA-256(prev_hash || persistence_id || seq_le || payload ||
//!   created_at_le)`, where `seq_le` and `created_at_le` are the
//!   little-endian bytes of the sequence number and creation timestamp. Any
//!   retroactive edit to a payload breaks every downstream link, which
//!   [`IntegrityVerify::verify_chain`] detects and localizes to the first
//!   bad sequence number.
//!
//! The columns ride in the additive `002_worm.sql` migration; no change to
//! [`atomr_persistence::PersistentRepr`].
20
21use async_trait::async_trait;
22use sha2::{Digest, Sha256};
23use thiserror::Error;
24
25use crate::journal::SqlJournal;
26
/// WORM behavior toggles, stored on a [`SqlJournal`] via
/// [`SqlJournal::with_worm`].
///
/// The two switches are independent of each other. `WormConfig::default()`
/// leaves both off; [`WormConfig::enforced`] turns both on.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WormConfig {
    /// Maintain the per-pid tamper-evident SHA-256 hash chain on write.
    pub hash_chain: bool,
    /// Install DDL that rejects UPDATE/DELETE on `event_journal`.
    pub deny_update_delete: bool,
}

impl WormConfig {
    /// Both protections on — the strongest WORM posture.
    ///
    /// This is a `const fn`, so the preset can also initialize `const` and
    /// `static` items.
    #[must_use]
    pub const fn enforced() -> Self {
        Self { hash_chain: true, deny_update_delete: true }
    }
}
43
44/// Errors surfaced while verifying a hash chain.
45#[derive(Debug, Error)]
46pub enum IntegrityError {
47    #[error("backend error: {0}")]
48    Backend(String),
49}
50
51impl IntegrityError {
52    pub fn backend(e: impl std::fmt::Display) -> Self {
53        Self::Backend(e.to_string())
54    }
55}
56
/// Outcome of checking one persistence id's hash chain.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChainProof {
    /// Every stored `row_hash` matches a fresh recomputation.
    ///
    /// `rows` is the number of rows that were checked.
    Intact { rows: u64 },
    /// The chain diverges at `first_bad_sequence_nr`: the earliest row whose
    /// `prev_hash` link or recomputed hash disagrees with what is stored.
    Tampered { first_bad_sequence_nr: u64 },
}
66
67/// Compute the canonical row hash for a journal row.
68///
69/// `row_hash = SHA-256(prev_hash || persistence_id || seq_le || payload || created_at_le)`
70/// where `prev_hash` is the previous row's `row_hash` for the same pid (empty
71/// for the first row in the chain).
72pub(crate) fn compute_row_hash(
73    prev_hash: &[u8],
74    persistence_id: &str,
75    sequence_nr: u64,
76    payload: &[u8],
77    created_at: i64,
78) -> Vec<u8> {
79    let mut h = Sha256::new();
80    h.update(prev_hash);
81    h.update(persistence_id.as_bytes());
82    h.update(sequence_nr.to_le_bytes());
83    h.update(payload);
84    h.update(created_at.to_le_bytes());
85    h.finalize().to_vec()
86}
87
88/// Recompute and validate a persistence id's hash chain.
89#[async_trait]
90pub trait IntegrityVerify {
91    async fn verify_chain(&self, pid: &str) -> Result<ChainProof, IntegrityError>;
92}
93
94#[async_trait]
95impl IntegrityVerify for SqlJournal {
96    async fn verify_chain(&self, pid: &str) -> Result<ChainProof, IntegrityError> {
97        // Pull rows in sequence order with the stored chain columns.
98        let rows: Vec<(i64, Vec<u8>, i64, Option<Vec<u8>>, Option<Vec<u8>>)> = sqlx::query_as(
99            "SELECT sequence_nr, payload, created_at, prev_hash, row_hash \
100             FROM event_journal WHERE persistence_id = ? ORDER BY sequence_nr ASC",
101        )
102        .bind(pid)
103        .fetch_all(self.pool())
104        .await
105        .map_err(IntegrityError::backend)?;
106
107        let mut expected_prev: Vec<u8> = Vec::new();
108        let mut count = 0u64;
109        for (seq, payload, created_at, stored_prev, stored_row) in rows {
110            count += 1;
111            let stored_prev = stored_prev.unwrap_or_default();
112            // The prev_hash recorded on this row must equal the running hash.
113            if stored_prev != expected_prev {
114                return Ok(ChainProof::Tampered { first_bad_sequence_nr: seq as u64 });
115            }
116            let recomputed = compute_row_hash(&expected_prev, pid, seq as u64, &payload, created_at);
117            match stored_row {
118                Some(stored) if stored == recomputed => {
119                    expected_prev = recomputed;
120                }
121                _ => {
122                    return Ok(ChainProof::Tampered { first_bad_sequence_nr: seq as u64 });
123                }
124            }
125        }
126        Ok(ChainProof::Intact { rows: count })
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Same inputs give the same digest, and a chained row differs from its
    /// predecessor and from a payload-tampered twin.
    #[test]
    fn row_hash_is_stable_and_chains() {
        let first = compute_row_hash(&[], "p", 1, b"a", 100);
        assert_eq!(first, compute_row_hash(&[], "p", 1, b"a", 100), "deterministic");
        assert_eq!(first.len(), 32, "SHA-256 digests are 32 bytes");

        let second = compute_row_hash(&first, "p", 2, b"b", 200);
        assert_ne!(first, second);

        let second_tampered = compute_row_hash(&first, "p", 2, b"B", 200);
        assert_ne!(second, second_tampered);
    }

    /// Each of the five inputs participates in the digest: changing one of
    /// them while holding the rest fixed yields a different hash.
    #[test]
    fn row_hash_depends_on_every_input() {
        let prev = compute_row_hash(&[], "p", 1, b"a", 100);
        let other_prev = compute_row_hash(&[], "p", 1, b"A", 100);
        let base = compute_row_hash(&prev, "p", 2, b"b", 200);

        let variants = [
            ("prev_hash", compute_row_hash(&other_prev, "p", 2, b"b", 200)),
            ("persistence_id", compute_row_hash(&prev, "q", 2, b"b", 200)),
            ("sequence_nr", compute_row_hash(&prev, "p", 3, b"b", 200)),
            ("payload", compute_row_hash(&prev, "p", 2, b"c", 200)),
            ("created_at", compute_row_hash(&prev, "p", 2, b"b", 201)),
        ];
        for (field, digest) in &variants {
            assert_ne!(&base, digest, "changing {} must change the hash", field);
        }
    }
}