// allsource_core/infrastructure/persistence/storage_integrity.rs

use std::path::Path;

use sha2::{Digest, Sha256};

use crate::error::{AllSourceError, Result};

/// Storage integrity checker (SierraDB Pattern)
///
/// Detects silent data corruption with checksums.
/// Based on production lessons from SierraDB event store.
///
/// # SierraDB Pattern
/// - Checksums detect corruption in storage
/// - Critical for long-running production systems
/// - Verifies WAL segments; Parquet files are currently only checked for readability
///
/// # Design
/// - SHA-256 digests, hex-encoded (64 characters)
/// - Per-segment checksums for WAL: a 64-byte hex checksum header followed by the payload
/// - Parquet: no external checksum yet (see `verify_parquet_file`)
/// - Verification runs per file on caller-supplied paths rather than scanning the
///   whole store; each checked file is read in full
///
/// The checksums are unkeyed: they detect accidental corruption, but cannot detect
/// deliberate tampering by anyone able to rewrite both the data and its checksum.
#[derive(Debug, Clone, Copy)]
pub struct StorageIntegrity;

22impl StorageIntegrity {
23    /// Compute SHA-256 checksum for data
24    ///
25    /// Returns hex-encoded checksum string.
26    ///
27    /// # Example
28    /// ```
29    /// use allsource_core::infrastructure::persistence::StorageIntegrity;
30    ///
31    /// let data = b"hello world";
32    /// let checksum = StorageIntegrity::compute_checksum(data);
33    /// assert_eq!(checksum.len(), 64); // SHA-256 is 32 bytes = 64 hex chars
34    /// ```
35    pub fn compute_checksum(data: &[u8]) -> String {
36        let mut hasher = Sha256::new();
37        hasher.update(data);
38        format!("{:x}", hasher.finalize())
39    }
40
41    /// Verify data against expected checksum
42    ///
43    /// Returns true if checksums match, false otherwise.
44    ///
45    /// # Example
46    /// ```
47    /// use allsource_core::infrastructure::persistence::StorageIntegrity;
48    ///
49    /// let data = b"hello world";
50    /// let checksum = StorageIntegrity::compute_checksum(data);
51    /// assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());
52    /// ```
53    pub fn verify_checksum(data: &[u8], expected: &str) -> Result<bool> {
54        let computed = Self::compute_checksum(data);
55        Ok(computed == expected)
56    }
57
58    /// Verify data and return error if mismatch
59    ///
60    /// More convenient than verify_checksum for error handling.
61    pub fn verify_or_error(data: &[u8], expected: &str) -> Result<()> {
62        if !Self::verify_checksum(data, expected)? {
63            return Err(AllSourceError::StorageError(format!(
64                "Checksum mismatch: expected {}, computed {}",
65                expected,
66                Self::compute_checksum(data)
67            )));
68        }
69        Ok(())
70    }
71
72    /// Compute checksum with metadata
73    ///
74    /// Includes data length and optional label in checksum.
75    /// Prevents length extension attacks and provides context.
76    pub fn compute_checksum_with_metadata(data: &[u8], label: Option<&str>) -> String {
77        let mut hasher = Sha256::new();
78
79        // Include length to prevent length extension
80        hasher.update(&(data.len() as u64).to_le_bytes());
81
82        // Include label if provided
83        if let Some(l) = label {
84            hasher.update(l.as_bytes());
85        }
86
87        // Include actual data
88        hasher.update(data);
89
90        format!("{:x}", hasher.finalize())
91    }
92
93    /// Verify WAL segment integrity
94    ///
95    /// WAL segments are critical for durability.
96    /// Any corruption means potential data loss.
97    ///
98    /// # Returns
99    /// - Ok(true) if segment is valid
100    /// - Ok(false) if segment doesn't exist
101    /// - Err if corruption detected
102    pub fn verify_wal_segment(segment_path: &Path) -> Result<bool> {
103        if !segment_path.exists() {
104            return Ok(false);
105        }
106
107        // Read segment file
108        let data = std::fs::read(segment_path).map_err(|e| {
109            AllSourceError::StorageError(format!("Failed to read WAL segment: {}", e))
110        })?;
111
112        // WAL format: [checksum: 64 bytes][data: N bytes]
113        if data.len() < 64 {
114            return Err(AllSourceError::StorageError(
115                "WAL segment too short for checksum".to_string(),
116            ));
117        }
118
119        let stored_checksum = String::from_utf8_lossy(&data[0..64]).to_string();
120        let segment_data = &data[64..];
121
122        Self::verify_or_error(segment_data, &stored_checksum)?;
123        Ok(true)
124    }
125
126    /// Verify Parquet file integrity
127    ///
128    /// Parquet files are our long-term storage.
129    /// Corruption here means historical data loss.
130    ///
131    /// # Returns
132    /// - Ok(true) if file is valid
133    /// - Ok(false) if file doesn't exist
134    /// - Err if corruption detected
135    pub fn verify_parquet_file(file_path: &Path) -> Result<bool> {
136        if !file_path.exists() {
137            return Ok(false);
138        }
139
140        // For now, just verify file can be read
141        // TODO: Add Parquet metadata checksum verification
142        let _data = std::fs::read(file_path).map_err(|e| {
143            AllSourceError::StorageError(format!("Failed to read Parquet file: {}", e))
144        })?;
145
146        // Parquet has internal checksums, but we could add external ones
147        Ok(true)
148    }
149
150    /// Batch verify multiple files
151    ///
152    /// Efficiently verify multiple files with progress reporting.
153    pub fn batch_verify<P: AsRef<Path>>(
154        paths: &[P],
155        progress_callback: Option<Box<dyn Fn(usize, usize)>>,
156    ) -> Result<Vec<bool>> {
157        let mut results = Vec::new();
158
159        for (idx, path) in paths.iter().enumerate() {
160            let path = path.as_ref();
161
162            // Determine file type and verify
163            let result = if path.extension().and_then(|s| s.to_str()) == Some("wal") {
164                Self::verify_wal_segment(path)?
165            } else if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
166                Self::verify_parquet_file(path)?
167            } else {
168                false
169            };
170
171            results.push(result);
172
173            // Report progress
174            if let Some(ref callback) = progress_callback {
175                callback(idx + 1, paths.len());
176            }
177        }
178
179        Ok(results)
180    }
181}
182
/// Outcome of an integrity check on a single storage file.
///
/// The `success` and `failure` constructors set `valid`, `checksum`, and `error`
/// consistently. The fields are public, so code building the struct directly
/// should follow the same convention.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IntegrityCheckResult {
    /// Path of the checked file.
    pub path: String,
    /// `true` if the file passed verification.
    pub valid: bool,
    /// Checksum recorded for a passing file; `None` on failure.
    pub checksum: Option<String>,
    /// Why verification failed; `None` on success.
    pub error: Option<String>,
}

