allsource_core/infrastructure/persistence/
storage_integrity.rs1use crate::error::{AllSourceError, Result};
2use parquet::file::reader::{FileReader, SerializedFileReader};
3use sha2::{Digest, Sha256};
4use std::path::Path;
5
6pub struct StorageIntegrity;
22
23impl StorageIntegrity {
24 pub fn compute_checksum(data: &[u8]) -> String {
37 let mut hasher = Sha256::new();
38 hasher.update(data);
39 format!("{:x}", hasher.finalize())
40 }
41
42 pub fn verify_checksum(data: &[u8], expected: &str) -> Result<bool> {
55 let computed = Self::compute_checksum(data);
56 Ok(computed == expected)
57 }
58
59 pub fn verify_or_error(data: &[u8], expected: &str) -> Result<()> {
63 if !Self::verify_checksum(data, expected)? {
64 return Err(AllSourceError::StorageError(format!(
65 "Checksum mismatch: expected {}, computed {}",
66 expected,
67 Self::compute_checksum(data)
68 )));
69 }
70 Ok(())
71 }
72
73 pub fn compute_checksum_with_metadata(data: &[u8], label: Option<&str>) -> String {
78 let mut hasher = Sha256::new();
79
80 hasher.update((data.len() as u64).to_le_bytes());
82
83 if let Some(l) = label {
85 hasher.update(l.as_bytes());
86 }
87
88 hasher.update(data);
90
91 format!("{:x}", hasher.finalize())
92 }
93
94 pub fn verify_wal_segment(segment_path: &Path) -> Result<bool> {
104 if !segment_path.exists() {
105 return Ok(false);
106 }
107
108 let data = std::fs::read(segment_path).map_err(|e| {
110 AllSourceError::StorageError(format!("Failed to read WAL segment: {e}"))
111 })?;
112
113 if data.len() < 64 {
115 return Err(AllSourceError::StorageError(
116 "WAL segment too short for checksum".to_string(),
117 ));
118 }
119
120 let stored_checksum = String::from_utf8_lossy(&data[0..64]).to_string();
121 let segment_data = &data[64..];
122
123 Self::verify_or_error(segment_data, &stored_checksum)?;
124 Ok(true)
125 }
126
127 pub fn verify_parquet_file(file_path: &Path) -> Result<bool> {
147 if !file_path.exists() {
148 return Ok(false);
149 }
150
151 let file = std::fs::File::open(file_path).map_err(|e| {
152 AllSourceError::StorageError(format!("Failed to open Parquet file: {e}"))
153 })?;
154
155 let reader = SerializedFileReader::new(file).map_err(|e| {
158 AllSourceError::StorageError(format!(
159 "Parquet metadata verification failed for {}: {e}",
160 file_path.display()
161 ))
162 })?;
163
164 let metadata = reader.metadata();
165 let file_metadata = metadata.file_metadata();
166
167 for rg_idx in 0..metadata.num_row_groups() {
169 let row_group = metadata.row_group(rg_idx);
170 for col_idx in 0..row_group.num_columns() {
171 let col = row_group.column(col_idx);
172 let _compression = col.compression();
174 let _num_values = col.num_values();
175 let (start, len) = col.byte_range();
177 if len == 0 && col.num_values() > 0 {
178 return Err(AllSourceError::StorageError(format!(
179 "Parquet column chunk {col_idx} in row group {rg_idx} has zero bytes but {} values in {}",
180 col.num_values(),
181 file_path.display()
182 )));
183 }
184 let _ = start; }
186 }
187
188 let _schema = file_metadata.schema_descr();
190 let _num_rows = file_metadata.num_rows();
191
192 Ok(true)
193 }
194
195 pub fn batch_verify<P: AsRef<Path>>(
199 paths: &[P],
200 progress_callback: Option<&dyn Fn(usize, usize)>,
201 ) -> Result<Vec<bool>> {
202 let mut results = Vec::new();
203
204 for (idx, path) in paths.iter().enumerate() {
205 let path = path.as_ref();
206
207 let result = if path.extension().and_then(|s| s.to_str()) == Some("wal") {
209 Self::verify_wal_segment(path)?
210 } else if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
211 Self::verify_parquet_file(path)?
212 } else {
213 false
214 };
215
216 results.push(result);
217
218 if let Some(callback) = progress_callback {
220 callback(idx + 1, paths.len());
221 }
222 }
223
224 Ok(results)
225 }
226}
227
/// Outcome of an integrity check on a single storage artifact.
///
/// A passing result carries a checksum and no error; a failing result
/// carries an error message and no checksum. `valid` records which of the
/// two constructors produced the value.
#[derive(Debug, Clone, PartialEq)]
pub struct IntegrityCheckResult {
    /// Path (as a string) identifying the checked artifact.
    pub path: String,
    /// `true` for a passing check, `false` for a failing one.
    pub valid: bool,
    /// Checksum for a passing check; `None` on failure.
    pub checksum: Option<String>,
    /// Failure description; `None` on success.
    pub error: Option<String>,
}

