Skip to main content

allsource_core/infrastructure/persistence/
storage_integrity.rs

1use crate::error::{AllSourceError, Result};
2use parquet::file::reader::{FileReader, SerializedFileReader};
3use sha2::{Digest, Sha256};
4use std::path::Path;
5
/// Storage integrity checker (SierraDB Pattern)
///
/// Prevents silent data corruption with checksums.
/// Based on production lessons from SierraDB event store.
///
/// This is a stateless unit struct: it carries no data and exists only as a
/// namespace for the associated checksum and file-verification functions.
///
/// # SierraDB Pattern
/// - Checksums detect corruption in storage
/// - Critical for long-running production systems
/// - Verifies WAL segments and Parquet files
///
/// # Design
/// - SHA-256 for cryptographic strength
/// - Per-segment checksums for WAL (a hex digest stored at the start of each
///   segment file)
/// - Per-file verification for Parquet (footer and column-chunk metadata)
/// - Incremental verification (not full scan)
///   NOTE(review): verification here is invoked per file; nothing in this
///   type tracks which files were already verified — confirm what
///   "incremental" is meant to cover.
pub struct StorageIntegrity;
22
23impl StorageIntegrity {
24    /// Compute SHA-256 checksum for data
25    ///
26    /// Returns hex-encoded checksum string.
27    ///
28    /// # Example
29    /// ```
30    /// use allsource_core::infrastructure::persistence::StorageIntegrity;
31    ///
32    /// let data = b"hello world";
33    /// let checksum = StorageIntegrity::compute_checksum(data);
34    /// assert_eq!(checksum.len(), 64); // SHA-256 is 32 bytes = 64 hex chars
35    /// ```
36    pub fn compute_checksum(data: &[u8]) -> String {
37        let mut hasher = Sha256::new();
38        hasher.update(data);
39        format!("{:x}", hasher.finalize())
40    }
41
42    /// Verify data against expected checksum
43    ///
44    /// Returns true if checksums match, false otherwise.
45    ///
46    /// # Example
47    /// ```
48    /// use allsource_core::infrastructure::persistence::StorageIntegrity;
49    ///
50    /// let data = b"hello world";
51    /// let checksum = StorageIntegrity::compute_checksum(data);
52    /// assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());
53    /// ```
54    pub fn verify_checksum(data: &[u8], expected: &str) -> Result<bool> {
55        let computed = Self::compute_checksum(data);
56        Ok(computed == expected)
57    }
58
59    /// Verify data and return error if mismatch
60    ///
61    /// More convenient than verify_checksum for error handling.
62    pub fn verify_or_error(data: &[u8], expected: &str) -> Result<()> {
63        if !Self::verify_checksum(data, expected)? {
64            return Err(AllSourceError::StorageError(format!(
65                "Checksum mismatch: expected {}, computed {}",
66                expected,
67                Self::compute_checksum(data)
68            )));
69        }
70        Ok(())
71    }
72
73    /// Compute checksum with metadata
74    ///
75    /// Includes data length and optional label in checksum.
76    /// Prevents length extension attacks and provides context.
77    pub fn compute_checksum_with_metadata(data: &[u8], label: Option<&str>) -> String {
78        let mut hasher = Sha256::new();
79
80        // Include length to prevent length extension
81        hasher.update((data.len() as u64).to_le_bytes());
82
83        // Include label if provided
84        if let Some(l) = label {
85            hasher.update(l.as_bytes());
86        }
87
88        // Include actual data
89        hasher.update(data);
90
91        format!("{:x}", hasher.finalize())
92    }
93
94    /// Verify WAL segment integrity
95    ///
96    /// WAL segments are critical for durability.
97    /// Any corruption means potential data loss.
98    ///
99    /// # Returns
100    /// - Ok(true) if segment is valid
101    /// - Ok(false) if segment doesn't exist
102    /// - Err if corruption detected
103    pub fn verify_wal_segment(segment_path: &Path) -> Result<bool> {
104        if !segment_path.exists() {
105            return Ok(false);
106        }
107
108        // Read segment file
109        let data = std::fs::read(segment_path).map_err(|e| {
110            AllSourceError::StorageError(format!("Failed to read WAL segment: {e}"))
111        })?;
112
113        // WAL format: [checksum: 64 bytes][data: N bytes]
114        if data.len() < 64 {
115            return Err(AllSourceError::StorageError(
116                "WAL segment too short for checksum".to_string(),
117            ));
118        }
119
120        let stored_checksum = String::from_utf8_lossy(&data[0..64]).to_string();
121        let segment_data = &data[64..];
122
123        Self::verify_or_error(segment_data, &stored_checksum)?;
124        Ok(true)
125    }
126
127    /// Verify Parquet file integrity
128    ///
129    /// Opens the Parquet file and validates:
130    /// - Magic bytes (PAR1 header/footer)
131    /// - Footer metadata (schema, row groups, column chunks)
132    /// - Page-level CRC checksums when present in column chunks
133    ///
134    /// # Safety / trust boundary
135    /// `file_path` must come from a trusted source — this function is only
136    /// called internally with paths that the storage layer discovered by
137    /// enumerating files under the configured `data_dir`. It does NOT accept
138    /// user-controlled input directly; callers that want to verify a
139    /// user-supplied filename MUST canonicalize it and confirm the resolved
140    /// path is still contained within `data_dir` before calling this.
141    ///
142    /// # Returns
143    /// - Ok(true) if file is valid
144    /// - Ok(false) if file doesn't exist
145    /// - Err if corruption detected
146    pub fn verify_parquet_file(file_path: &Path) -> Result<bool> {
147        if !file_path.exists() {
148            return Ok(false);
149        }
150
151        let file = std::fs::File::open(file_path).map_err(|e| {
152            AllSourceError::StorageError(format!("Failed to open Parquet file: {e}"))
153        })?;
154
155        // SerializedFileReader validates magic bytes and parses the footer/metadata.
156        // If the file is truncated or the footer is corrupt, this returns an error.
157        let reader = SerializedFileReader::new(file).map_err(|e| {
158            AllSourceError::StorageError(format!(
159                "Parquet metadata verification failed for {}: {e}",
160                file_path.display()
161            ))
162        })?;
163
164        let metadata = reader.metadata();
165        let file_metadata = metadata.file_metadata();
166
167        // Verify each row group's column chunk metadata is readable
168        for rg_idx in 0..metadata.num_row_groups() {
169            let row_group = metadata.row_group(rg_idx);
170            for col_idx in 0..row_group.num_columns() {
171                let col = row_group.column(col_idx);
172                // Access column metadata to ensure it's not corrupt
173                let _compression = col.compression();
174                let _num_values = col.num_values();
175                // Verify byte range is sane
176                let (start, len) = col.byte_range();
177                if len == 0 && col.num_values() > 0 {
178                    return Err(AllSourceError::StorageError(format!(
179                        "Parquet column chunk {col_idx} in row group {rg_idx} has zero bytes but {} values in {}",
180                        col.num_values(),
181                        file_path.display()
182                    )));
183                }
184                let _ = start; // used for the sane check above
185            }
186        }
187
188        // Verify we can read the schema
189        let _schema = file_metadata.schema_descr();
190        let _num_rows = file_metadata.num_rows();
191
192        Ok(true)
193    }
194
195    /// Batch verify multiple files
196    ///
197    /// Efficiently verify multiple files with progress reporting.
198    pub fn batch_verify<P: AsRef<Path>>(
199        paths: &[P],
200        progress_callback: Option<&dyn Fn(usize, usize)>,
201    ) -> Result<Vec<bool>> {
202        let mut results = Vec::new();
203
204        for (idx, path) in paths.iter().enumerate() {
205            let path = path.as_ref();
206
207            // Determine file type and verify
208            let result = if path.extension().and_then(|s| s.to_str()) == Some("wal") {
209                Self::verify_wal_segment(path)?
210            } else if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
211                Self::verify_parquet_file(path)?
212            } else {
213                false
214            };
215
216            results.push(result);
217
218            // Report progress
219            if let Some(callback) = progress_callback {
220                callback(idx + 1, paths.len());
221            }
222        }
223
224        Ok(results)
225    }
226}
227
/// Integrity check result
///
/// Plain record describing the outcome of checking one file. Build it with
/// [`IntegrityCheckResult::success`] or [`IntegrityCheckResult::failure`] so
/// that `valid`, `checksum` and `error` stay consistent with each other.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IntegrityCheckResult {
    /// Path of the checked file, as supplied by the caller.
    pub path: String,
    /// `true` for results built with `success`, `false` for `failure`.
    pub valid: bool,
    /// Checksum supplied to `success`; `None` for failures.
    pub checksum: Option<String>,
    /// Error description supplied to `failure`; `None` for successes.
    pub error: Option<String>,
}

