allsource_core/infrastructure/persistence/
storage_integrity.rs1use crate::error::{AllSourceError, Result};
2use parquet::file::reader::{FileReader, SerializedFileReader};
3use sha2::{Digest, Sha256};
4use std::path::Path;
5
6pub struct StorageIntegrity;
22
23impl StorageIntegrity {
24 pub fn compute_checksum(data: &[u8]) -> String {
37 let mut hasher = Sha256::new();
38 hasher.update(data);
39 format!("{:x}", hasher.finalize())
40 }
41
42 pub fn verify_checksum(data: &[u8], expected: &str) -> Result<bool> {
55 let computed = Self::compute_checksum(data);
56 Ok(computed == expected)
57 }
58
59 pub fn verify_or_error(data: &[u8], expected: &str) -> Result<()> {
63 if !Self::verify_checksum(data, expected)? {
64 return Err(AllSourceError::StorageError(format!(
65 "Checksum mismatch: expected {}, computed {}",
66 expected,
67 Self::compute_checksum(data)
68 )));
69 }
70 Ok(())
71 }
72
73 pub fn compute_checksum_with_metadata(data: &[u8], label: Option<&str>) -> String {
78 let mut hasher = Sha256::new();
79
80 hasher.update((data.len() as u64).to_le_bytes());
82
83 if let Some(l) = label {
85 hasher.update(l.as_bytes());
86 }
87
88 hasher.update(data);
90
91 format!("{:x}", hasher.finalize())
92 }
93
94 pub fn verify_wal_segment(segment_path: &Path) -> Result<bool> {
104 if !segment_path.exists() {
105 return Ok(false);
106 }
107
108 let data = std::fs::read(segment_path).map_err(|e| {
110 AllSourceError::StorageError(format!("Failed to read WAL segment: {e}"))
111 })?;
112
113 if data.len() < 64 {
115 return Err(AllSourceError::StorageError(
116 "WAL segment too short for checksum".to_string(),
117 ));
118 }
119
120 let stored_checksum = String::from_utf8_lossy(&data[0..64]).to_string();
121 let segment_data = &data[64..];
122
123 Self::verify_or_error(segment_data, &stored_checksum)?;
124 Ok(true)
125 }
126
127 pub fn verify_parquet_file(file_path: &Path) -> Result<bool> {
139 if !file_path.exists() {
140 return Ok(false);
141 }
142
143 let file = std::fs::File::open(file_path).map_err(|e| {
144 AllSourceError::StorageError(format!("Failed to open Parquet file: {e}"))
145 })?;
146
147 let reader = SerializedFileReader::new(file).map_err(|e| {
150 AllSourceError::StorageError(format!(
151 "Parquet metadata verification failed for {}: {e}",
152 file_path.display()
153 ))
154 })?;
155
156 let metadata = reader.metadata();
157 let file_metadata = metadata.file_metadata();
158
159 for rg_idx in 0..metadata.num_row_groups() {
161 let row_group = metadata.row_group(rg_idx);
162 for col_idx in 0..row_group.num_columns() {
163 let col = row_group.column(col_idx);
164 let _compression = col.compression();
166 let _num_values = col.num_values();
167 let (start, len) = col.byte_range();
169 if len == 0 && col.num_values() > 0 {
170 return Err(AllSourceError::StorageError(format!(
171 "Parquet column chunk {col_idx} in row group {rg_idx} has zero bytes but {} values in {}",
172 col.num_values(),
173 file_path.display()
174 )));
175 }
176 let _ = start; }
178 }
179
180 let _schema = file_metadata.schema_descr();
182 let _num_rows = file_metadata.num_rows();
183
184 Ok(true)
185 }
186
187 pub fn batch_verify<P: AsRef<Path>>(
191 paths: &[P],
192 progress_callback: Option<&dyn Fn(usize, usize)>,
193 ) -> Result<Vec<bool>> {
194 let mut results = Vec::new();
195
196 for (idx, path) in paths.iter().enumerate() {
197 let path = path.as_ref();
198
199 let result = if path.extension().and_then(|s| s.to_str()) == Some("wal") {
201 Self::verify_wal_segment(path)?
202 } else if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
203 Self::verify_parquet_file(path)?
204 } else {
205 false
206 };
207
208 results.push(result);
209
210 if let Some(callback) = progress_callback {
212 callback(idx + 1, paths.len());
213 }
214 }
215
216 Ok(results)
217 }
218}
219
/// Outcome of an integrity check on a single item.
///
/// The two constructors keep the fields consistent: a successful result
/// carries a checksum and no error, a failed result carries an error and no
/// checksum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IntegrityCheckResult {
    /// Identifier of the checked item, as supplied by the caller
    /// (presumably a file path — confirm against callers).
    pub path: String,
    /// `true` when the check passed.
    pub valid: bool,
    /// Checksum recorded for a passing check; `None` on failure.
    pub checksum: Option<String>,
    /// Description of the failure; `None` on success.
    pub error: Option<String>,
}

