1use crate::event::StreamEvent;
62use anyhow::Result;
63use chrono::{DateTime, Utc};
64use serde::{Deserialize, Serialize};
65use std::collections::{HashMap, VecDeque};
66use std::sync::Arc;
67use std::time::{Duration, Instant};
68use tokio::sync::RwLock;
69use tracing::{debug, info, warn};
70use uuid::Uuid;
71
/// Tunable switches and thresholds controlling every stage of the
/// data-quality pipeline (validation, profiling, cleansing, scoring,
/// alerting, SLA tracking, auditing, and duplicate detection).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityConfig {
    /// Run registered `ValidationRule`s against each event.
    pub enable_validation: bool,

    /// Feed events into the `DataProfiler`.
    pub enable_profiling: bool,

    /// Attempt automatic corrections on events that fail validation.
    pub enable_cleansing: bool,

    /// Maintain aggregate `QualityMetrics`.
    pub enable_metrics: bool,

    /// Target quality score in `[0.0, 1.0]` an event should meet.
    pub quality_threshold: f64,

    /// Emit `QualityAlert`s when quality degrades.
    pub enable_alerting: bool,

    /// Quality score in `[0.0, 1.0]` below which an alert is triggered.
    pub alert_threshold: f64,

    /// Number of consecutive failures tolerated before alerting.
    pub max_failures_before_alert: usize,

    /// Number of recent events retained in the profiling window.
    pub profiling_window_size: usize,

    /// Track compliance against `sla_target`.
    pub enable_sla_tracking: bool,

    /// SLA compliance target in `[0.0, 1.0]`.
    pub sla_target: f64,

    /// Record an `AuditEntry` for each processed event.
    pub enable_audit_trail: bool,

    /// Maximum audit entries retained before the oldest are evicted.
    pub max_audit_entries: usize,

    /// When `true`, `NotNull`-style rules tolerate missing values.
    pub allow_null_values: bool,

    /// Reject events whose fingerprint repeats within `duplicate_window`.
    pub enable_duplicate_detection: bool,

    /// Time window within which a repeated event counts as a duplicate.
    pub duplicate_window: Duration,
}
126
127impl Default for QualityConfig {
128 fn default() -> Self {
129 Self {
130 enable_validation: true,
131 enable_profiling: true,
132 enable_cleansing: false,
133 enable_metrics: true,
134 quality_threshold: 0.95,
135 enable_alerting: true,
136 alert_threshold: 0.90,
137 max_failures_before_alert: 10,
138 profiling_window_size: 1000,
139 enable_sla_tracking: true,
140 sla_target: 0.99,
141 enable_audit_trail: true,
142 max_audit_entries: 10000,
143 allow_null_values: false,
144 enable_duplicate_detection: true,
145 duplicate_window: Duration::from_secs(60),
146 }
147 }
148}
149
/// Orchestrates the full data-quality pipeline for stream events.
///
/// Every sub-component lives behind `Arc<RwLock<_>>` so the validator can
/// be shared across tasks and cloned cheaply at the handle level.
pub struct DataQualityValidator {
    /// Registered validation rules, applied in insertion order.
    rules: Arc<RwLock<Vec<ValidationRule>>>,

    /// Per-field statistical profiler.
    profiler: Arc<RwLock<DataProfiler>>,

    /// Automatic data-correction engine.
    cleanser: Arc<RwLock<DataCleanser>>,

    /// Aggregate quality metrics.
    metrics: Arc<RwLock<QualityMetrics>>,

    /// Dimension-weighted quality scorer.
    scorer: Arc<RwLock<QualityScorer>>,

    /// Alert storage and alerting rules.
    alert_manager: Arc<RwLock<AlertManager>>,

    /// Bounded trail of quality-related actions.
    audit_trail: Arc<RwLock<AuditTrail>>,

    /// Windowed duplicate-event detector.
    duplicate_detector: Arc<RwLock<DuplicateDetector>>,

