atomr_persistence_sql/
worm.rs1use async_trait::async_trait;
22use sha2::{Digest, Sha256};
23use thiserror::Error;
24
25use crate::journal::SqlJournal;
26
/// Toggles for the journal's WORM protections.
///
/// The derived [`Default`] leaves both switches off; [`WormConfig::enforced`]
/// turns both on.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WormConfig {
    /// Presumably enables the per-row hash chain (see `compute_row_hash`).
    /// NOTE(review): the code that reads this flag is not in this file -- confirm.
    pub hash_chain: bool,
    /// Presumably makes the backend refuse updates and deletes of journal rows.
    /// NOTE(review): the enforcement mechanism is not in this file -- confirm.
    pub deny_update_delete: bool,
}

impl WormConfig {
    /// Returns a configuration with every protection switched on.
    ///
    /// This is a `const fn`, so the result can initialise `const` and `static`
    /// items as well as ordinary bindings.
    #[must_use]
    pub const fn enforced() -> Self {
        Self {
            hash_chain: true,
            deny_update_delete: true,
        }
    }
}
43
44#[derive(Debug, Error)]
46pub enum IntegrityError {
47 #[error("backend error: {0}")]
48 Backend(String),
49}
50
51impl IntegrityError {
52 pub fn backend(e: impl std::fmt::Display) -> Self {
53 Self::Backend(e.to_string())
54 }
55}
56
/// Outcome of walking the hash chain of one `persistence_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChainProof {
    /// Every row matched its stored hashes.
    Intact {
        /// Number of rows that were checked.
        rows: u64,
    },
    /// A row's stored `prev_hash` or `row_hash` did not match the recomputed chain.
    Tampered {
        /// Sequence number of the first row that failed the check.
        first_bad_sequence_nr: u64,
    },
}
66
67pub(crate) fn compute_row_hash(
73 prev_hash: &[u8],
74 persistence_id: &str,
75 sequence_nr: u64,
76 payload: &[u8],
77 created_at: i64,
78) -> Vec<u8> {
79 let mut h = Sha256::new();
80 h.update(prev_hash);
81 h.update(persistence_id.as_bytes());
82 h.update(sequence_nr.to_le_bytes());
83 h.update(payload);
84 h.update(created_at.to_le_bytes());
85 h.finalize().to_vec()
86}
87
88#[async_trait]
90pub trait IntegrityVerify {
91 async fn verify_chain(&self, pid: &str) -> Result<ChainProof, IntegrityError>;
92}
93
94#[async_trait]
95impl IntegrityVerify for SqlJournal {
96 async fn verify_chain(&self, pid: &str) -> Result<ChainProof, IntegrityError> {
97 let rows: Vec<(i64, Vec<u8>, i64, Option<Vec<u8>>, Option<Vec<u8>>)> = sqlx::query_as(
99 "SELECT sequence_nr, payload, created_at, prev_hash, row_hash \
100 FROM event_journal WHERE persistence_id = ? ORDER BY sequence_nr ASC",
101 )
102 .bind(pid)
103 .fetch_all(self.pool())
104 .await
105 .map_err(IntegrityError::backend)?;
106
107 let mut expected_prev: Vec<u8> = Vec::new();
108 let mut count = 0u64;
109 for (seq, payload, created_at, stored_prev, stored_row) in rows {
110 count += 1;
111 let stored_prev = stored_prev.unwrap_or_default();
112 if stored_prev != expected_prev {
114 return Ok(ChainProof::Tampered { first_bad_sequence_nr: seq as u64 });
115 }
116 let recomputed = compute_row_hash(&expected_prev, pid, seq as u64, &payload, created_at);
117 match stored_row {
118 Some(stored) if stored == recomputed => {
119 expected_prev = recomputed;
120 }
121 _ => {
122 return Ok(ChainProof::Tampered { first_bad_sequence_nr: seq as u64 });
123 }
124 }
125 }
126 Ok(ChainProof::Intact { rows: count })
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Identical inputs give identical digests, and a link changes when its
    /// predecessor or payload changes.
    #[test]
    fn row_hash_is_stable_and_chains() {
        let first = compute_row_hash(&[], "p", 1, b"a", 100);
        let first_again = compute_row_hash(&[], "p", 1, b"a", 100);
        assert_eq!(first, first_again, "deterministic");
        assert_eq!(first.len(), 32, "SHA-256 digests are 32 bytes");

        let second = compute_row_hash(&first, "p", 2, b"b", 200);
        assert_ne!(first, second);

        // Changing only the payload must change the row hash.
        let second_tampered = compute_row_hash(&first, "p", 2, b"B", 200);
        assert_ne!(second, second_tampered);
    }

    /// Every input of the row hash must influence the digest, so that no
    /// column can be altered without breaking the chain.
    #[test]
    fn row_hash_covers_every_field() {
        let prev = compute_row_hash(&[], "p", 1, b"a", 100);
        let baseline = compute_row_hash(&prev, "p", 2, b"b", 200);

        assert_ne!(baseline, compute_row_hash(&[], "p", 2, b"b", 200), "prev_hash");
        assert_ne!(baseline, compute_row_hash(&prev, "q", 2, b"b", 200), "persistence_id");
        assert_ne!(baseline, compute_row_hash(&prev, "p", 3, b"b", 200), "sequence_nr");
        assert_ne!(baseline, compute_row_hash(&prev, "p", 2, b"c", 200), "payload");
        assert_ne!(baseline, compute_row_hash(&prev, "p", 2, b"b", 201), "created_at");
    }
}