1use crate::error::EvalResult;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet};
11
/// Aggregate outcome of a multi-table consistency evaluation.
///
/// Produced by `MultiTableConsistencyEvaluator::evaluate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiTableEvaluation {
    /// One result per relationship definition, in the order the definitions were supplied.
    pub table_consistency: Vec<TableConsistencyResult>,
    /// Cascade analysis of the supplied anomaly records.
    pub cascade_analysis: CascadeAnomalyAnalysis,
    /// Matching records divided by records checked, summed over all relationships
    /// (`1.0` when no records were checked).
    pub overall_consistency_score: f64,
    /// Total number of violations across all relationship results.
    pub total_violations: usize,
    /// `true` when no issues were recorded and the overall score meets the
    /// evaluator's minimum consistency score.
    pub passes: bool,
    /// Human-readable messages describing threshold breaches.
    pub issues: Vec<String>,
}
28
29impl Default for MultiTableEvaluation {
30 fn default() -> Self {
31 Self {
32 table_consistency: Vec::new(),
33 cascade_analysis: CascadeAnomalyAnalysis::default(),
34 overall_consistency_score: 1.0,
35 total_violations: 0,
36 passes: true,
37 issues: Vec::new(),
38 }
39 }
40}
41
/// Consistency result for a single source-to-target table relationship.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableConsistencyResult {
    /// Name of the source table, copied from the relationship definition.
    pub source_table: String,
    /// Name of the target table, copied from the relationship definition.
    pub target_table: String,
    /// Relationship type, copied from the relationship definition.
    pub relationship: TableRelationship,
    /// Number of source records iterated, including records that hold no
    /// reference to the target table.
    pub records_checked: usize,
    /// Source records whose referenced target exists and produced no value violations.
    pub matching_records: usize,
    /// Source records whose referenced target exists but produced at least one value violation.
    pub mismatched_records: usize,
    /// Source records that reference a target id absent from the target table.
    pub orphaned_source: usize,
    /// Target records that no source record resolved a reference to.
    pub orphaned_target: usize,
    /// `matching_records / (matching_records + mismatched_records + orphaned_source)`,
    /// or `1.0` when that denominator is zero.
    pub consistency_score: f64,
    /// Every violation collected while checking this relationship.
    pub violations: Vec<ConsistencyViolation>,
}
66
67impl Default for TableConsistencyResult {
68 fn default() -> Self {
69 Self {
70 source_table: String::new(),
71 target_table: String::new(),
72 relationship: TableRelationship::OneToMany,
73 records_checked: 0,
74 matching_records: 0,
75 mismatched_records: 0,
76 orphaned_source: 0,
77 orphaned_target: 0,
78 consistency_score: 1.0,
79 violations: Vec::new(),
80 }
81 }
82}
83
/// Kind of relationship between a source and a target table.
///
/// In this file only `DocumentFlow` changes evaluation behaviour: the date-order
/// check is applied to `DocumentFlow` relationships only. The other variants
/// receive no variant-specific handling here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TableRelationship {
    /// One-to-one relationship.
    OneToOne,
    /// One-to-many relationship; the default for `TableConsistencyResult`.
    OneToMany,
    /// Many-to-many relationship.
    ManyToMany,
    /// Hierarchical relationship.
    Hierarchical,
    /// Document-flow relationship; enables the target-before-source date check.
    DocumentFlow,
}
98
/// A single consistency violation found while checking a relationship.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyViolation {
    /// Category of the violation.
    pub violation_type: ViolationType,
    /// Id of the source record the violation was found on.
    pub source_record_id: String,
    /// Id of the target record involved. For a missing reference this is the
    /// id the source pointed at, even though no such target exists.
    pub target_record_id: Option<String>,
    /// Human-readable description of the violation.
    pub description: String,
    /// Severity level. The evaluator in this file emits 2 for date issues and
    /// 3 for amount and missing-reference issues.
    /// NOTE(review): presumably higher means more severe — confirm the scale.
    pub severity: u8,
}
113
/// Category of a consistency violation.
///
/// The evaluator in this file only emits `MissingReference`,
/// `AmountInconsistency` and `DateInconsistency`; the remaining variants are
/// not produced by any code visible here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ViolationType {
    /// A source record references a target id that does not exist.
    MissingReference,
    /// A value differs between linked records.
    ValueMismatch,
    /// A record has no counterpart.
    OrphanedRecord,
    /// A reference cycle.
    CircularReference,
    /// Source and target amounts differ by more than the allowed tolerance.
    AmountInconsistency,
    /// The target date is earlier than the source date in a document flow.
    DateInconsistency,
    /// Status values disagree between linked records.
    StatusInconsistency,
    /// A document chain is broken.
    DocumentChainBreak,
}
134
/// Summary of how anomalies propagate through record references.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CascadeAnomalyAnalysis {
    /// Number of anomaly records supplied for analysis.
    pub total_anomalies: usize,
    /// Number of anomalies whose trace reached at least one other record.
    pub anomalies_with_cascades: usize,
    /// Mean depth over `cascade_paths` (`0.0` when there are none).
    pub average_cascade_depth: f64,
    /// Largest depth over `cascade_paths`.
    pub max_cascade_depth: usize,
    /// One path per anomaly that cascaded (depth greater than zero).
    pub cascade_paths: Vec<CascadePath>,
    /// For each table name, the number of cascade paths that touched it.
    pub tables_affected: HashMap<String, usize>,
}
151
/// Propagation trace of a single anomaly through record references.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CascadePath {
    /// Id of the anomaly this path belongs to.
    pub anomaly_id: String,
    /// Anomaly type label, copied from the anomaly record.
    pub anomaly_type: String,
    /// Id of the record the anomaly originated on.
    pub source_id: String,
    /// Table holding the originating record.
    pub source_table: String,
    /// Names of tables reached by the trace, each listed once in discovery order.
    pub affected_tables: Vec<String>,
    /// Number of related records counted during the trace.
    pub records_affected: usize,
    /// Largest traversal depth reached from the originating record.
    pub depth: usize,
    /// Monetary impact, copied from the anomaly record.
    pub monetary_impact: Option<f64>,
}
172
/// Input bundle for a multi-table consistency evaluation.
#[derive(Debug, Clone, Default)]
pub struct MultiTableData {
    /// Records grouped by table name.
    pub tables: HashMap<String, Vec<TableRecord>>,
    /// Relationship definitions to evaluate; each yields one `TableConsistencyResult`.
    pub relationships: Vec<TableRelationshipDef>,
    /// Anomalies whose cascades should be traced.
    pub anomalies: Vec<AnomalyRecord>,
}
183
/// A single record of a table, reduced to the fields consistency checks need.
#[derive(Debug, Clone, Default)]
pub struct TableRecord {
    /// Record identifier; targets are looked up by this id.
    pub id: String,
    /// Name of the owning table. The evaluator in this file relies on the
    /// `MultiTableData::tables` key instead and does not read this field.
    pub table: String,
    /// Outgoing references: referenced table name -> referenced record id
    /// (so at most one reference per referenced table).
    pub references: HashMap<String, String>,
    /// Additional key/value pairs. Not read by the evaluator in this file.
    pub key_values: HashMap<String, String>,
    /// Amount compared between linked records when amount validation is enabled.
    pub amount: Option<f64>,
    /// Date compared between linked records when date validation is enabled.
    /// Dates are compared as plain strings.
    /// NOTE(review): presumably ISO-8601 (`YYYY-MM-DD`) so that string order
    /// matches chronological order — confirm against the data producers.
    pub date: Option<String>,
    /// Status value. Not read by the evaluator in this file.
    pub status: Option<String>,
    /// Whether the record is flagged as anomalous. Not read by the evaluator in this file.
    pub is_anomalous: bool,
    /// Id of the associated anomaly, if any. Not read by the evaluator in this file.
    pub anomaly_id: Option<String>,
}
206
/// Definition of a relationship between two tables and the checks to run on it.
#[derive(Debug, Clone)]
pub struct TableRelationshipDef {
    /// Table whose records are iterated and whose references are resolved.
    pub source_table: String,
    /// Table the source records are expected to reference. Source records are
    /// linked through `TableRecord::references` keyed by this table name.
    pub target_table: String,
    /// Name of the linking key on the source side. Not read by the evaluator in this file.
    pub source_key: String,
    /// Name of the linking key on the target side. Not read by the evaluator in this file.
    pub target_key: String,
    /// Relationship type; `DocumentFlow` enables the date-order check.
    pub relationship_type: TableRelationship,
    /// Whether amounts of linked records are compared.
    pub validate_amounts: bool,
    /// Whether dates of linked records are compared (effective for `DocumentFlow` only).
    pub validate_dates: bool,
}
225
/// An anomaly attached to a specific record; the starting point of a cascade trace.
#[derive(Debug, Clone)]
pub struct AnomalyRecord {
    /// Identifier of the anomaly.
    pub anomaly_id: String,
    /// Anomaly type label, copied verbatim into the resulting `CascadePath`.
    pub anomaly_type: String,
    /// Id of the record the anomaly originates on.
    pub source_record_id: String,
    /// Table holding the originating record.
    pub source_table: String,
    /// Severity level. Not read by the cascade analysis in this file.
    pub severity: u8,
    /// Monetary impact, copied verbatim into the resulting `CascadePath`.
    pub monetary_impact: Option<f64>,
}
242
/// Evaluates referential and value consistency across related tables and
/// traces how anomalies cascade through record references.
///
/// The thresholds are fixed at construction time; see
/// `MultiTableConsistencyEvaluator::new`.
#[derive(Debug, Clone)]
pub struct MultiTableConsistencyEvaluator {
    /// Minimum acceptable consistency score, as a fraction (scores are in `0.0..=1.0`).
    min_consistency_score: f64,
    /// Maximum acceptable orphan rate, as a fraction.
    max_orphan_rate: f64,
    /// Maximum depth followed while tracing an anomaly cascade.
    max_cascade_depth: usize,
}
252
253impl MultiTableConsistencyEvaluator {
254 pub fn new(min_consistency_score: f64, max_orphan_rate: f64, max_cascade_depth: usize) -> Self {
256 Self {
257 min_consistency_score,
258 max_orphan_rate,
259 max_cascade_depth,
260 }
261 }
262
263 pub fn evaluate(&self, data: &MultiTableData) -> EvalResult<MultiTableEvaluation> {
265 let mut evaluation = MultiTableEvaluation::default();
266
267 for rel_def in &data.relationships {
269 let result = self.evaluate_relationship(data, rel_def);
270 evaluation.table_consistency.push(result);
271 }
272
273 evaluation.cascade_analysis = self.analyze_cascades(data);
275
276 self.calculate_overall_metrics(&mut evaluation);
278
279 Ok(evaluation)
280 }
281
282 fn evaluate_relationship(
284 &self,
285 data: &MultiTableData,
286 rel_def: &TableRelationshipDef,
287 ) -> TableConsistencyResult {
288 let mut result = TableConsistencyResult {
289 source_table: rel_def.source_table.clone(),
290 target_table: rel_def.target_table.clone(),
291 relationship: rel_def.relationship_type.clone(),
292 ..Default::default()
293 };
294
295 let source_records = data.tables.get(&rel_def.source_table);
296 let target_records = data.tables.get(&rel_def.target_table);
297
298 let (source_records, target_records) = match (source_records, target_records) {
299 (Some(s), Some(t)) => (s, t),
300 _ => return result,
301 };
302
303 let target_index: HashMap<_, _> =
305 target_records.iter().map(|r| (r.id.clone(), r)).collect();
306
307 let mut referenced_targets = HashSet::new();
309
310 for source_record in source_records {
311 result.records_checked += 1;
312
313 if let Some(target_id) = source_record.references.get(&rel_def.target_table) {
315 if let Some(target_record) = target_index.get(target_id) {
316 let violations =
318 self.check_value_consistency(source_record, target_record, rel_def);
319 if violations.is_empty() {
320 result.matching_records += 1;
321 } else {
322 result.mismatched_records += 1;
323 result.violations.extend(violations);
324 }
325 referenced_targets.insert(target_id.clone());
326 } else {
327 result.orphaned_source += 1;
329 result.violations.push(ConsistencyViolation {
330 violation_type: ViolationType::MissingReference,
331 source_record_id: source_record.id.clone(),
332 target_record_id: Some(target_id.clone()),
333 description: format!(
334 "Source record {} references non-existent target {}",
335 source_record.id, target_id
336 ),
337 severity: 3,
338 });
339 }
340 }
341 }
342
343 result.orphaned_target = target_records
345 .iter()
346 .filter(|r| !referenced_targets.contains(&r.id))
347 .count();
348
349 let total = result.matching_records + result.mismatched_records + result.orphaned_source;
351 result.consistency_score = if total > 0 {
352 result.matching_records as f64 / total as f64
353 } else {
354 1.0
355 };
356
357 result
358 }
359
360 fn check_value_consistency(
362 &self,
363 source: &TableRecord,
364 target: &TableRecord,
365 rel_def: &TableRelationshipDef,
366 ) -> Vec<ConsistencyViolation> {
367 let mut violations = Vec::new();
368
369 if rel_def.validate_amounts {
371 if let (Some(s_amt), Some(t_amt)) = (source.amount, target.amount) {
372 if (s_amt - t_amt).abs() > 0.01 {
374 violations.push(ConsistencyViolation {
375 violation_type: ViolationType::AmountInconsistency,
376 source_record_id: source.id.clone(),
377 target_record_id: Some(target.id.clone()),
378 description: format!("Amount mismatch: source={s_amt}, target={t_amt}"),
379 severity: 3,
380 });
381 }
382 }
383 }
384
385 if rel_def.validate_dates {
387 if let (Some(ref s_date), Some(ref t_date)) = (&source.date, &target.date) {
388 if rel_def.relationship_type == TableRelationship::DocumentFlow && t_date < s_date {
390 violations.push(ConsistencyViolation {
391 violation_type: ViolationType::DateInconsistency,
392 source_record_id: source.id.clone(),
393 target_record_id: Some(target.id.clone()),
394 description: format!(
395 "Date inconsistency: target date {t_date} before source date {s_date}"
396 ),
397 severity: 2,
398 });
399 }
400 }
401 }
402
403 violations
404 }
405
406 fn analyze_cascades(&self, data: &MultiTableData) -> CascadeAnomalyAnalysis {
408 let mut analysis = CascadeAnomalyAnalysis::default();
409 analysis.total_anomalies = data.anomalies.len();
410
411 let mut reverse_refs: HashMap<(String, String), Vec<(String, String)>> = HashMap::new();
413 for (table_name, records) in &data.tables {
414 for record in records {
415 for (ref_table, ref_id) in &record.references {
416 reverse_refs
417 .entry((ref_table.clone(), ref_id.clone()))
418 .or_default()
419 .push((table_name.clone(), record.id.clone()));
420 }
421 }
422 }
423
424 for anomaly in &data.anomalies {
426 let cascade_path = self.trace_cascade(
427 data,
428 &reverse_refs,
429 &anomaly.source_table,
430 &anomaly.source_record_id,
431 &anomaly.anomaly_id,
432 &anomaly.anomaly_type,
433 anomaly.monetary_impact,
434 );
435
436 if cascade_path.depth > 0 {
437 analysis.anomalies_with_cascades += 1;
438
439 for table in &cascade_path.affected_tables {
441 *analysis.tables_affected.entry(table.clone()).or_insert(0) += 1;
442 }
443
444 if cascade_path.depth > analysis.max_cascade_depth {
445 analysis.max_cascade_depth = cascade_path.depth;
446 }
447
448 analysis.cascade_paths.push(cascade_path);
449 }
450 }
451
452 if !analysis.cascade_paths.is_empty() {
454 analysis.average_cascade_depth = analysis
455 .cascade_paths
456 .iter()
457 .map(|p| p.depth as f64)
458 .sum::<f64>()
459 / analysis.cascade_paths.len() as f64;
460 }
461
462 analysis
463 }
464
465 fn trace_cascade(
467 &self,
468 data: &MultiTableData,
469 reverse_refs: &HashMap<(String, String), Vec<(String, String)>>,
470 source_table: &str,
471 source_id: &str,
472 anomaly_id: &str,
473 anomaly_type: &str,
474 monetary_impact: Option<f64>,
475 ) -> CascadePath {
476 let mut path = CascadePath {
477 anomaly_id: anomaly_id.to_string(),
478 anomaly_type: anomaly_type.to_string(),
479 source_id: source_id.to_string(),
480 source_table: source_table.to_string(),
481 affected_tables: Vec::new(),
482 records_affected: 0,
483 depth: 0,
484 monetary_impact,
485 };
486
487 let mut visited = HashSet::new();
488 let mut to_visit = vec![(source_table.to_string(), source_id.to_string(), 0usize)];
489
490 while let Some((table, id, depth)) = to_visit.pop() {
491 if depth > self.max_cascade_depth {
492 continue;
493 }
494 if visited.contains(&(table.clone(), id.clone())) {
495 continue;
496 }
497 visited.insert((table.clone(), id.clone()));
498
499 if let Some(refs) = reverse_refs.get(&(table.clone(), id.clone())) {
501 for (ref_table, ref_id) in refs {
502 if !visited.contains(&(ref_table.clone(), ref_id.clone())) {
503 if !path.affected_tables.contains(ref_table) {
504 path.affected_tables.push(ref_table.clone());
505 }
506 path.records_affected += 1;
507 path.depth = path.depth.max(depth + 1);
508 to_visit.push((ref_table.clone(), ref_id.clone(), depth + 1));
509 }
510 }
511 }
512
513 if let Some(records) = data.tables.get(&table) {
515 if let Some(record) = records.iter().find(|r| r.id == id) {
516 for (ref_table, ref_id) in &record.references {
517 if !visited.contains(&(ref_table.clone(), ref_id.clone())) {
518 if !path.affected_tables.contains(ref_table) {
519 path.affected_tables.push(ref_table.clone());
520 }
521 path.records_affected += 1;
522 path.depth = path.depth.max(depth + 1);
523 to_visit.push((ref_table.clone(), ref_id.clone(), depth + 1));
524 }
525 }
526 }
527 }
528 }
529
530 path
531 }
532
533 fn calculate_overall_metrics(&self, evaluation: &mut MultiTableEvaluation) {
535 let total_records: usize = evaluation
537 .table_consistency
538 .iter()
539 .map(|r| r.records_checked)
540 .sum();
541
542 let total_matching: usize = evaluation
543 .table_consistency
544 .iter()
545 .map(|r| r.matching_records)
546 .sum();
547
548 evaluation.overall_consistency_score = if total_records > 0 {
549 total_matching as f64 / total_records as f64
550 } else {
551 1.0
552 };
553
554 evaluation.total_violations = evaluation
556 .table_consistency
557 .iter()
558 .map(|r| r.violations.len())
559 .sum();
560
561 for result in &evaluation.table_consistency {
563 if result.consistency_score < self.min_consistency_score {
564 evaluation.issues.push(format!(
565 "{}->{}: consistency {:.2}% below threshold {:.2}%",
566 result.source_table,
567 result.target_table,
568 result.consistency_score * 100.0,
569 self.min_consistency_score * 100.0
570 ));
571 }
572
573 let orphan_rate = if result.records_checked > 0 {
574 (result.orphaned_source + result.orphaned_target) as f64
575 / result.records_checked as f64
576 } else {
577 0.0
578 };
579
580 if orphan_rate > self.max_orphan_rate {
581 evaluation.issues.push(format!(
582 "{}->{}: orphan rate {:.2}% exceeds threshold {:.2}%",
583 result.source_table,
584 result.target_table,
585 orphan_rate * 100.0,
586 self.max_orphan_rate * 100.0
587 ));
588 }
589 }
590
591 if evaluation.cascade_analysis.max_cascade_depth > 3 {
593 evaluation.issues.push(format!(
594 "High cascade depth detected: {} tables deep",
595 evaluation.cascade_analysis.max_cascade_depth
596 ));
597 }
598
599 evaluation.passes = evaluation.issues.is_empty()
600 && evaluation.overall_consistency_score >= self.min_consistency_score;
601 }
602}
603
604impl Default for MultiTableConsistencyEvaluator {
605 fn default() -> Self {
606 Self::new(0.95, 0.10, 5) }
608}
609
610pub fn get_p2p_flow_relationships() -> Vec<TableRelationshipDef> {
612 vec![
613 TableRelationshipDef {
614 source_table: "purchase_orders".to_string(),
615 target_table: "goods_receipts".to_string(),
616 source_key: "po_number".to_string(),
617 target_key: "po_number".to_string(),
618 relationship_type: TableRelationship::DocumentFlow,
619 validate_amounts: false,
620 validate_dates: true,
621 },
622 TableRelationshipDef {
623 source_table: "goods_receipts".to_string(),
624 target_table: "vendor_invoices".to_string(),
625 source_key: "gr_number".to_string(),
626 target_key: "gr_number".to_string(),
627 relationship_type: TableRelationship::DocumentFlow,
628 validate_amounts: true,
629 validate_dates: true,
630 },
631 TableRelationshipDef {
632 source_table: "vendor_invoices".to_string(),
633 target_table: "payments".to_string(),
634 source_key: "invoice_number".to_string(),
635 target_key: "invoice_number".to_string(),
636 relationship_type: TableRelationship::DocumentFlow,
637 validate_amounts: true,
638 validate_dates: true,
639 },
640 TableRelationshipDef {
641 source_table: "vendor_invoices".to_string(),
642 target_table: "journal_entries".to_string(),
643 source_key: "invoice_number".to_string(),
644 target_key: "source_document_id".to_string(),
645 relationship_type: TableRelationship::OneToMany,
646 validate_amounts: true,
647 validate_dates: true,
648 },
649 ]
650}
651
652pub fn get_o2c_flow_relationships() -> Vec<TableRelationshipDef> {
654 vec![
655 TableRelationshipDef {
656 source_table: "sales_orders".to_string(),
657 target_table: "deliveries".to_string(),
658 source_key: "so_number".to_string(),
659 target_key: "so_number".to_string(),
660 relationship_type: TableRelationship::DocumentFlow,
661 validate_amounts: false,
662 validate_dates: true,
663 },
664 TableRelationshipDef {
665 source_table: "deliveries".to_string(),
666 target_table: "customer_invoices".to_string(),
667 source_key: "delivery_number".to_string(),
668 target_key: "delivery_number".to_string(),
669 relationship_type: TableRelationship::DocumentFlow,
670 validate_amounts: true,
671 validate_dates: true,
672 },
673 TableRelationshipDef {
674 source_table: "customer_invoices".to_string(),
675 target_table: "customer_receipts".to_string(),
676 source_key: "invoice_number".to_string(),
677 target_key: "invoice_number".to_string(),
678 relationship_type: TableRelationship::DocumentFlow,
679 validate_amounts: true,
680 validate_dates: true,
681 },
682 TableRelationshipDef {
683 source_table: "customer_invoices".to_string(),
684 target_table: "journal_entries".to_string(),
685 source_key: "invoice_number".to_string(),
686 target_key: "source_document_id".to_string(),
687 relationship_type: TableRelationship::OneToMany,
688 validate_amounts: true,
689 validate_dates: true,
690 },
691 ]
692}
693
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a record with an amount, an optional date and at most one
    /// outgoing reference given as `(referenced table, referenced id)`.
    fn record(
        table: &str,
        id: &str,
        reference: Option<(&str, &str)>,
        amount: f64,
        date: Option<&str>,
    ) -> TableRecord {
        let mut built = TableRecord {
            id: id.to_string(),
            table: table.to_string(),
            amount: Some(amount),
            date: date.map(String::from),
            ..TableRecord::default()
        };
        if let Some((ref_table, ref_id)) = reference {
            built
                .references
                .insert(ref_table.to_string(), ref_id.to_string());
        }
        built
    }

    /// Builds a document-flow relationship keyed on the target's `id`, with
    /// amount validation always enabled.
    fn document_flow(
        source_table: &str,
        target_table: &str,
        source_key: &str,
        validate_dates: bool,
    ) -> TableRelationshipDef {
        TableRelationshipDef {
            source_table: source_table.to_string(),
            target_table: target_table.to_string(),
            source_key: source_key.to_string(),
            target_key: "id".to_string(),
            relationship_type: TableRelationship::DocumentFlow,
            validate_amounts: true,
            validate_dates,
        }
    }

    /// Fixture: two purchase orders (the second one anomalous), two goods
    /// receipts referencing them, and one vendor invoice referencing the first
    /// goods receipt.
    fn create_test_data() -> MultiTableData {
        let mut data = MultiTableData::default();

        let first_order = record("purchase_orders", "PO001", None, 1000.0, Some("2024-01-01"));
        let flagged_order = TableRecord {
            is_anomalous: true,
            anomaly_id: Some("ANO001".to_string()),
            ..record("purchase_orders", "PO002", None, 2000.0, Some("2024-01-02"))
        };
        data.tables.insert(
            "purchase_orders".to_string(),
            vec![first_order, flagged_order],
        );

        let first_receipt = record(
            "goods_receipts",
            "GR001",
            Some(("purchase_orders", "PO001")),
            1000.0,
            Some("2024-01-05"),
        );
        let second_receipt = record(
            "goods_receipts",
            "GR002",
            Some(("purchase_orders", "PO002")),
            2000.0,
            Some("2024-01-06"),
        );
        data.tables.insert(
            "goods_receipts".to_string(),
            vec![first_receipt, second_receipt],
        );

        let invoice = record(
            "vendor_invoices",
            "INV001",
            Some(("goods_receipts", "GR001")),
            1000.0,
            Some("2024-01-10"),
        );
        data.tables
            .insert("vendor_invoices".to_string(), vec![invoice]);

        data.relationships = vec![
            document_flow("goods_receipts", "purchase_orders", "po_number", true),
            document_flow("vendor_invoices", "goods_receipts", "gr_number", true),
        ];

        data.anomalies.push(AnomalyRecord {
            anomaly_id: "ANO001".to_string(),
            anomaly_type: "Fraud".to_string(),
            source_record_id: "PO002".to_string(),
            source_table: "purchase_orders".to_string(),
            severity: 4,
            monetary_impact: Some(2000.0),
        });

        data
    }

    /// Evaluates `data` with the default evaluator.
    fn evaluate_with_defaults(data: &MultiTableData) -> MultiTableEvaluation {
        MultiTableConsistencyEvaluator::default()
            .evaluate(data)
            .unwrap()
    }

    #[test]
    fn test_basic_consistency_evaluation() {
        let result = evaluate_with_defaults(&create_test_data());

        assert_eq!(result.table_consistency.len(), 2);
        // Per-relationship counters are only printed, not asserted.
        result.table_consistency.iter().for_each(|table_result| {
            println!(
                "{}->{}: checked={}, matching={}, orphaned_source={}",
                table_result.source_table,
                table_result.target_table,
                table_result.records_checked,
                table_result.matching_records,
                table_result.orphaned_source
            );
        });
    }

    #[test]
    fn test_cascade_analysis() {
        let result = evaluate_with_defaults(&create_test_data());

        assert_eq!(result.cascade_analysis.total_anomalies, 1);
    }

    #[test]
    fn test_empty_data() {
        let result = evaluate_with_defaults(&MultiTableData::default());

        assert!(result.passes);
        assert_eq!(result.total_violations, 0);
    }

    #[test]
    fn test_missing_reference() {
        let mut data = MultiTableData::default();

        // The invoice points at a goods receipt that is not in the (empty) target table.
        let dangling_invoice = record(
            "vendor_invoices",
            "INV001",
            Some(("goods_receipts", "GR999")),
            1000.0,
            None,
        );
        data.tables
            .insert("vendor_invoices".to_string(), vec![dangling_invoice]);
        data.tables.insert("goods_receipts".to_string(), Vec::new());

        data.relationships.push(document_flow(
            "vendor_invoices",
            "goods_receipts",
            "gr_number",
            false,
        ));

        let result = evaluate_with_defaults(&data);

        assert!(!result.table_consistency.is_empty());
        let inv_gr = &result.table_consistency[0];
        assert_eq!(inv_gr.orphaned_source, 1);
        assert_eq!(inv_gr.violations.len(), 1);
        assert_eq!(
            inv_gr.violations[0].violation_type,
            ViolationType::MissingReference
        );
    }
}