impl IntegrityCheckResult {
    /// Build a passing result for `path` carrying its `checksum`.
    #[must_use]
    pub fn success(path: String, checksum: String) -> Self {
        Self {
            path,
            valid: true,
            checksum: Some(checksum),
            error: None,
        }
    }

    /// Build a failing result for `path` carrying an `error` description.
    #[must_use]
    pub fn failure(path: String, error: String) -> Self {
        Self {
            path,
            valid: false,
            checksum: None,
            error: Some(error),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a temp-dir path unique to this process so concurrent test runs don't collide.
    fn temp_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!(
            "allsource_storage_integrity_{}_{}",
            std::process::id(),
            name
        ))
    }

    /// Write a WAL segment in the `[hex checksum][payload]` layout read by `verify_wal_segment`.
    fn write_wal(path: &Path, checksum: &str, payload: &[u8]) {
        let mut contents = checksum.as_bytes().to_vec();
        contents.extend_from_slice(payload);
        std::fs::write(path, contents).unwrap();
    }

    #[test]
    fn test_compute_checksum() {
        let data = b"hello world";
        let checksum = StorageIntegrity::compute_checksum(data);

        // SHA-256 produces 32 bytes = 64 hex characters
        assert_eq!(checksum.len(), 64);

        // Checksums should be deterministic
        let checksum2 = StorageIntegrity::compute_checksum(data);
        assert_eq!(checksum, checksum2);
    }