impl IntegrityCheckResult {
    /// Builds a passing result for `path` with its `checksum`.
    ///
    /// Accepts anything convertible into a `String`, so both `String` and
    /// `&str` arguments work.
    pub fn success(path: impl Into<String>, checksum: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            valid: true,
            checksum: Some(checksum.into()),
            error: None,
        }
    }

    /// Builds a failing result for `path` with a description of the `error`.
    ///
    /// Accepts anything convertible into a `String`, so both `String` and
    /// `&str` arguments work.
    pub fn failure(path: impl Into<String>, error: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            valid: false,
            checksum: None,
            error: Some(error.into()),
        }
    }
}
248
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::arrow::ArrowWriter;
    use std::{cell::RefCell, path::PathBuf, sync::Arc};
    use tempfile::NamedTempFile;

    /// Writes a small two-column, three-row Parquet file into `tmp`.
    fn write_test_parquet(tmp: &NamedTempFile) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["alpha", "beta", "gamma"]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)])
            .expect("valid batch");

        let mut writer =
            ArrowWriter::try_new(tmp.reopen().expect("reopen"), schema, None).expect("writer");
        writer.write(&batch).expect("write batch");
        writer.close().expect("close writer");
    }

    /// Creates a valid Parquet temp file with no particular file extension.
    fn create_test_parquet_file() -> NamedTempFile {
        let tmp = NamedTempFile::new().expect("create temp file");
        write_test_parquet(&tmp);
        tmp
    }

    /// Creates a `.wal` temp file laid out as the verifier expects:
    /// 64 hex characters of checksum followed by `payload`.
    fn create_test_wal_segment(payload: &[u8]) -> NamedTempFile {
        let tmp = tempfile::Builder::new()
            .suffix(".wal")
            .tempfile()
            .expect("create temp wal");
        let mut bytes = StorageIntegrity::compute_checksum(payload).into_bytes();
        bytes.extend_from_slice(payload);
        std::fs::write(tmp.path(), &bytes).expect("write wal segment");
        tmp
    }

    #[test]
    fn test_verify_parquet_file_valid() {
        let tmp = create_test_parquet_file();
        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    #[test]
    fn test_verify_parquet_file_nonexistent() {
        let result = StorageIntegrity::verify_parquet_file(Path::new("/nonexistent/file.parquet"));
        assert!(result.is_ok());
        assert!(!result.unwrap());
    }

    #[test]
    fn test_verify_parquet_file_corrupt() {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"this is not a parquet file").expect("write corrupt data");

        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, AllSourceError::StorageError(_)));
    }

    #[test]
    fn test_verify_parquet_file_truncated() {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"PAR1").expect("write truncated data");

        let result = StorageIntegrity::verify_parquet_file(tmp.path());
        assert!(result.is_err());
    }

    #[test]
    fn test_verify_wal_segment_valid() {
        let tmp = create_test_wal_segment(b"wal payload");
        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Ok(true)));
    }

    #[test]
    fn test_verify_wal_segment_empty_payload() {
        // A segment that is exactly the checksum header is valid when the
        // header is the checksum of zero bytes.
        let tmp = create_test_wal_segment(b"");
        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Ok(true)));
    }

    #[test]
    fn test_verify_wal_segment_nonexistent() {
        let result = StorageIntegrity::verify_wal_segment(Path::new("/nonexistent/segment.wal"));
        assert!(matches!(result, Ok(false)));
    }

    #[test]
    fn test_verify_wal_segment_too_short() {
        let tmp = NamedTempFile::new().expect("create temp file");
        std::fs::write(tmp.path(), b"short").expect("write short data");

        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_verify_wal_segment_corrupt_payload() {
        let tmp = create_test_wal_segment(b"wal payload");
        let mut bytes = std::fs::read(tmp.path()).expect("read wal segment");
        *bytes.last_mut().expect("non-empty segment") ^= 0xFF;
        std::fs::write(tmp.path(), &bytes).expect("write corrupted segment");

        let result = StorageIntegrity::verify_wal_segment(tmp.path());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_batch_verify_with_parquet() {
        // A suffixed temp file gives the `.parquet` extension the dispatcher
        // looks for and is removed automatically, even if an assertion fails.
        let tmp = tempfile::Builder::new()
            .suffix(".parquet")
            .tempfile()
            .expect("create temp parquet");
        write_test_parquet(&tmp);

        let paths = vec![tmp.path().to_path_buf()];
        let results = StorageIntegrity::batch_verify(&paths, None).expect("batch verify");
        assert_eq!(results.len(), 1);
        assert!(results[0]);
    }

    #[test]
    fn test_batch_verify_with_wal() {
        let tmp = create_test_wal_segment(b"batched payload");
        let paths = vec![tmp.path().to_path_buf()];
        let results = StorageIntegrity::batch_verify(&paths, None).expect("batch verify");
        assert_eq!(results, vec![true]);
    }

    #[test]
    fn test_batch_verify_unknown_extension_and_progress() {
        let paths = vec![PathBuf::from("notes.txt"), PathBuf::from("no_extension")];
        let calls = RefCell::new(Vec::new());
        let callback: &dyn Fn(usize, usize) =
            &|done, total| calls.borrow_mut().push((done, total));

        let results = StorageIntegrity::batch_verify(&paths, Some(callback)).expect("batch verify");

        assert_eq!(results, vec![false, false]);
        assert_eq!(*calls.borrow(), vec![(1, 2), (2, 2)]);
    }

    #[test]
    fn test_batch_verify_empty() {
        let paths: Vec<PathBuf> = Vec::new();
        let results = StorageIntegrity::batch_verify(&paths, None).expect("batch verify");
        assert!(results.is_empty());
    }

    #[test]
    fn test_compute_checksum() {
        let data = b"hello world";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert_eq!(checksum.len(), 64);
        // Known SHA-256 vector; also pins the lowercase hex encoding.
        assert_eq!(
            checksum,
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );

        let checksum2 = StorageIntegrity::compute_checksum(data);
        assert_eq!(checksum, checksum2);
    }

    #[test]
    fn test_verify_checksum() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());

        assert!(!StorageIntegrity::verify_checksum(data, "wrong").unwrap());
    }

    #[test]
    fn test_verify_or_error() {
        let data = b"test data";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert!(StorageIntegrity::verify_or_error(data, &checksum).is_ok());

        let result = StorageIntegrity::verify_or_error(data, "wrong");
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::StorageError(_))));
    }

    #[test]
    fn test_checksum_with_metadata() {
        let data = b"test";

        let checksum1 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        let checksum2 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label2"));

        assert_ne!(checksum1, checksum2);

        let checksum3 = StorageIntegrity::compute_checksum_with_metadata(data, Some("label1"));
        assert_eq!(checksum1, checksum3);

        // The length prefix makes the result differ from the plain checksum.
        assert_ne!(
            StorageIntegrity::compute_checksum_with_metadata(data, None),
            StorageIntegrity::compute_checksum(data)
        );
    }

    #[test]
    fn test_different_data_different_checksums() {
        let data1 = b"hello";
        let data2 = b"world";

        let checksum1 = StorageIntegrity::compute_checksum(data1);
        let checksum2 = StorageIntegrity::compute_checksum(data2);

        assert_ne!(checksum1, checksum2);
    }

    #[test]
    fn test_empty_data() {
        let data = b"";
        let checksum = StorageIntegrity::compute_checksum(data);

        assert_eq!(checksum.len(), 64);
        assert_eq!(
            checksum,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert!(StorageIntegrity::verify_checksum(data, &checksum).unwrap());
    }

    #[test]
    fn test_large_data() {
        let data = vec![0u8; 1_000_000];
        let checksum = StorageIntegrity::compute_checksum(&data);

        assert_eq!(checksum.len(), 64);
        assert!(StorageIntegrity::verify_checksum(&data, &checksum).unwrap());
    }

    #[test]
    fn test_integrity_check_result() {
        let success = IntegrityCheckResult::success("test.wal".to_string(), "abc123".to_string());
        assert!(success.valid);
        assert_eq!(success.checksum, Some("abc123".to_string()));
        assert_eq!(success.error, None);

        let failure = IntegrityCheckResult::failure(
            "test.wal".to_string(),
            "corruption detected".to_string(),
        );
        assert!(!failure.valid);
        assert_eq!(failure.checksum, None);
        assert_eq!(failure.error, Some("corruption detected".to_string()));
    }
}