leptos_sync_core/reliability/data_integrity/
mod.rs

1//! Data Integrity Module
2//!
3//! This module provides comprehensive data integrity verification including:
4//! - Checksum verification for data corruption detection
5//! - Version verification for data consistency
6//! - Corruption detection and recovery
7//! - Data validation and sanitization
8
9pub mod checksum;
10pub mod corruption;
11pub mod types;
12pub mod version;
13
14// Re-export main types for convenience
15pub use checksum::{ChecksumAlgorithm, ChecksumConfig, ChecksumVerifier};
16pub use corruption::{CorruptionConfig, CorruptionDetector, CorruptionResult};
17pub use types::{DataFormat, DataMetadata, IntegrityResult, IntegrityStats};
18pub use version::{VersionConfig, VersionVerifier};
19
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23/// Configuration for the data integrity system
24#[derive(Debug, Clone, PartialEq)]
25pub struct IntegrityConfig {
26    /// Checksum configuration
27    pub checksum_config: ChecksumConfig,
28    /// Version configuration
29    pub version_config: VersionConfig,
30    /// Corruption configuration
31    pub corruption_config: CorruptionConfig,
32}
33
34impl Default for IntegrityConfig {
35    fn default() -> Self {
36        Self {
37            checksum_config: ChecksumConfig::default(),
38            version_config: VersionConfig::default(),
39            corruption_config: CorruptionConfig::default(),
40        }
41    }
42}
43
44/// Data integrity system for corruption detection and verification
45#[derive(Debug, Clone)]
46pub struct DataIntegrity {
47    /// Checksum verifier
48    checksum_verifier: ChecksumVerifier,
49    /// Version verifier
50    version_verifier: VersionVerifier,
51    /// Corruption detector
52    corruption_detector: CorruptionDetector,
53    /// Integrity statistics
54    stats: Arc<RwLock<IntegrityStats>>,
55    /// Whether the system is initialized
56    initialized: bool,
57}
58
59impl DataIntegrity {
60    /// Create a new data integrity system
61    pub fn new() -> Self {
62        Self {
63            checksum_verifier: ChecksumVerifier::new(),
64            version_verifier: VersionVerifier::new(),
65            corruption_detector: CorruptionDetector::new(),
66            stats: Arc::new(RwLock::new(IntegrityStats::new())),
67            initialized: false,
68        }
69    }
70
71    /// Create a new data integrity system with custom configuration
72    pub fn with_config(config: IntegrityConfig) -> Self {
73        Self {
74            checksum_verifier: ChecksumVerifier::with_config(config.checksum_config),
75            version_verifier: VersionVerifier::with_config(config.version_config),
76            corruption_detector: CorruptionDetector::with_config(config.corruption_config),
77            stats: Arc::new(RwLock::new(IntegrityStats::new())),
78            initialized: false,
79        }
80    }
81
82    /// Initialize the data integrity system
83    pub async fn initialize(&mut self) -> Result<(), IntegrityError> {
84        if self.initialized {
85            return Err(IntegrityError::AlreadyInitialized);
86        }
87
88        // Initialize components
89        self.checksum_verifier = ChecksumVerifier::new();
90        self.version_verifier = VersionVerifier::new();
91        self.corruption_detector = CorruptionDetector::new();
92
93        self.initialized = true;
94        Ok(())
95    }
96
97    /// Shutdown the data integrity system
98    pub async fn shutdown(&mut self) -> Result<(), IntegrityError> {
99        if !self.initialized {
100            return Err(IntegrityError::NotInitialized);
101        }
102
103        // Clear all caches and statistics
104        self.checksum_verifier.clear_cache();
105        self.version_verifier.clear_versions();
106        self.corruption_detector.clear_stats();
107
108        self.initialized = false;
109        Ok(())
110    }
111
112    /// Verify data integrity
113    pub async fn verify_integrity(
114        &mut self,
115        data: &[u8],
116        key: &str,
117    ) -> Result<IntegrityResult, IntegrityError> {
118        if !self.initialized {
119            return Err(IntegrityError::NotInitialized);
120        }
121
122        let metadata = DataMetadata::new(DataFormat::Binary, data.len());
123
124        // Perform checksum verification
125        let checksum_valid = if self.checksum_verifier.config().verify_on_read {
126            if let Some(expected_checksum) = self.checksum_verifier.get_cached_checksum(key) {
127                self.checksum_verifier
128                    .verify_checksum(data, expected_checksum)?
129            } else {
130                true // No checksum to verify
131            }
132        } else {
133            true
134        };
135
136        // Perform version verification
137        let version_valid = if self.version_verifier.config().verify_on_read {
138            let current_version = self.version_verifier.get_version(key);
139            self.version_verifier.verify_version(key, current_version)?
140        } else {
141            true
142        };
143
144        // Perform corruption detection
145        let corruption_result = self.corruption_detector.detect_corruption(data, key);
146        let corruption_detected = corruption_result.is_corrupted;
147
148        let is_valid = checksum_valid && version_valid && !corruption_detected;
149
150        // Update statistics
151        {
152            let mut stats = self.stats.write().await;
153            stats.record_verification(is_valid);
154            if corruption_detected {
155                stats.record_corruption_detection();
156            }
157            if corruption_result.recovery_attempted {
158                stats.record_recovery_attempt(corruption_result.recovery_successful);
159            }
160        }
161
162        let result = if is_valid {
163            IntegrityResult::valid(metadata)
164        } else {
165            let error_msg = format!(
166                "Integrity check failed: checksum={}, version={}, corruption={}",
167                checksum_valid, version_valid, corruption_detected
168            );
169            IntegrityResult::invalid(error_msg, metadata)
170        };
171
172        Ok(result)
173    }
174
175    /// Compute and store checksum for data
176    pub async fn compute_checksum(
177        &mut self,
178        data: &[u8],
179        key: &str,
180    ) -> Result<String, IntegrityError> {
181        if !self.initialized {
182            return Err(IntegrityError::NotInitialized);
183        }
184
185        if !self.checksum_verifier.config().compute_on_write {
186            return Err(IntegrityError::ChecksumComputationDisabled);
187        }
188
189        self.checksum_verifier.compute_checksum(data, key)
190    }
191
192    /// Increment version for data
193    pub async fn increment_version(&mut self, key: &str) -> Result<u64, IntegrityError> {
194        if !self.initialized {
195            return Err(IntegrityError::NotInitialized);
196        }
197
198        if !self.version_verifier.config().increment_on_write {
199            return Err(IntegrityError::VersionIncrementDisabled);
200        }
201
202        Ok(self.version_verifier.increment_version(key))
203    }
204
205    /// Get integrity statistics
206    pub async fn get_stats(&self) -> IntegrityStats {
207        self.stats.read().await.clone()
208    }
209
210    /// Check if the system is initialized
211    pub fn is_initialized(&self) -> bool {
212        self.initialized
213    }
214
215    /// Get checksum verifier
216    pub fn checksum_verifier(&self) -> &ChecksumVerifier {
217        &self.checksum_verifier
218    }
219
220    /// Get version verifier
221    pub fn version_verifier(&self) -> &VersionVerifier {
222        &self.version_verifier
223    }
224
225    /// Get corruption detector
226    pub fn corruption_detector(&self) -> &CorruptionDetector {
227        &self.corruption_detector
228    }
229}
230
231impl Default for DataIntegrity {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237/// Error types for data integrity operations
238#[derive(Debug, Clone, PartialEq, Eq)]
239pub enum IntegrityError {
240    /// System not initialized
241    NotInitialized,
242    /// System already initialized
243    AlreadyInitialized,
244    /// Checksum mismatch
245    ChecksumMismatch,
246    /// Version mismatch
247    VersionMismatch {
248        key: String,
249        expected: u64,
250        actual: u64,
251    },
252    /// Corruption detected
253    CorruptionDetected,
254    /// Checksum computation disabled
255    ChecksumComputationDisabled,
256    /// Version increment disabled
257    VersionIncrementDisabled,
258    /// Invalid data format
259    InvalidDataFormat,
260    /// Operation failed
261    OperationFailed(String),
262}
263
264impl std::fmt::Display for IntegrityError {
265    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266        match self {
267            IntegrityError::NotInitialized => write!(f, "Data integrity system not initialized"),
268            IntegrityError::AlreadyInitialized => {
269                write!(f, "Data integrity system already initialized")
270            }
271            IntegrityError::ChecksumMismatch => write!(f, "Checksum mismatch detected"),
272            IntegrityError::VersionMismatch {
273                key,
274                expected,
275                actual,
276            } => {
277                write!(
278                    f,
279                    "Version mismatch for key '{}': expected {}, got {}",
280                    key, expected, actual
281                )
282            }
283            IntegrityError::CorruptionDetected => write!(f, "Data corruption detected"),
284            IntegrityError::ChecksumComputationDisabled => {
285                write!(f, "Checksum computation is disabled")
286            }
287            IntegrityError::VersionIncrementDisabled => write!(f, "Version increment is disabled"),
288            IntegrityError::InvalidDataFormat => write!(f, "Invalid data format"),
289            IntegrityError::OperationFailed(msg) => write!(f, "Operation failed: {}", msg),
290        }
291    }
292}
293
294impl std::error::Error for IntegrityError {}
295
296#[cfg(test)]
297mod integration_tests {
298    use super::*;
299
300    #[tokio::test]
301    async fn test_data_integrity_creation() {
302        let mut integrity = DataIntegrity::new();
303        assert!(!integrity.is_initialized());
304
305        integrity.initialize().await.unwrap();
306        assert!(integrity.is_initialized());
307    }
308
309    #[tokio::test]
310    async fn test_data_integrity_with_config() {
311        let config = IntegrityConfig::default();
312        let mut integrity = DataIntegrity::with_config(config);
313
314        integrity.initialize().await.unwrap();
315        assert!(integrity.is_initialized());
316    }
317
318    #[tokio::test]
319    async fn test_data_integrity_initialization() {
320        let mut integrity = DataIntegrity::new();
321
322        // Initialize
323        let result = integrity.initialize().await;
324        assert!(result.is_ok());
325        assert!(integrity.is_initialized());
326
327        // Try to initialize again (should fail)
328        let result = integrity.initialize().await;
329        assert!(result.is_err());
330    }
331
332    #[tokio::test]
333    async fn test_data_integrity_shutdown() {
334        let mut integrity = DataIntegrity::new();
335
336        // Initialize first
337        integrity.initialize().await.unwrap();
338        assert!(integrity.is_initialized());
339
340        // Then shutdown
341        let result = integrity.shutdown().await;
342        assert!(result.is_ok());
343        assert!(!integrity.is_initialized());
344    }
345
346    #[tokio::test]
347    async fn test_verify_integrity() {
348        let mut integrity = DataIntegrity::new();
349        integrity.initialize().await.unwrap();
350
351        let data = b"test data";
352        let key = "test_key";
353
354        let result = integrity.verify_integrity(data, key).await.unwrap();
355        assert!(result.is_valid);
356        assert!(result.checksum_valid);
357        assert!(result.version_valid);
358        assert!(!result.corruption_detected);
359    }
360
361    #[tokio::test]
362    async fn test_compute_checksum() {
363        let mut integrity = DataIntegrity::new();
364        integrity.initialize().await.unwrap();
365
366        let data = b"test data";
367        let key = "test_key";
368
369        let checksum = integrity.compute_checksum(data, key).await.unwrap();
370        assert!(!checksum.is_empty());
371    }
372
373    #[tokio::test]
374    async fn test_increment_version() {
375        let mut integrity = DataIntegrity::new();
376        integrity.initialize().await.unwrap();
377
378        let key = "test_key";
379
380        let version1 = integrity.increment_version(key).await.unwrap();
381        let version2 = integrity.increment_version(key).await.unwrap();
382
383        assert_eq!(version1, 1);
384        assert_eq!(version2, 2);
385    }
386
387    #[tokio::test]
388    async fn test_get_stats() {
389        let mut integrity = DataIntegrity::new();
390        integrity.initialize().await.unwrap();
391
392        let data = b"test data";
393        let key = "test_key";
394
395        // Perform some operations
396        integrity.verify_integrity(data, key).await.unwrap();
397        integrity.compute_checksum(data, key).await.unwrap();
398        integrity.increment_version(key).await.unwrap();
399
400        let stats = integrity.get_stats().await;
401        assert!(stats.total_verifications > 0);
402        assert!(stats.successful_verifications > 0);
403    }
404
405    #[tokio::test]
406    async fn test_operations_before_initialization() {
407        let mut integrity = DataIntegrity::new();
408
409        let data = b"test data";
410        let key = "test_key";
411
412        // Operations should fail before initialization
413        assert!(integrity.verify_integrity(data, key).await.is_err());
414        assert!(integrity.compute_checksum(data, key).await.is_err());
415        assert!(integrity.increment_version(key).await.is_err());
416    }
417}