allsource_core/infrastructure/persistence/
storage_integrity.rs1use crate::error::{AllSourceError, Result};
2use sha2::{Digest, Sha256};
3use std::path::Path;
4
5pub struct StorageIntegrity;
21
22impl StorageIntegrity {
23 pub fn compute_checksum(data: &[u8]) -> String {
36 let mut hasher = Sha256::new();
37 hasher.update(data);
38 format!("{:x}", hasher.finalize())
39 }
40
41 pub fn verify_checksum(data: &[u8], expected: &str) -> Result<bool> {
54 let computed = Self::compute_checksum(data);
55 Ok(computed == expected)
56 }
57
58 pub fn verify_or_error(data: &[u8], expected: &str) -> Result<()> {
62 if !Self::verify_checksum(data, expected)? {
63 return Err(AllSourceError::StorageError(format!(
64 "Checksum mismatch: expected {}, computed {}",
65 expected,
66 Self::compute_checksum(data)
67 )));
68 }
69 Ok(())
70 }
71
72 pub fn compute_checksum_with_metadata(data: &[u8], label: Option<&str>) -> String {
77 let mut hasher = Sha256::new();
78
79 hasher.update(&(data.len() as u64).to_le_bytes());
81
82 if let Some(l) = label {
84 hasher.update(l.as_bytes());
85 }
86
87 hasher.update(data);
89
90 format!("{:x}", hasher.finalize())
91 }
92
93 pub fn verify_wal_segment(segment_path: &Path) -> Result<bool> {
103 if !segment_path.exists() {
104 return Ok(false);
105 }
106
107 let data = std::fs::read(segment_path).map_err(|e| {
109 AllSourceError::StorageError(format!("Failed to read WAL segment: {}", e))
110 })?;
111
112 if data.len() < 64 {
114 return Err(AllSourceError::StorageError(
115 "WAL segment too short for checksum".to_string(),
116 ));
117 }
118
119 let stored_checksum = String::from_utf8_lossy(&data[0..64]).to_string();
120 let segment_data = &data[64..];
121
122 Self::verify_or_error(segment_data, &stored_checksum)?;
123 Ok(true)
124 }
125
126 pub fn verify_parquet_file(file_path: &Path) -> Result<bool> {
136 if !file_path.exists() {
137 return Ok(false);
138 }
139
140 let _data = std::fs::read(file_path).map_err(|e| {
143 AllSourceError::StorageError(format!("Failed to read Parquet file: {}", e))
144 })?;
145
146 Ok(true)
148 }
149
150 pub fn batch_verify<P: AsRef<Path>>(
154 paths: &[P],
155 progress_callback: Option<Box<dyn Fn(usize, usize)>>,
156 ) -> Result<Vec<bool>> {
157 let mut results = Vec::new();
158
159 for (idx, path) in paths.iter().enumerate() {
160 let path = path.as_ref();
161
162 let result = if path.extension().and_then(|s| s.to_str()) == Some("wal") {
164 Self::verify_wal_segment(path)?
165 } else if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
166 Self::verify_parquet_file(path)?
167 } else {
168 false
169 };
170
171 results.push(result);
172
173 if let Some(ref callback) = progress_callback {
175 callback(idx + 1, paths.len());
176 }
177 }
178
179 Ok(results)
180 }
181}
182
/// Outcome of an integrity check for a single file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IntegrityCheckResult {
    /// Path of the checked file, as supplied by the caller.
    pub path: String,
    /// `true` when the check passed, `false` otherwise.
    pub valid: bool,
    /// Checksum recorded for the file; populated by [`IntegrityCheckResult::success`].
    pub checksum: Option<String>,
    /// Failure description; populated by [`IntegrityCheckResult::failure`].
    pub error: Option<String>,
}

impl IntegrityCheckResult {
    /// Builds a passing result for `path` that carries `checksum` and no error.
    pub fn success(path: String, checksum: String) -> Self {
        Self {
            path,
            valid: true,
            checksum: Some(checksum),
            error: None,
        }
    }