    #[test]
    fn test_known_sha256_vectors() {
        // Standard SHA-256 test vectors; also pins the lowercase hex encoding.
        assert_eq!(
            StorageIntegrity::compute_checksum(b"hello world"),
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
        assert_eq!(
            StorageIntegrity::compute_checksum(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn test_verify_checksum() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());

        // Wrong checksum should fail
        assert!(!StorageIntegrity::verify_checksum(data, "wrong").unwrap());
    }

    #[test]
    fn test_verify_or_error() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        // Valid checksum should succeed
        assert!(StorageIntegrity::verify_or_error(data, &checksum).is_ok());

        // Invalid checksum should error
        let result = StorageIntegrity::verify_or_error(data, "wrong");
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_checksum_with_metadata() {
        let data = b"test";

        let checksum1 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        let checksum2 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label2"));

        // Different labels produce different checksums
        assert_ne!(checksum1, checksum2);

        // Same label produces same checksum
        let checksum3 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        assert_eq!(checksum1, checksum3);
    }

    #[test]
    fn test_checksum_with_metadata_differs_from_plain() {
        let data = b"test";
        // The length prefix changes the hash input, so the digest differs from the plain one.
        assert_ne!(
            StorageIntegrity::compute_checksum_with_metadata(data, None),
            StorageIntegrity::compute_checksum(data)
        );
    }

    #[test]
    fn test_different_data_different_checksums() {
        let data1 = b"hello";
        let data2 = b"world";

        let checksum1 = StorageIntegrity::compute_checksum(data1);
        let checksum2 = StorageIntegrity::compute_checksum(data2);

        assert_ne!(checksum1, checksum2);
    }

    #[test]
    fn test_empty_data() {
        let data = b"";
        let checksum = StorageIntegrity::compute_checksum(data);

        // Should still produce valid checksum
        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());
    }

    #[test]
    fn test_large_data() {
        let data = vec![0u8; 1_000_000]; // 1MB
        let checksum = StorageIntegrity::compute_checksum(&data);

        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(&data, &checksum).unwrap());
    }

    #[test]
    fn test_verify_wal_segment_valid() {
        let path = temp_path("valid.wal");
        let payload = b"event-1\nevent-2\n";
        write_wal(&path, &StorageIntegrity::compute_checksum(payload), payload);

        let result = StorageIntegrity::verify_wal_segment(&path);
        let _ = std::fs::remove_file(&path);
        assert!(result.unwrap());
    }

    #[test]
    fn test_verify_wal_segment_empty_payload() {
        // A segment that is exactly the checksum header covers an empty payload.
        let path = temp_path("empty_payload.wal");
        write_wal(&path, &StorageIntegrity::compute_checksum(b""), b"");

        let result = StorageIntegrity::verify_wal_segment(&path);
        let _ = std::fs::remove_file(&path);
        assert!(result.unwrap());
    }

    #[test]
    fn test_verify_wal_segment_corrupted() {
        let path = temp_path("corrupted.wal");
        let checksum = StorageIntegrity::compute_checksum(b"original payload");
        write_wal(&path, &checksum, b"tampered payload");

        let result = StorageIntegrity::verify_wal_segment(&path);
        let _ = std::fs::remove_file(&path);
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_verify_wal_segment_too_short() {
        let path = temp_path("short.wal");
        std::fs::write(&path, b"too short").unwrap();

        let result = StorageIntegrity::verify_wal_segment(&path);
        let _ = std::fs::remove_file(&path);
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_verify_wal_segment_missing() {
        let path = temp_path("missing.wal");
        let _ = std::fs::remove_file(&path);

        assert!(!StorageIntegrity::verify_wal_segment(&path).unwrap());
    }

    #[test]
    fn test_verify_parquet_file_readable_then_missing() {
        let path = temp_path("data.parquet");
        std::fs::write(&path, b"PAR1").unwrap();

        let readable = StorageIntegrity::verify_parquet_file(&path);
        let _ = std::fs::remove_file(&path);
        assert!(readable.unwrap());

        // Once removed, the file is reported as absent rather than as an error.
        assert!(!StorageIntegrity::verify_parquet_file(&path).unwrap());
    }

    #[test]
    fn test_batch_verify_dispatch_and_progress() {
        use std::cell::RefCell;
        use std::rc::Rc;

        let wal = temp_path("batch.wal");
        let payload = b"batch payload";
        write_wal(&wal, &StorageIntegrity::compute_checksum(payload), payload);
        let unknown = temp_path("batch.txt");
        let missing = temp_path("batch_missing.parquet");
        let _ = std::fs::remove_file(&missing);

        let seen = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&seen);
        let callback: Box<dyn Fn(usize, usize)> =
            Box::new(move |done, total| sink.borrow_mut().push((done, total)));

        let paths = [wal.clone(), unknown, missing];
        let result = StorageIntegrity::batch_verify(&paths[..], Some(callback));
        let _ = std::fs::remove_file(&wal);

        assert_eq!(result.unwrap(), vec![true, false, false]);
        assert_eq!(*seen.borrow(), vec![(1, 3), (2, 3), (3, 3)]);
    }

    #[test]
    fn test_integrity_check_result() {
        let success = IntegrityCheckResult::success("test.wal".to_string(), "abc123".to_string());
        assert!(success.valid);
        assert_eq!(success.checksum, Some("abc123".to_string()));
        assert_eq!(success.error, None);

        let failure = IntegrityCheckResult::failure(
            "test.wal".to_string(),
            "corruption detected".to_string(),
        );
        assert!(!failure.valid);
        assert_eq!(failure.checksum, None);
        assert_eq!(failure.error, Some("corruption detected".to_string()));
    }
}