impl IntegrityCheckResult {
    /// Build a passing result for `path` that records `checksum`.
    pub fn success(path: String, checksum: String) -> Self {
        Self {
            path,
            valid: true,
            checksum: Some(checksum),
            error: None,
        }
    }

    /// Build a failing result for `path` that records why the check failed.
    pub fn failure(path: String, error: String) -> Self {
        Self {
            path,
            valid: false,
            checksum: None,
            error: Some(error),
        }
    }
}
256
#[cfg(test)]
mod tests {
    //! Unit tests for checksum helpers, WAL segment verification, Parquet
    //! verification, batch verification and `IntegrityCheckResult`.

    use super::*;
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::arrow::ArrowWriter;
    use std::{cell::RefCell, fs::File, sync::Arc};
    use tempfile::{Builder, NamedTempFile};

    /// Write a small, valid two-column Parquet file (3 rows) into `file`.
    fn write_test_parquet(file: File) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["alpha", "beta", "gamma"]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)])
            .expect("valid batch");

        let mut writer = ArrowWriter::try_new(file, schema, None).expect("writer");
        writer.write(&batch).expect("write batch");
        writer.close().expect("close writer");
    }

    /// Helper to create a valid Parquet file for testing
    fn create_test_parquet_file() -> NamedTempFile {
        let tmp = NamedTempFile::new().expect("create temp file");
        write_test_parquet(tmp.reopen().expect("reopen"));
        tmp
    }

    /// Serialize `payload` in the WAL segment layout:
    /// `[hex SHA-256 of payload: 64 bytes][payload]`.
    fn wal_segment_bytes(payload: &[u8]) -> Vec<u8> {
        let mut bytes = StorageIntegrity::compute_checksum(payload).into_bytes();
        bytes.extend_from_slice(payload);
        bytes
    }

    /// Helper to create a valid WAL segment file for testing
    fn create_test_wal_segment(payload: &[u8]) -> NamedTempFile {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), wal_segment_bytes(payload)).expect("write segment");
        tmp
    }

    #[test]
    fn test_verify_parquet_file_valid() {
        let tmp = create_test_parquet_file();
        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    #[test]
    fn test_verify_parquet_file_nonexistent() {
        let result = StorageIntegrity::verify_parquet_file(Path::new("/nonexistent/file.parquet"));
        assert!(result.is_ok());
        assert!(!result.unwrap());
    }

    #[test]
    fn test_verify_parquet_file_corrupt() {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"this is not a parquet file").expect("write corrupt data");

        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, AllSourceError::StorageError(_)));
    }

    #[test]
    fn test_verify_parquet_file_truncated() {
        // Write valid magic bytes but truncate the rest
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"PAR1").expect("write truncated data");

        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_err());
    }

    #[test]
    fn test_verify_wal_segment_valid() {
        let tmp = create_test_wal_segment(b"event payload");
        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(result.expect("valid segment"));
    }

    #[test]
    fn test_verify_wal_segment_empty_payload() {
        // A segment holding only the checksum of zero bytes is still valid.
        let tmp = create_test_wal_segment(b"");
        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(result.expect("valid empty segment"));
    }

    #[test]
    fn test_verify_wal_segment_nonexistent() {
        let result = StorageIntegrity::verify_wal_segment(Path::new("/nonexistent/segment.wal"));
        assert!(result.is_ok());
        assert!(!result.unwrap());
    }

    #[test]
    fn test_verify_wal_segment_corrupt_payload() {
        let tmp = create_test_wal_segment(b"event payload");

        // Flip the last payload byte so the stored checksum no longer matches.
        let mut bytes = std::fs::read(tmp.path()).expect("read segment");
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;
        std::fs::write(tmp.path(), &bytes).expect("rewrite segment");

        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_verify_wal_segment_too_short() {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"short").expect("write short segment");

        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_batch_verify_with_parquet() {
        // batch_verify checks file extension, so the temp file needs a
        // `.parquet` suffix; NamedTempFile removes it on drop even when an
        // assertion below fails.
        let tmp = Builder::new()
            .suffix(".parquet")
            .tempfile()
            .expect("create .parquet temp file");
        write_test_parquet(tmp.reopen().expect("reopen"));

        let paths = vec![tmp.path().to_path_buf()];
        let results = StorageIntegrity::batch_verify(&paths, None).expect("batch verify");
        assert_eq!(results.len(), 1);
        assert!(results[0]);
    }

    #[test]
    fn test_batch_verify_with_wal() {
        let tmp = Builder::new()
            .suffix(".wal")
            .tempfile()
            .expect("create .wal temp file");
        std::fs::write(tmp.path(), wal_segment_bytes(b"batched payload")).expect("write segment");

        let paths = vec![tmp.path().to_path_buf()];
        let results = StorageIntegrity::batch_verify(&paths, None).expect("batch verify");
        assert_eq!(results, vec![true]);
    }

    #[test]
    fn test_batch_verify_unknown_extension_and_progress() {
        let calls = RefCell::new(Vec::new());
        let record = |done: usize, total: usize| calls.borrow_mut().push((done, total));

        // Unknown extensions are reported as `false`; so are missing files.
        let paths = [
            Path::new("/nonexistent/notes.txt"),
            Path::new("/nonexistent/segment.wal"),
        ];
        let results =
            StorageIntegrity::batch_verify(&paths, Some(&record)).expect("batch verify");

        assert_eq!(results, vec![false, false]);
        assert_eq!(*calls.borrow(), vec![(1, 2), (2, 2)]);
    }

    #[test]
    fn test_compute_checksum() {
        let data = b"hello world";
        let checksum = StorageIntegrity::compute_checksum(data);

        // SHA-256 produces 32 bytes = 64 hex characters
        assert_eq!(checksum.len(), 64);

        // Checksums should be deterministic
        let checksum2 = StorageIntegrity::compute_checksum(data);
        assert_eq!(checksum, checksum2);
    }

    #[test]
    fn test_verify_checksum() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());

        // Wrong checksum should fail
        assert!(!StorageIntegrity::verify_checksum(data, "wrong").unwrap());
    }

    #[test]
    fn test_verify_or_error() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        // Valid checksum should succeed
        assert!(StorageIntegrity::verify_or_error(data, &checksum).is_ok());

        // Invalid checksum should error
        let result = StorageIntegrity::verify_or_error(data, "wrong");
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_checksum_with_metadata() {
        let data = b"test";

        let checksum1 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        let checksum2 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label2"));

        // Different labels produce different checksums
        assert_ne!(checksum1, checksum2);

        // Same label produces same checksum
        let checksum3 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        assert_eq!(checksum1, checksum3);

        // The length prefix makes it differ from the plain checksum
        assert_ne!(
            StorageIntegrity::compute_checksum_with_metadata(data, None),
            StorageIntegrity::compute_checksum(data)
        );
    }

    #[test]
    fn test_different_data_different_checksums() {
        let data1 = b"hello";
        let data2 = b"world";

        let checksum1 = StorageIntegrity::compute_checksum(data1);
        let checksum2 = StorageIntegrity::compute_checksum(data2);

        assert_ne!(checksum1, checksum2);
    }

    #[test]
    fn test_empty_data() {
        let data = b"";
        let checksum = StorageIntegrity::compute_checksum(data);

        // Should still produce valid checksum
        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());
    }

    #[test]
    fn test_large_data() {
        let data = vec![0u8; 1_000_000]; // 1MB
        let checksum = StorageIntegrity::compute_checksum(&data);

        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(&data, &checksum).unwrap());
    }

    #[test]
    fn test_integrity_check_result() {
        let success = IntegrityCheckResult::success("test.wal".to_string(), "abc123".to_string());
        assert!(success.valid);
        assert_eq!(success.checksum, Some("abc123".to_string()));
        assert_eq!(success.error, None);

        let failure = IntegrityCheckResult::failure(
            "test.wal".to_string(),
            "corruption detected".to_string(),
        );
        assert!(!failure.valid);
        assert_eq!(failure.checksum, None);
        assert_eq!(failure.error, Some("corruption detected".to_string()));
    }
}