    /// Builds a failing result for `path` that carries `error` and no checksum.
    pub fn failure(path: String, error: String) -> Self {
        Self {
            path,
            valid: false,
            checksum: None,
            error: Some(error),
        }
    }
}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Number of characters in a hex-encoded SHA-256 digest.
    const HEX_DIGEST_LEN: usize = 64;

    /// The digest has the expected length and is stable across calls.
    #[test]
    fn test_compute_checksum() {
        let payload = b"hello world";

        let first = StorageIntegrity::compute_checksum(payload);
        assert_eq!(first.len(), HEX_DIGEST_LEN);

        let second = StorageIntegrity::compute_checksum(payload);
        assert_eq!(first, second);
    }

    /// A matching digest verifies; an arbitrary string does not.
    #[test]
    fn test_verify_checksum() {
        let payload = b"test data";
        let digest = StorageIntegrity::compute_checksum(payload);

        let matches = StorageIntegrity::verify_checksum(payload, &digest).unwrap();
        assert!(matches);

        let mismatches = StorageIntegrity::verify_checksum(payload, "wrong").unwrap();
        assert!(!mismatches);
    }

    /// A mismatch surfaces as a `StorageError`.
    #[test]
    fn test_verify_or_error() {
        let payload = b"test data";
        let digest = StorageIntegrity::compute_checksum(payload);

        assert!(StorageIntegrity::verify_or_error(payload, &digest).is_ok());

        let outcome = StorageIntegrity::verify_or_error(payload, "wrong");
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(AllSourceError::StorageError(_))));
    }

    /// The label participates in the digest, and the digest is deterministic.
    #[test]
    fn test_checksum_with_metadata() {
        let payload = b"test";
        let digest_for =
            |label: &str| StorageIntegrity::compute_checksum_with_metadata(payload, Some(label));

        let with_label1 = digest_for("label1");
        let with_label2 = digest_for("label2");
        assert_ne!(with_label1, with_label2);

        let with_label1_again = digest_for("label1");
        assert_eq!(with_label1, with_label1_again);
    }

    /// Distinct inputs hash to distinct digests.
    #[test]
    fn test_different_data_different_checksums() {
        let hello_digest = StorageIntegrity::compute_checksum(b"hello");
        let world_digest = StorageIntegrity::compute_checksum(b"world");

        assert_ne!(hello_digest, world_digest);
    }

    /// Empty input still yields a full-length, verifiable digest.
    #[test]
    fn test_empty_data() {
        let nothing = b"";
        let digest = StorageIntegrity::compute_checksum(nothing);

        assert_eq!(digest.len(), HEX_DIGEST_LEN);
        assert!(StorageIntegrity::verify_checksum(nothing, &digest).unwrap());
    }

    /// A one-megabyte buffer hashes and verifies like any other input.
    #[test]
    fn test_large_data() {
        let zeros = vec![0u8; 1_000_000];
        let digest = StorageIntegrity::compute_checksum(&zeros);

        assert_eq!(digest.len(), HEX_DIGEST_LEN);
        assert!(StorageIntegrity::verify_checksum(&zeros, &digest).unwrap());
    }

    /// Both constructors fill in exactly the fields they are responsible for.
    #[test]
    fn test_integrity_check_result() {
        let passed = IntegrityCheckResult::success(String::from("test.wal"), String::from("abc123"));
        assert!(passed.valid);
        assert_eq!(passed.checksum, Some(String::from("abc123")));
        assert_eq!(passed.error, None);

        let failed = IntegrityCheckResult::failure(
            String::from("test.wal"),
            String::from("corruption detected"),
        );
        assert!(!failed.valid);
        assert_eq!(failed.checksum, None);
        assert_eq!(failed.error, Some(String::from("corruption detected")));
    }
}