impl IntegrityCheckResult {
    /// Builds a passing result for `path` carrying `checksum`.
    pub fn success(path: String, checksum: String) -> Self {
        Self::from_outcome(path, Ok(checksum))
    }

    /// Builds a failing result for `path` carrying the `error` message.
    pub fn failure(path: String, error: String) -> Self {
        Self::from_outcome(path, Err(error))
    }

    /// Shared constructor: `Ok` holds a checksum, `Err` holds an error message.
    fn from_outcome(path: String, outcome: std::result::Result<String, String>) -> Self {
        let valid = outcome.is_ok();
        let (checksum, error) = match outcome {
            Ok(sum) => (Some(sum), None),
            Err(message) => (None, Some(message)),
        };
        Self {
            path,
            valid,
            checksum,
            error,
        }
    }
}
256
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::arrow::ArrowWriter;
    use std::cell::RefCell;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;

    /// Writes a small two-column (id, name) Parquet file to a temp file.
    fn create_test_parquet_file() -> NamedTempFile {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["alpha", "beta", "gamma"]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)])
            .expect("valid batch");

        let tmp = NamedTempFile::new().expect("create temp file");
        let mut writer =
            ArrowWriter::try_new(tmp.reopen().expect("reopen"), schema, None).expect("writer");
        writer.write(&batch).expect("write batch");
        writer.close().expect("close writer");

        tmp
    }

    /// Creates a temp file whose name ends in `suffix` and fills it with
    /// `contents`. The file is removed when the handle drops, even if the
    /// test panics.
    fn temp_file_with(suffix: &str, contents: &[u8]) -> NamedTempFile {
        let tmp = tempfile::Builder::new()
            .suffix(suffix)
            .tempfile()
            .expect("create temp file");
        std::fs::write(tmp.path(), contents).expect("write temp file");
        tmp
    }

    /// Builds WAL segment bytes: the 64-char hex checksum of `payload`, then `payload`.
    fn wal_segment_bytes(payload: &[u8]) -> Vec<u8> {
        let mut bytes = StorageIntegrity::compute_checksum(payload).into_bytes();
        bytes.extend_from_slice(payload);
        bytes
    }

    #[test]
    fn test_verify_parquet_file_valid() {
        let tmp = create_test_parquet_file();
        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    #[test]
    fn test_verify_parquet_file_nonexistent() {
        let result = StorageIntegrity::verify_parquet_file(Path::new("/nonexistent/file.parquet"));
        assert!(result.is_ok());
        assert!(!result.unwrap());
    }

    #[test]
    fn test_verify_parquet_file_corrupt() {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"this is not a parquet file").expect("write corrupt data");

        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, AllSourceError::StorageError(_)));
    }

    #[test]
    fn test_verify_parquet_file_truncated() {
        let tmp = NamedTempFile::new().expect("create temp file");
        // Only the leading magic bytes: the footer is missing.
        std::fs::write(tmp.path(), b"PAR1").expect("write truncated data");

        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_err());
    }

    #[test]
    fn test_verify_wal_segment_valid() {
        let tmp = temp_file_with(".wal", &wal_segment_bytes(b"event payload"));
        assert!(StorageIntegrity::verify_wal_segment(tmp.path()).expect("verify"));
    }

    #[test]
    fn test_verify_wal_segment_header_only() {
        // A header with no payload is valid: it holds the checksum of zero bytes.
        let tmp = temp_file_with(".wal", &wal_segment_bytes(b""));
        assert!(StorageIntegrity::verify_wal_segment(tmp.path()).expect("verify"));
    }

    #[test]
    fn test_verify_wal_segment_corrupt_payload() {
        let mut bytes = wal_segment_bytes(b"event payload");
        *bytes.last_mut().expect("non-empty segment") ^= 0xFF;
        let tmp = temp_file_with(".wal", &bytes);

        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_verify_wal_segment_too_short() {
        let tmp = temp_file_with(".wal", b"deadbeef");
        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_verify_wal_segment_nonexistent() {
        let result = StorageIntegrity::verify_wal_segment(Path::new("/nonexistent/segment.wal"));
        assert!(matches!(result, Ok(false)));
    }

    #[test]
    fn test_batch_verify_with_parquet() {
        let source = create_test_parquet_file();
        let bytes = std::fs::read(source.path()).expect("read parquet");
        // Self-deleting copy with a `.parquet` suffix, so no file leaks on panic.
        let parquet = temp_file_with(".parquet", &bytes);

        let paths = vec![parquet.path().to_path_buf()];
        let results = StorageIntegrity::batch_verify(&paths, None).expect("batch verify");
        assert_eq!(results, vec![true]);
    }

    #[test]
    fn test_batch_verify_mixed_with_progress() {
        let wal = temp_file_with(".wal", &wal_segment_bytes(b"event payload"));
        let source = create_test_parquet_file();
        let parquet = temp_file_with(
            ".parquet",
            &std::fs::read(source.path()).expect("read parquet"),
        );
        let other = temp_file_with(".txt", b"not checked");

        let paths: Vec<PathBuf> = [wal.path(), parquet.path(), other.path()]
            .iter()
            .map(|p| p.to_path_buf())
            .collect();

        let calls = RefCell::new(Vec::new());
        let record: &dyn Fn(usize, usize) =
            &|done: usize, total: usize| calls.borrow_mut().push((done, total));
        let results = StorageIntegrity::batch_verify(&paths, Some(record)).expect("batch verify");

        assert_eq!(results, vec![true, true, false]);
        assert_eq!(*calls.borrow(), vec![(1, 3), (2, 3), (3, 3)]);
    }

    #[test]
    fn test_batch_verify_propagates_wal_mismatch() {
        let mut bytes = wal_segment_bytes(b"event payload");
        *bytes.last_mut().expect("non-empty segment") ^= 0xFF;
        let tmp = temp_file_with(".wal", &bytes);

        let result = StorageIntegrity::batch_verify(&[tmp.path()], None);
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_compute_checksum() {
        let data = b"hello world";
        let checksum = StorageIntegrity::compute_checksum(data);

        // SHA-256 in hex is always 64 characters.
        assert_eq!(checksum.len(), 64);

        // Deterministic for identical input.
        let checksum2 = StorageIntegrity::compute_checksum(data);
        assert_eq!(checksum, checksum2);
    }

    #[test]
    fn test_compute_checksum_known_vectors() {
        assert_eq!(
            StorageIntegrity::compute_checksum(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(
            StorageIntegrity::compute_checksum(b"hello world"),
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
    }

    #[test]
    fn test_verify_checksum() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());

        assert!(!StorageIntegrity::verify_checksum(data, "wrong").unwrap());
    }

    #[test]
    fn test_verify_or_error() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert!(StorageIntegrity::verify_or_error(data, &checksum).is_ok());

        let result = StorageIntegrity::verify_or_error(data, "wrong");
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_checksum_with_metadata() {
        let data = b"test";

        let checksum1 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        let checksum2 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label2"));

        // Different labels must give different checksums.
        assert_ne!(checksum1, checksum2);

        // Same label must give the same checksum.
        let checksum3 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        assert_eq!(checksum1, checksum3);
    }

    #[test]
    fn test_different_data_different_checksums() {
        let data1 = b"hello";
        let data2 = b"world";

        let checksum1 = StorageIntegrity::compute_checksum(data1);
        let checksum2 = StorageIntegrity::compute_checksum(data2);

        assert_ne!(checksum1, checksum2);
    }

    #[test]
    fn test_empty_data() {
        let data = b"";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());
    }

    #[test]
    fn test_large_data() {
        let data = vec![0u8; 1_000_000];
        let checksum = StorageIntegrity::compute_checksum(&data);

        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(&data, &checksum).unwrap());
    }

    #[test]
    fn test_integrity_check_result() {
        let success = IntegrityCheckResult::success("test.wal".to_string(), "abc123".to_string());
        assert!(success.valid);
        assert_eq!(success.checksum, Some("abc123".to_string()));
        assert_eq!(success.error, None);

        let failure = IntegrityCheckResult::failure(
            "test.wal".to_string(),
            "corruption detected".to_string(),
        );
        assert!(!failure.valid);
        assert_eq!(failure.checksum, None);
        assert_eq!(failure.error, Some("corruption detected".to_string()));
    }
}