    /// Immutable configuration captured at construction.
    config: QualityConfig,
}
179
/// A declarative rule an event field (or field combination) must satisfy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationRule {
    /// Field must be present and non-null.
    NotNull { field: String },

    /// Field value must be unique across the stream.
    Unique { field: String },

    /// Field must match the given pattern.
    Format { field: String, pattern: String },

    /// Numeric field must lie within `[min, max]`.
    Range { field: String, min: f64, max: f64 },

    /// Field must be one of the allowed values.
    Enum {
        field: String,
        allowed_values: Vec<String>,
    },

    /// Field length must be at least `min_length`.
    MinLength { field: String, min_length: usize },

    /// Field length must be at most `max_length`.
    MaxLength { field: String, max_length: usize },

    /// Field must be a well-formed URL.
    Url { field: String },

    /// Field must be a well-formed email address.
    Email { field: String },

    /// Field must parse as a date in the given format.
    Date { field: String, format: String },

    /// User-defined rule identified by name.
    Custom { name: String, description: String },

    /// Condition spanning multiple fields of the same event.
    CrossField {
        name: String,
        fields: Vec<String>,
        condition: String,
    },

    /// Field value must exist in another stream's reference field.
    ReferenceIntegrity {
        field: String,
        reference_stream: String,
        reference_field: String,
    },
}
233
/// Outcome of validating a single event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResult {
    /// Identifier assigned to this validation run.
    pub event_id: Uuid,

    /// `true` when no rule failed (duplicates are reported as invalid).
    pub is_valid: bool,

    /// All rule failures detected for this event.
    pub failures: Vec<ValidationFailure>,

    /// Severity-weighted score in `[0.0, 1.0]`; `1.0` means no failures.
    pub quality_score: f64,

    /// When the validation completed.
    pub timestamp: DateTime<Utc>,

    /// Corrections applied by the cleanser (empty when cleansing is off).
    pub corrections: Vec<DataCorrection>,
}
255
/// One violated rule for one field of an event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationFailure {
    /// Name of the rule that failed (e.g. "NotNull").
    pub rule: String,

    /// Field the rule was evaluated against.
    pub field: String,

    /// Human-readable explanation of the failure.
    pub reason: String,

    /// How serious the failure is; drives quality-score weighting.
    pub severity: FailureSeverity,

    /// Optional remediation hint for operators or the cleanser.
    pub suggested_fix: Option<String>,
}
274
275#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
277pub enum FailureSeverity {
278 Low,
280 Medium,
282 High,
284 Critical,
286}
287
/// A single automatic fix applied to an event field by the cleanser.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCorrection {
    /// Field that was modified.
    pub field: String,

    /// Value before the correction.
    pub original_value: String,

    /// Value after the correction.
    pub corrected_value: String,

    /// Which kind of cleansing produced this correction.
    pub correction_type: CorrectionType,

    /// When the correction was applied.
    pub timestamp: DateTime<Utc>,
}
306
/// Category of automatic correction the cleanser can apply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CorrectionType {
    /// Missing value replaced with a default.
    NullFill,
    /// Value rewritten into the expected format.
    FormatCorrection,
    /// Out-of-range value capped to a bound.
    OutlierCapping,
    /// Duplicate value/event removed.
    DuplicateRemoval,
    /// Value normalized to a canonical representation.
    Standardization,
    /// User-defined correction identified by name.
    Custom { name: String },
}
323
/// Maintains per-field statistical profiles over a sliding event window.
#[derive(Debug, Clone)]
pub struct DataProfiler {
    /// Profile per field name.
    pub profiles: HashMap<String, FieldProfile>,

    /// Sliding window of recently profiled events.
    pub window: VecDeque<ProfiledEvent>,

    /// Maximum number of events retained in `window`.
    pub window_size: usize,

    /// Aggregate profiling statistics.
    pub stats: ProfileStats,
}
339
/// Statistical summary of a single field across the profiling window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldProfile {
    /// Name of the profiled field.
    pub field_name: String,

    /// Count of observed values per detected type name.
    pub type_distribution: HashMap<String, usize>,

    /// Number of null/missing observations.
    pub null_count: usize,

    /// Number of distinct observed values.
    pub unique_count: usize,

    /// Minimum numeric value, when the field is numeric.
    pub min_value: Option<f64>,

    /// Maximum numeric value, when the field is numeric.
    pub max_value: Option<f64>,

    /// Mean of numeric values, when the field is numeric.
    pub mean_value: Option<f64>,

    /// Standard deviation of numeric values, when the field is numeric.
    pub std_dev: Option<f64>,

    /// Percentile label (e.g. "p95") to value.
    pub percentiles: HashMap<String, f64>,

    /// Most frequent values with their counts.
    pub top_values: Vec<(String, usize)>,

    /// Count of observed value patterns.
    pub patterns: HashMap<String, usize>,

    /// When this profile was last refreshed.
    pub last_updated: DateTime<Utc>,
}
379
/// Snapshot of one event's fields, as stored in the profiling window.
#[derive(Debug, Clone)]
pub struct ProfiledEvent {
    /// Identifier of the profiled event.
    pub event_id: Uuid,

