leptos_sync_core/reliability/data_integrity/
mod.rs1pub mod checksum;
10pub mod corruption;
11pub mod types;
12pub mod version;
13
14pub use checksum::{ChecksumAlgorithm, ChecksumConfig, ChecksumVerifier};
16pub use corruption::{CorruptionConfig, CorruptionDetector, CorruptionResult};
17pub use types::{DataFormat, DataMetadata, IntegrityResult, IntegrityStats};
18pub use version::{VersionConfig, VersionVerifier};
19
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23#[derive(Debug, Clone, PartialEq)]
25pub struct IntegrityConfig {
26 pub checksum_config: ChecksumConfig,
28 pub version_config: VersionConfig,
30 pub corruption_config: CorruptionConfig,
32}
33
34impl Default for IntegrityConfig {
35 fn default() -> Self {
36 Self {
37 checksum_config: ChecksumConfig::default(),
38 version_config: VersionConfig::default(),
39 corruption_config: CorruptionConfig::default(),
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct DataIntegrity {
47 checksum_verifier: ChecksumVerifier,
49 version_verifier: VersionVerifier,
51 corruption_detector: CorruptionDetector,
53 stats: Arc<RwLock<IntegrityStats>>,
55 initialized: bool,
57}
58
59impl DataIntegrity {
60 pub fn new() -> Self {
62 Self {
63 checksum_verifier: ChecksumVerifier::new(),
64 version_verifier: VersionVerifier::new(),
65 corruption_detector: CorruptionDetector::new(),
66 stats: Arc::new(RwLock::new(IntegrityStats::new())),
67 initialized: false,
68 }
69 }
70
71 pub fn with_config(config: IntegrityConfig) -> Self {
73 Self {
74 checksum_verifier: ChecksumVerifier::with_config(config.checksum_config),
75 version_verifier: VersionVerifier::with_config(config.version_config),
76 corruption_detector: CorruptionDetector::with_config(config.corruption_config),
77 stats: Arc::new(RwLock::new(IntegrityStats::new())),
78 initialized: false,
79 }
80 }
81
82 pub async fn initialize(&mut self) -> Result<(), IntegrityError> {
84 if self.initialized {
85 return Err(IntegrityError::AlreadyInitialized);
86 }
87
88 self.checksum_verifier = ChecksumVerifier::new();
90 self.version_verifier = VersionVerifier::new();
91 self.corruption_detector = CorruptionDetector::new();
92
93 self.initialized = true;
94 Ok(())
95 }
96
97 pub async fn shutdown(&mut self) -> Result<(), IntegrityError> {
99 if !self.initialized {
100 return Err(IntegrityError::NotInitialized);
101 }
102
103 self.checksum_verifier.clear_cache();
105 self.version_verifier.clear_versions();
106 self.corruption_detector.clear_stats();
107
108 self.initialized = false;
109 Ok(())
110 }
111
112 pub async fn verify_integrity(
114 &mut self,
115 data: &[u8],
116 key: &str,
117 ) -> Result<IntegrityResult, IntegrityError> {
118 if !self.initialized {
119 return Err(IntegrityError::NotInitialized);
120 }
121
122 let metadata = DataMetadata::new(DataFormat::Binary, data.len());
123
124 let checksum_valid = if self.checksum_verifier.config().verify_on_read {
126 if let Some(expected_checksum) = self.checksum_verifier.get_cached_checksum(key) {
127 self.checksum_verifier
128 .verify_checksum(data, expected_checksum)?
129 } else {
130 true }
132 } else {
133 true
134 };
135
136 let version_valid = if self.version_verifier.config().verify_on_read {
138 let current_version = self.version_verifier.get_version(key);
139 self.version_verifier.verify_version(key, current_version)?
140 } else {
141 true
142 };
143
144 let corruption_result = self.corruption_detector.detect_corruption(data, key);
146 let corruption_detected = corruption_result.is_corrupted;
147
148 let is_valid = checksum_valid && version_valid && !corruption_detected;
149
150 {
152 let mut stats = self.stats.write().await;
153 stats.record_verification(is_valid);
154 if corruption_detected {
155 stats.record_corruption_detection();
156 }
157 if corruption_result.recovery_attempted {
158 stats.record_recovery_attempt(corruption_result.recovery_successful);
159 }
160 }
161
162 let result = if is_valid {
163 IntegrityResult::valid(metadata)
164 } else {
165 let error_msg = format!(
166 "Integrity check failed: checksum={}, version={}, corruption={}",
167 checksum_valid, version_valid, corruption_detected
168 );
169 IntegrityResult::invalid(error_msg, metadata)
170 };
171
172 Ok(result)
173 }
174
175 pub async fn compute_checksum(
177 &mut self,
178 data: &[u8],
179 key: &str,
180 ) -> Result<String, IntegrityError> {
181 if !self.initialized {
182 return Err(IntegrityError::NotInitialized);
183 }
184
185 if !self.checksum_verifier.config().compute_on_write {
186 return Err(IntegrityError::ChecksumComputationDisabled);
187 }
188
189 self.checksum_verifier.compute_checksum(data, key)
190 }
191
192 pub async fn increment_version(&mut self, key: &str) -> Result<u64, IntegrityError> {
194 if !self.initialized {
195 return Err(IntegrityError::NotInitialized);
196 }
197
198 if !self.version_verifier.config().increment_on_write {
199 return Err(IntegrityError::VersionIncrementDisabled);
200 }
201
202 Ok(self.version_verifier.increment_version(key))
203 }
204
205 pub async fn get_stats(&self) -> IntegrityStats {
207 self.stats.read().await.clone()
208 }
209
210 pub fn is_initialized(&self) -> bool {
212 self.initialized
213 }
214
215 pub fn checksum_verifier(&self) -> &ChecksumVerifier {
217 &self.checksum_verifier
218 }
219
220 pub fn version_verifier(&self) -> &VersionVerifier {
222 &self.version_verifier
223 }
224
225 pub fn corruption_detector(&self) -> &CorruptionDetector {
227 &self.corruption_detector
228 }
229}
230
231impl Default for DataIntegrity {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
/// Errors reported by the data integrity system.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntegrityError {
    /// An operation was attempted before the system was initialized.
    NotInitialized,
    /// Initialization was requested on an already initialized system.
    AlreadyInitialized,
    /// A checksum did not match the expected value.
    ChecksumMismatch,
    /// The version recorded for `key` differs from the expected one.
    VersionMismatch {
        /// Key whose version was checked.
        key: String,
        /// Version that was expected.
        expected: u64,
        /// Version that was found.
        actual: u64,
    },
    /// Data corruption was detected.
    CorruptionDetected,
    /// Checksum computation was requested while it is disabled.
    ChecksumComputationDisabled,
    /// A version increment was requested while it is disabled.
    VersionIncrementDisabled,
    /// The data format is not valid.
    InvalidDataFormat,
    /// An operation failed; the payload carries the description.
    OperationFailed(String),
}

impl std::fmt::Display for IntegrityError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The two variants that carry data format their payload and return
        // right away; every other variant maps to a fixed message that is
        // written once at the end.
        let message = match self {
            Self::VersionMismatch {
                key,
                expected,
                actual,
            } => {
                return write!(
                    f,
                    "Version mismatch for key '{}': expected {}, got {}",
                    key, expected, actual
                );
            }
            Self::OperationFailed(reason) => return write!(f, "Operation failed: {}", reason),
            Self::NotInitialized => "Data integrity system not initialized",
            Self::AlreadyInitialized => "Data integrity system already initialized",
            Self::ChecksumMismatch => "Checksum mismatch detected",
            Self::CorruptionDetected => "Data corruption detected",
            Self::ChecksumComputationDisabled => "Checksum computation is disabled",
            Self::VersionIncrementDisabled => "Version increment is disabled",
            Self::InvalidDataFormat => "Invalid data format",
        };
        f.write_str(message)
    }
}

impl std::error::Error for IntegrityError {}
295
#[cfg(test)]
mod integration_tests {
    //! Lifecycle and happy-path tests for `DataIntegrity`.
    //!
    //! Error cases assert the exact `IntegrityError` variant rather than just
    //! `is_err()`, so a guard that returns the wrong error is caught.

    use super::*;

    /// Builds a `DataIntegrity` with default settings that is already
    /// initialized.
    async fn initialized_integrity() -> DataIntegrity {
        let mut integrity = DataIntegrity::new();
        integrity
            .initialize()
            .await
            .expect("a fresh instance must initialize");
        integrity
    }

    #[tokio::test]
    async fn test_data_integrity_creation() {
        let mut integrity = DataIntegrity::new();
        assert!(!integrity.is_initialized());

        integrity.initialize().await.unwrap();
        assert!(integrity.is_initialized());
    }

    #[tokio::test]
    async fn test_data_integrity_with_config() {
        let config = IntegrityConfig::default();
        let mut integrity = DataIntegrity::with_config(config);
        assert!(!integrity.is_initialized());

        integrity.initialize().await.unwrap();
        assert!(integrity.is_initialized());
    }

    #[tokio::test]
    async fn test_data_integrity_initialization() {
        let mut integrity = DataIntegrity::new();

        // First initialization succeeds.
        assert_eq!(integrity.initialize().await, Ok(()));
        assert!(integrity.is_initialized());

        // A second one is rejected and leaves the system initialized.
        assert_eq!(
            integrity.initialize().await,
            Err(IntegrityError::AlreadyInitialized)
        );
        assert!(integrity.is_initialized());
    }

    #[tokio::test]
    async fn test_data_integrity_shutdown() {
        let mut integrity = initialized_integrity().await;
        assert!(integrity.is_initialized());

        assert_eq!(integrity.shutdown().await, Ok(()));
        assert!(!integrity.is_initialized());

        // Shutting down twice is rejected.
        assert_eq!(
            integrity.shutdown().await,
            Err(IntegrityError::NotInitialized)
        );
    }

    #[tokio::test]
    async fn test_shutdown_before_initialization() {
        let mut integrity = DataIntegrity::new();

        assert_eq!(
            integrity.shutdown().await,
            Err(IntegrityError::NotInitialized)
        );
        assert!(!integrity.is_initialized());
    }

    #[tokio::test]
    async fn test_reinitialize_after_shutdown() {
        let mut integrity = initialized_integrity().await;

        integrity.shutdown().await.unwrap();
        assert_eq!(integrity.initialize().await, Ok(()));
        assert!(integrity.is_initialized());
    }

    #[tokio::test]
    async fn test_verify_integrity() {
        let mut integrity = initialized_integrity().await;

        let data = b"test data";
        let key = "test_key";

        let result = integrity.verify_integrity(data, key).await.unwrap();
        assert!(result.is_valid);
        assert!(result.checksum_valid);
        assert!(result.version_valid);
        assert!(!result.corruption_detected);
    }

    #[tokio::test]
    async fn test_compute_checksum() {
        let mut integrity = initialized_integrity().await;

        let data = b"test data";
        let key = "test_key";

        let checksum = integrity.compute_checksum(data, key).await.unwrap();
        assert!(!checksum.is_empty());
    }

    #[tokio::test]
    async fn test_increment_version() {
        let mut integrity = initialized_integrity().await;

        let key = "test_key";

        let version1 = integrity.increment_version(key).await.unwrap();
        let version2 = integrity.increment_version(key).await.unwrap();

        assert_eq!(version1, 1);
        assert_eq!(version2, 2);
    }

    #[tokio::test]
    async fn test_get_stats() {
        let mut integrity = initialized_integrity().await;

        let data = b"test data";
        let key = "test_key";

        integrity.verify_integrity(data, key).await.unwrap();
        integrity.compute_checksum(data, key).await.unwrap();
        integrity.increment_version(key).await.unwrap();

        let stats = integrity.get_stats().await;
        assert!(stats.total_verifications > 0);
        assert!(stats.successful_verifications > 0);
    }

    #[tokio::test]
    async fn test_operations_before_initialization() {
        let mut integrity = DataIntegrity::new();

        let data = b"test data";
        let key = "test_key";

        // Every guarded operation must fail with `NotInitialized`
        // specifically, not with some other error.
        assert!(matches!(
            integrity.verify_integrity(data, key).await,
            Err(IntegrityError::NotInitialized)
        ));
        assert_eq!(
            integrity.compute_checksum(data, key).await,
            Err(IntegrityError::NotInitialized)
        );
        assert_eq!(
            integrity.increment_version(key).await,
            Err(IntegrityError::NotInitialized)
        );
    }
}