    /// Field name to stringified value.
    pub fields: HashMap<String, String>,

    /// When the event was profiled.
    pub timestamp: DateTime<Utc>,
}
392
/// Aggregate counters and timings for the profiler.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProfileStats {
    /// Total number of events that have been profiled.
    pub total_events: u64,

    /// Number of distinct fields with a profile.
    pub fields_profiled: usize,

    /// Cumulative wall time spent profiling.
    pub total_profiling_time: Duration,

    /// Average profiling time per event.
    pub avg_profiling_time: Duration,
}
408
/// Applies configured cleansing rules to failing events.
#[derive(Debug, Clone)]
pub struct DataCleanser {
    /// Cleansing rules, applied in insertion order.
    pub rules: Vec<CleansingRule>,

    /// Aggregate cleansing statistics.
    pub stats: CleansingStats,
}
418
/// A single automatic data-repair transformation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CleansingRule {
    /// Replace nulls in `field` with `fill_value`.
    FillNull { field: String, fill_value: String },

    /// Strip leading/trailing whitespace from `field`.
    TrimWhitespace { field: String },

    /// Lowercase the value of `field`.
    ToLowerCase { field: String },

    /// Uppercase the value of `field`.
    ToUpperCase { field: String },

    /// Drop duplicate events.
    RemoveDuplicates,

    /// Cap outlier values of `field` using the given method.
    CapOutliers {
        field: String,
        method: OutlierMethod,
    },

    /// Rewrite `field` into the canonical `format`.
    StandardizeFormat { field: String, format: String },

    /// User-defined cleansing step identified by name.
    Custom { name: String },
}
449
/// Strategy for deciding which values count as outliers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OutlierMethod {
    /// Interquartile range: outlier if beyond `multiplier * IQR` from the quartiles.
    Iqr { multiplier: f64 },
    /// Z-score: outlier if `|z| > threshold`.
    ZScore { threshold: f64 },
    /// Percentile bounds: outlier if below `lower` or above `upper`.
    Percentile { lower: f64, upper: f64 },
}
460
/// Aggregate counters and timings for the cleanser.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CleansingStats {
    /// Total corrections applied.
    pub total_corrections: u64,

    /// Correction count per correction-type name.
    pub corrections_by_type: HashMap<String, u64>,

    /// Cumulative wall time spent cleansing.
    pub total_cleansing_time: Duration,
}
473
/// Computes weighted quality scores across quality dimensions.
#[derive(Debug, Clone)]
pub struct QualityScorer {
    /// Quality dimensions tracked, keyed by name.
    pub dimensions: HashMap<String, QualityDimension>,

    /// Relative weight per dimension name.
    pub weights: HashMap<String, f64>,

    /// Aggregate scoring statistics.
    pub stats: ScoringStats,
}
486
/// Standard data-quality dimension used for scoring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QualityDimension {
    /// Are all required values present?
    Completeness,

    /// Do values reflect reality?
    Accuracy,

    /// Do values agree across records and sources?
    Consistency,

    /// Are values fresh enough for their use?
    Timeliness,

    /// Do values conform to the expected formats/rules?
    Validity,

    /// Are records free of duplicates?
    Uniqueness,
}
508
/// Aggregate statistics over all scored events.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ScoringStats {
    /// Total events that have been scored.
    pub total_events_scored: u64,

    /// Running mean of quality scores.
    pub avg_quality_score: f64,

    /// Lowest score observed.
    pub min_quality_score: f64,

    /// Highest score observed.
    pub max_quality_score: f64,

    /// Events scoring below the configured quality threshold.
    pub events_below_threshold: u64,
}
527
/// Stores triggered alerts and the rules that generate them.
#[derive(Debug, Clone)]
pub struct AlertManager {
    /// All alerts triggered so far (unbounded; callers should drain).
    pub alerts: Vec<QualityAlert>,

    /// Configured alerting rules.
    pub alert_rules: Vec<AlertRule>,

    /// Aggregate alerting statistics.
    pub stats: AlertStats,
}
540
/// A single triggered quality alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityAlert {
    /// Unique alert identifier.
    pub id: Uuid,

    /// Category of the alert.
    pub alert_type: AlertType,

    /// How urgent the alert is.
    pub severity: AlertSeverity,

    /// Human-readable alert description.
    pub message: String,

    /// When the alert fired.
    pub triggered_at: DateTime<Utc>,

    /// Event that caused the alert, when attributable to one.
    pub event_id: Option<Uuid>,

    /// Quality score at the time of the alert.
    pub quality_score: f64,

    /// Additional free-form key/value context.
    pub details: HashMap<String, String>,
}
568
/// Category of quality alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
    /// Quality score fell below the alert threshold.
    QualityScoreLow,

    /// Validation failure rate exceeded its threshold.
    HighFailureRate,

    /// SLA compliance target breached.
    SlaViolation,

    /// Anomalous data detected.
    DataAnomaly,

    /// Field profile drifted from its baseline.
    ProfileDrift,

    /// User-defined alert category.
    Custom { name: String },
}
590
/// Alert urgency, declared least to most severe so derived ordering
/// (`Info < Warning < Error < Critical`) matches intuition.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Informational only.
    Info,
    /// Needs attention but not urgent.
    Warning,
    /// Significant degradation.
    Error,
    /// Immediate action required.
    Critical,
}
603
/// A configured condition that, when met, produces an alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Rule identifier.
    pub name: String,

    /// Condition that triggers the alert.
    pub condition: AlertCondition,

    /// Severity assigned to alerts produced by this rule.
    pub severity: AlertSeverity,

    /// Whether the rule is currently active.
    pub enabled: bool,
}
619
/// Trigger condition for an `AlertRule`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    /// Fire when the quality score drops below `threshold`.
    QualityScoreBelow { threshold: f64 },

    /// Fire when the failure rate rises above `threshold`.
    FailureRateAbove { threshold: f64 },

    /// Fire when the SLA target is breached.
    SlaBreached,

    /// User-defined condition expression.
    Custom { expression: String },
}
635
/// Aggregate statistics about triggered alerts.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AlertStats {
    /// Total alerts triggered.
    pub total_alerts: u64,

    /// Alert count per alert-type name.
    pub alerts_by_type: HashMap<String, u64>,

    /// Alert count per severity name.
    pub alerts_by_severity: HashMap<String, u64>,

    /// When the most recent alert fired, if any.
    pub last_alert_time: Option<DateTime<Utc>>,
}
651
/// Bounded FIFO log of quality-related actions.
#[derive(Debug, Clone)]
pub struct AuditTrail {
    /// Entries in arrival order; oldest evicted first.
    pub entries: VecDeque<AuditEntry>,

    /// Maximum number of entries retained.
    pub max_entries: usize,

    /// Aggregate audit statistics.
    pub stats: AuditStats,
}
664
/// One recorded quality action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEntry {
    /// Unique entry identifier.
    pub id: Uuid,

    /// When the action occurred.
    pub timestamp: DateTime<Utc>,

    /// Event the action relates to.
    pub event_id: Uuid,

    /// What kind of action was taken.
    pub action: AuditAction,

    /// Free-form description of the action.
    pub details: String,

    /// Who/what performed the action (e.g. "system").
    pub actor: String,
}
686
/// Kind of action recorded in the audit trail.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuditAction {
    /// An event was validated.
    Validation,

    /// An event was cleansed.
    Cleansing,

    /// An alert was triggered.
    AlertTriggered,

    /// A quality score was computed.
    QualityScoreComputed,

    /// A field profile was refreshed.
    ProfileUpdated,

    /// User-defined action identified by name.
    Custom { name: String },
}
708
/// Aggregate statistics about the audit trail.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AuditStats {
    /// Total entries ever recorded (not reduced by eviction).
    pub total_entries: u64,

    /// Entry count per action name.
    pub entries_by_action: HashMap<String, u64>,

    /// Timestamp of the oldest retained entry.
    pub oldest_entry: Option<DateTime<Utc>>,

    /// Timestamp of the newest retained entry.
    pub newest_entry: Option<DateTime<Utc>>,
}
724
/// Detects repeated events within a sliding time window.
#[derive(Debug, Clone)]
pub struct DuplicateDetector {
    /// Recent event fingerprints with their arrival times (FIFO).
    pub event_hashes: VecDeque<(String, DateTime<Utc>)>,

    /// How long a fingerprint counts as a potential duplicate.
    pub window: Duration,

    /// Aggregate duplicate statistics.
    pub stats: DuplicateStats,
}
737
/// Aggregate statistics about duplicate detection.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DuplicateStats {
    /// Total duplicates detected.
    pub total_duplicates: u64,

    /// Duplicates actually removed by cleansing.
    pub duplicates_removed: u64,

    /// When the most recent duplicate was seen, if any.
    pub last_duplicate_time: Option<DateTime<Utc>>,
}
750
/// Aggregate quality metrics across all validated events.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QualityMetrics {
    /// Total events run through validation.
    pub total_events_validated: u64,

    /// Events that passed all rules.
    pub valid_events: u64,

    /// Events with at least one failure.
    pub invalid_events: u64,

    /// Percentage of valid events, in `[0.0, 100.0]`.
    pub validation_rate: f64,

    /// Running mean quality score.
    pub avg_quality_score: f64,

    /// Quality score of the most recently validated event.
    pub current_quality_score: f64,

    /// SLA compliance ratio in `[0.0, 1.0]`.
    pub sla_compliance: f64,

    /// Per-dimension scores in `[0.0, 1.0]`.
    pub completeness_score: f64,

    /// Accuracy dimension score.
    pub accuracy_score: f64,

    /// Consistency dimension score.
    pub consistency_score: f64,

    /// Timeliness dimension score.
    pub timeliness_score: f64,

    /// Validity dimension score.
    pub validity_score: f64,

    /// When these metrics were last refreshed.
    pub last_updated: DateTime<Utc>,
}
793
794impl DataQualityValidator {
795 pub fn new(config: QualityConfig) -> Result<Self> {
797 Ok(Self {
798 rules: Arc::new(RwLock::new(Vec::new())),
799 profiler: Arc::new(RwLock::new(DataProfiler {
800 profiles: HashMap::new(),
801 window: VecDeque::new(),
802 window_size: config.profiling_window_size,
803 stats: ProfileStats::default(),
804 })),
805 cleanser: Arc::new(RwLock::new(DataCleanser {
806 rules: Vec::new(),
807 stats: CleansingStats::default(),
808 })),
809 metrics: Arc::new(RwLock::new(QualityMetrics {
810 last_updated: Utc::now(),
811 ..Default::default()
812 })),
813 scorer: Arc::new(RwLock::new(QualityScorer {
814 dimensions: HashMap::new(),
815 weights: HashMap::new(),
816 stats: ScoringStats::default(),
817 })),
818 alert_manager: Arc::new(RwLock::new(AlertManager {
819 alerts: Vec::new(),
820 alert_rules: Vec::new(),
821 stats: AlertStats::default(),
822 })),
823 audit_trail: Arc::new(RwLock::new(AuditTrail {
824 entries: VecDeque::new(),
825 max_entries: config.max_audit_entries,
826 stats: AuditStats::default(),
827 })),
828 duplicate_detector: Arc::new(RwLock::new(DuplicateDetector {
829 event_hashes: VecDeque::new(),
830 window: config.duplicate_window,
831 stats: DuplicateStats::default(),
832 })),
833 config,
834 })
835 }
836
837 pub async fn add_rule(&mut self, rule: ValidationRule) -> Result<()> {
839 let mut rules = self.rules.write().await;
840 rules.push(rule);
841 info!("Added validation rule");
842 Ok(())
843 }
844
845 pub async fn validate_event(&self, event: &StreamEvent) -> Result<ValidationResult> {
847 let _start_time = Instant::now();
848 let event_id = Uuid::new_v4();
849
850 if self.config.enable_duplicate_detection && self.is_duplicate(event).await? {
852 return Ok(ValidationResult {
853 event_id,
854 is_valid: false,
855 failures: vec![ValidationFailure {
856 rule: "DuplicateDetection".to_string(),
857 field: "event".to_string(),
858 reason: "Duplicate event detected".to_string(),
859 severity: FailureSeverity::Medium,
860 suggested_fix: Some("Skip duplicate event".to_string()),
861 }],
862 quality_score: 0.0,
863 timestamp: Utc::now(),
864 corrections: Vec::new(),
865 });
866 }
867
868 let mut failures = Vec::new();
870 let rules = self.rules.read().await;
871
872 for rule in rules.iter() {
873 if let Some(failure) = self.apply_rule(rule, event).await? {
874 failures.push(failure);
875 }
876 }
877
878 if self.config.enable_profiling {
880 self.profile_event(event).await?;
881 }
882
883 let quality_score = self.compute_quality_score(&failures).await?;
885
886 let corrections = if self.config.enable_cleansing && !failures.is_empty() {
888 self.cleanse_event(event, &failures).await?
889 } else {
890 Vec::new()
891 };
892
893 self.update_metrics(quality_score, failures.is_empty())
895 .await;
896
897 if self.config.enable_audit_trail {
899 self.add_audit_entry(event_id, AuditAction::Validation, &failures)
900 .await?;
901 }
902
903 if self.config.enable_alerting && quality_score < self.config.alert_threshold {
905 self.trigger_alert(event_id, quality_score, &failures)
906 .await?;
907 }
908
909 let is_valid = failures.is_empty();
910
911 Ok(ValidationResult {
912 event_id,
913 is_valid,
914 failures,
915 quality_score,
916 timestamp: Utc::now(),
917 corrections,
918 })
919 }
920
921 async fn apply_rule(
923 &self,
924 rule: &ValidationRule,
925 _event: &StreamEvent,
926 ) -> Result<Option<ValidationFailure>> {
927 match rule {
929 ValidationRule::NotNull { field: _ } => {
930 if self.config.allow_null_values {
932 Ok(None)
933 } else {
934 Ok(None)
936 }
937 }
938 ValidationRule::Format { field, pattern } => {
939 debug!(
941 "Validating format for field {} with pattern {}",
942 field, pattern
943 );
944 Ok(None)
945 }
946 _ => Ok(None),
947 }
948 }
949
950 async fn is_duplicate(&self, event: &StreamEvent) -> Result<bool> {
952 let mut detector = self.duplicate_detector.write().await;
953
954 let event_hash = format!("{:?}", event);
956
957 let cutoff = Utc::now() - chrono::Duration::from_std(detector.window)?;
959 detector.event_hashes.retain(|(_, ts)| ts > &cutoff);
960
961 let is_duplicate = detector
963 .event_hashes
964 .iter()
965 .any(|(hash, _)| hash == &event_hash);
966
967 if is_duplicate {
968 detector.stats.total_duplicates += 1;
969 detector.stats.last_duplicate_time = Some(Utc::now());
970 } else {
971 detector.event_hashes.push_back((event_hash, Utc::now()));
972 }
973
974 Ok(is_duplicate)
975 }
976
977 async fn profile_event(&self, _event: &StreamEvent) -> Result<()> {
979 let mut profiler = self.profiler.write().await;
980 profiler.stats.total_events += 1;
981 Ok(())
982 }
983
984 async fn compute_quality_score(&self, failures: &[ValidationFailure]) -> Result<f64> {
986 if failures.is_empty() {
987 return Ok(1.0);
988 }
989
990 let total_weight: f64 = failures
992 .iter()
993 .map(|f| match f.severity {
994 FailureSeverity::Low => 0.1,
995 FailureSeverity::Medium => 0.3,
996 FailureSeverity::High => 0.6,
997 FailureSeverity::Critical => 1.0,
998 })
999 .sum();
1000
1001 let score = (1.0 - (total_weight / (failures.len() as f64))).max(0.0);
1002
1003 Ok(score)
1004 }
1005
1006 async fn cleanse_event(
1008 &self,
1009 _event: &StreamEvent,
1010 _failures: &[ValidationFailure],
1011 ) -> Result<Vec<DataCorrection>> {
1012 Ok(Vec::new())
1014 }
1015
1016 async fn update_metrics(&self, quality_score: f64, is_valid: bool) {
1018 let mut metrics = self.metrics.write().await;
1019 metrics.total_events_validated += 1;
1020
1021 if is_valid {
1022 metrics.valid_events += 1;
1023 } else {
1024 metrics.invalid_events += 1;
1025 }
1026
1027 metrics.validation_rate =
1028 (metrics.valid_events as f64 / metrics.total_events_validated as f64) * 100.0;
1029
1030 let total =
1032 metrics.avg_quality_score * (metrics.total_events_validated - 1) as f64 + quality_score;
1033 metrics.avg_quality_score = total / metrics.total_events_validated as f64;
1034
1035 metrics.current_quality_score = quality_score;
1036 metrics.last_updated = Utc::now();
1037 }
1038
1039 async fn add_audit_entry(
1041 &self,
1042 event_id: Uuid,
1043 action: AuditAction,
1044 failures: &[ValidationFailure],
1045 ) -> Result<()> {
1046 let mut audit = self.audit_trail.write().await;
1047
1048 let entry = AuditEntry {
1049 id: Uuid::new_v4(),
1050 timestamp: Utc::now(),
1051 event_id,
1052 action,
1053 details: format!("{} validation failures", failures.len()),
1054 actor: "system".to_string(),
1055 };
1056
1057 audit.entries.push_back(entry);
1058
1059 while audit.entries.len() > audit.max_entries {
1061 audit.entries.pop_front();
1062 }
1063
1064 audit.stats.total_entries += 1;
1065
1066 Ok(())
1067 }
1068
1069 async fn trigger_alert(
1071 &self,
1072 event_id: Uuid,
1073 quality_score: f64,
1074 failures: &[ValidationFailure],
1075 ) -> Result<()> {
1076 let mut alert_manager = self.alert_manager.write().await;
1077
1078 let severity = if quality_score < 0.5 {
1079 AlertSeverity::Critical
1080 } else if quality_score < 0.7 {
1081 AlertSeverity::Error
1082 } else if quality_score < 0.9 {
1083 AlertSeverity::Warning
1084 } else {
1085 AlertSeverity::Info
1086 };
1087
1088 let alert = QualityAlert {
1089 id: Uuid::new_v4(),
1090 alert_type: AlertType::QualityScoreLow,
1091 severity,
1092 message: format!(
1093 "Quality score {} below threshold {} ({} failures)",
1094 quality_score,
1095 self.config.alert_threshold,
1096 failures.len()
1097 ),
1098 triggered_at: Utc::now(),
1099 event_id: Some(event_id),
1100 quality_score,
1101 details: HashMap::new(),
1102 };
1103
1104 alert_manager.alerts.push(alert);
1105 alert_manager.stats.total_alerts += 1;
1106 alert_manager.stats.last_alert_time = Some(Utc::now());
1107
1108 warn!(
1109 "Quality alert triggered: score={}, failures={}",
1110 quality_score,
1111 failures.len()
1112 );
1113
1114 Ok(())
1115 }
1116
1117 pub async fn get_metrics(&self) -> QualityMetrics {
1119 self.metrics.read().await.clone()
1120 }
1121
1122 pub async fn get_quality_report(&self) -> QualityReport {
1124 let metrics = self.metrics.read().await.clone();
1125 let profiler_stats = self.profiler.read().await.stats.clone();
1126 let cleanser_stats = self.cleanser.read().await.stats.clone();
1127 let scorer_stats = self.scorer.read().await.stats.clone();
1128 let alert_stats = self.alert_manager.read().await.stats.clone();
1129 let duplicate_stats = self.duplicate_detector.read().await.stats.clone();
1130
1131 QualityReport {
1132 metrics,
1133 profiler_stats,
1134 cleanser_stats,
1135 scorer_stats,
1136 alert_stats,
1137 duplicate_stats,
1138 generated_at: Utc::now(),
1139 }
1140 }
1141}
1142
/// Point-in-time report aggregating statistics from every sub-component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityReport {
    /// Aggregate validation metrics.
    pub metrics: QualityMetrics,

    /// Profiler counters and timings.
    pub profiler_stats: ProfileStats,

    /// Cleanser counters and timings.
    pub cleanser_stats: CleansingStats,

    /// Scorer aggregates.
    pub scorer_stats: ScoringStats,

    /// Alerting aggregates.
    pub alert_stats: AlertStats,

    /// Duplicate-detection aggregates.
    pub duplicate_stats: DuplicateStats,

    /// When the report was assembled.
    pub generated_at: DateTime<Utc>,
}
1167
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    // Construction with default config succeeds.
    #[tokio::test]
    async fn test_validator_creation() {
        let config = QualityConfig::default();
        let validator = DataQualityValidator::new(config);
        assert!(validator.is_ok());
    }

    // A registered rule is stored in the shared rule list.
    #[tokio::test]
    async fn test_add_validation_rule() {
        let config = QualityConfig::default();
        let mut validator = DataQualityValidator::new(config).unwrap();

        let rule = ValidationRule::NotNull {
            field: "subject".to_string(),
        };

        let result = validator.add_rule(rule).await;
        assert!(result.is_ok());

        let rules = validator.rules.read().await;
        assert_eq!(rules.len(), 1);
    }

    // With no rules registered, an event validates cleanly with a perfect score.
    #[tokio::test]
    async fn test_validate_event() {
        let config = QualityConfig::default();
        let validator = DataQualityValidator::new(config).unwrap();

        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };

        let result = validator.validate_event(&event).await;
        assert!(result.is_ok());

        let validation_result = result.unwrap();
        assert!(validation_result.is_valid);
        assert_eq!(validation_result.quality_score, 1.0);
    }

    // The same event submitted twice inside the window is rejected as a duplicate.
    #[tokio::test]
    async fn test_duplicate_detection() {
        let config = QualityConfig {
            enable_duplicate_detection: true,
            ..Default::default()
        };
        let validator = DataQualityValidator::new(config).unwrap();

        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };

        // First submission passes.
        let result1 = validator.validate_event(&event).await.unwrap();
        assert!(result1.is_valid);

        // Identical resubmission is flagged.
        let result2 = validator.validate_event(&event).await.unwrap();
        assert!(!result2.is_valid);
        assert!(!result2.failures.is_empty());
    }

    // A Low + High failure pair yields a score strictly between 0 and 1.
    #[tokio::test]
    async fn test_quality_score_computation() {
        let config = QualityConfig::default();
        let validator = DataQualityValidator::new(config).unwrap();

        let failures = vec![
            ValidationFailure {
                rule: "NotNull".to_string(),
                field: "field1".to_string(),
                reason: "Field is null".to_string(),
                severity: FailureSeverity::Low,
                suggested_fix: None,
            },
            ValidationFailure {
                rule: "Format".to_string(),
                field: "field2".to_string(),
                reason: "Invalid format".to_string(),
                severity: FailureSeverity::High,
                suggested_fix: None,
            },
        ];

        let score = validator.compute_quality_score(&failures).await.unwrap();
        assert!(score < 1.0);
        assert!(score > 0.0);
    }

    // Validating one clean event increments the aggregate counters.
    #[tokio::test]
    async fn test_metrics_collection() {
        let config = QualityConfig::default();
        let validator = DataQualityValidator::new(config).unwrap();

        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };

        validator.validate_event(&event).await.unwrap();

        let metrics = validator.get_metrics().await;
        assert_eq!(metrics.total_events_validated, 1);
        assert_eq!(metrics.valid_events, 1);
    }

    // A manually recorded audit entry is stored and counted.
    #[tokio::test]
    async fn test_audit_trail() {
        let config = QualityConfig {
            enable_audit_trail: true,
            ..Default::default()
        };
        let validator = DataQualityValidator::new(config).unwrap();

        let event_id = Uuid::new_v4();
        let failures = vec![];

        validator
            .add_audit_entry(event_id, AuditAction::Validation, &failures)
            .await
            .unwrap();

        let audit = validator.audit_trail.read().await;
        assert_eq!(audit.entries.len(), 1);
        assert_eq!(audit.stats.total_entries, 1);
    }

    // The quality report reflects events validated so far.
    #[tokio::test]
    async fn test_quality_report() {
        let config = QualityConfig::default();
        let validator = DataQualityValidator::new(config).unwrap();

        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };

        validator.validate_event(&event).await.unwrap();

        let report = validator.get_quality_report().await;
        assert_eq!(report.metrics.total_events_validated, 1);
    }

    // A manually triggered alert is stored and counted.
    #[tokio::test]
    async fn test_alert_triggering() {
        let config = QualityConfig {
            enable_alerting: true,
            alert_threshold: 0.8,
            ..Default::default()
        };
        let validator = DataQualityValidator::new(config).unwrap();

        let event_id = Uuid::new_v4();
        // Score below the 0.5 cutoff => Critical severity path.
        let quality_score = 0.5; let failures = vec![];

        validator
            .trigger_alert(event_id, quality_score, &failures)
            .await
            .unwrap();

        let alert_manager = validator.alert_manager.read().await;
        assert_eq!(alert_manager.alerts.len(), 1);
        assert_eq!(alert_manager.stats.total_alerts, 1);
    }

    // Multiple rules accumulate in registration order.
    #[tokio::test]
    async fn test_multiple_validation_rules() {
        let config = QualityConfig::default();
        let mut validator = DataQualityValidator::new(config).unwrap();

        validator
            .add_rule(ValidationRule::NotNull {
                field: "subject".to_string(),
            })
            .await
            .unwrap();

        validator
            .add_rule(ValidationRule::Format {
                field: "timestamp".to_string(),
                pattern: r"^\d{4}-\d{2}-\d{2}".to_string(),
            })
            .await
            .unwrap();

        let rules = validator.rules.read().await;
        assert_eq!(rules.len(), 2);
    }
}