use crate::error::EvalResult;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
11
/// Aggregate result of evaluating consistency across several related tables.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiTableEvaluation {
    /// One consistency result per evaluated relationship definition, in definition order.
    pub table_consistency: Vec<TableConsistencyResult>,
    /// Analysis of how the supplied anomalies propagate through record references.
    pub cascade_analysis: CascadeAnomalyAnalysis,
    /// Aggregate consistency score across all relationship results (1.0 when nothing was
    /// checked).
    pub overall_consistency_score: f64,
    /// Total number of violations across all relationship results.
    pub total_violations: usize,
    /// True when no issues were recorded and the overall score met the evaluator's minimum
    /// consistency score.
    pub passes: bool,
    /// Human-readable descriptions of every threshold breach found during evaluation.
    pub issues: Vec<String>,
}
28
29impl Default for MultiTableEvaluation {
30 fn default() -> Self {
31 Self {
32 table_consistency: Vec::new(),
33 cascade_analysis: CascadeAnomalyAnalysis::default(),
34 overall_consistency_score: 1.0,
35 total_violations: 0,
36 passes: true,
37 issues: Vec::new(),
38 }
39 }
40}
41
/// Consistency result for a single source → target table relationship.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableConsistencyResult {
    /// Table whose records hold the references.
    pub source_table: String,
    /// Table being referenced.
    pub target_table: String,
    /// Kind of relationship, copied from the relationship definition.
    pub relationship: TableRelationship,
    /// Number of source records examined, including those without a reference to the target
    /// table.
    pub records_checked: usize,
    /// Source records whose referenced target exists and passed all value checks.
    pub matching_records: usize,
    /// Source records whose referenced target exists but failed at least one value check.
    pub mismatched_records: usize,
    /// Source records whose reference points to a target id that does not exist.
    pub orphaned_source: usize,
    /// Target records not referenced by any source record.
    pub orphaned_target: usize,
    /// `matching / (matching + mismatched + orphaned_source)`, or 1.0 when that sum is zero.
    pub consistency_score: f64,
    /// Every violation found for this relationship.
    pub violations: Vec<ConsistencyViolation>,
}
66
67impl Default for TableConsistencyResult {
68 fn default() -> Self {
69 Self {
70 source_table: String::new(),
71 target_table: String::new(),
72 relationship: TableRelationship::OneToMany,
73 records_checked: 0,
74 matching_records: 0,
75 mismatched_records: 0,
76 orphaned_source: 0,
77 orphaned_target: 0,
78 consistency_score: 1.0,
79 violations: Vec::new(),
80 }
81 }
82}
83
/// Kind of link between a source table and a target table.
///
/// Apart from `DocumentFlow` (which enables date-order validation), the variant is recorded
/// in results but does not change how `MultiTableConsistencyEvaluator` checks a relationship.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TableRelationship {
    /// One-to-one relationship.
    OneToOne,
    /// One-to-many relationship.
    OneToMany,
    /// Many-to-many relationship.
    ManyToMany,
    /// Hierarchical (parent/child) relationship.
    Hierarchical,
    /// Sequential document chain; when dates are validated, a target date earlier than its
    /// source date is reported as a violation.
    DocumentFlow,
}
98
/// A single consistency problem found while checking a relationship.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyViolation {
    /// Category of the problem.
    pub violation_type: ViolationType,
    /// Id of the source record involved.
    pub source_record_id: String,
    /// Id of the target record involved, when one is known.
    pub target_record_id: Option<String>,
    /// Human-readable explanation.
    pub description: String,
    /// Severity score. The evaluator in this module emits 3 for missing references and amount
    /// mismatches and 2 for date inconsistencies.
    /// NOTE(review): presumably higher means more severe — confirm the scale.
    pub severity: u8,
}
113
/// Category of a [`ConsistencyViolation`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ViolationType {
    /// A source record references a target id that does not exist in the target table.
    MissingReference,
    /// A value differs between linked records. Not currently emitted by
    /// `MultiTableConsistencyEvaluator`.
    ValueMismatch,
    /// A record is not linked to its counterpart. Not currently emitted by
    /// `MultiTableConsistencyEvaluator`.
    OrphanedRecord,
    /// References form a cycle. Not currently emitted by `MultiTableConsistencyEvaluator`.
    CircularReference,
    /// Amounts of linked records differ by more than the evaluator's tolerance (0.01).
    AmountInconsistency,
    /// In a document flow, the target record's date precedes the source record's date.
    DateInconsistency,
    /// Statuses of linked records disagree. Not currently emitted by
    /// `MultiTableConsistencyEvaluator`.
    StatusInconsistency,
    /// A document chain is broken. Not currently emitted by `MultiTableConsistencyEvaluator`.
    DocumentChainBreak,
}
134
/// Summary of how supplied anomalies propagate to other records through references.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CascadeAnomalyAnalysis {
    /// Number of anomaly records supplied for analysis.
    pub total_anomalies: usize,
    /// Number of anomalies whose trace reached at least one other record.
    pub anomalies_with_cascades: usize,
    /// Mean `depth` over `cascade_paths` (0.0 when there are none).
    pub average_cascade_depth: f64,
    /// Largest `depth` among `cascade_paths`.
    pub max_cascade_depth: usize,
    /// Traces of the anomalies that cascaded (those with a non-zero depth).
    pub cascade_paths: Vec<CascadePath>,
    /// For each table name, the number of cascade paths that reached it.
    pub tables_affected: HashMap<String, usize>,
}
151
/// Trace of one anomaly's propagation through record references.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CascadePath {
    /// Id of the anomaly being traced.
    pub anomaly_id: String,
    /// Type label of the anomaly.
    pub anomaly_type: String,
    /// Id of the record the anomaly originates from.
    pub source_id: String,
    /// Table of the record the anomaly originates from.
    pub source_table: String,
    /// Distinct tables reached by the trace, in the order they were first encountered; may
    /// include the source table itself.
    pub affected_tables: Vec<String>,
    /// Number of records counted as reached by the trace.
    pub records_affected: usize,
    /// Largest hop distance recorded while tracing (0 when nothing was reached).
    pub depth: usize,
    /// Monetary impact copied from the anomaly record, if any.
    pub monetary_impact: Option<f64>,
}
172
/// Input to the multi-table evaluation.
#[derive(Debug, Clone, Default)]
pub struct MultiTableData {
    /// Records grouped by table name.
    pub tables: HashMap<String, Vec<TableRecord>>,
    /// Relationship definitions to evaluate, in order.
    pub relationships: Vec<TableRelationshipDef>,
    /// Known anomalies whose cascades are traced.
    pub anomalies: Vec<AnomalyRecord>,
}
183
/// A generic record belonging to one of the evaluated tables.
#[derive(Debug, Clone, Default)]
pub struct TableRecord {
    /// Record identifier; references from other records are resolved against it.
    /// NOTE(review): assumed unique within a table — confirm with data producers.
    pub id: String,
    /// Name of the table this record belongs to. The evaluator uses the key in
    /// `MultiTableData::tables` rather than this field.
    pub table: String,
    /// Outgoing references: referenced table name → referenced record id.
    pub references: HashMap<String, String>,
    /// Additional key fields; not read by the evaluator in this module.
    pub key_values: HashMap<String, String>,
    /// Monetary amount, compared between linked records when amount validation is enabled.
    pub amount: Option<f64>,
    /// Document date. Compared as a plain string, so presumably ISO-8601 (`YYYY-MM-DD`) for
    /// lexicographic order to match chronological order — TODO confirm.
    pub date: Option<String>,
    /// Document status; not read by the evaluator in this module.
    pub status: Option<String>,
    /// Anomaly flag; not read by the evaluator, which takes anomalies from
    /// `MultiTableData::anomalies`.
    pub is_anomalous: bool,
    /// Id of the anomaly attached to this record; not read by the evaluator.
    pub anomaly_id: Option<String>,
}
206
/// Definition of one source → target relationship to check.
#[derive(Debug, Clone)]
pub struct TableRelationshipDef {
    /// Table whose records hold the reference.
    pub source_table: String,
    /// Referenced table. Each source record's `references[target_table]` is resolved against
    /// target record ids.
    pub target_table: String,
    /// Name of the source join field.
    /// NOTE(review): not read by the evaluator in this module, which joins via `references`.
    pub source_key: String,
    /// Name of the target join field.
    /// NOTE(review): not read by the evaluator in this module, which joins on record ids.
    pub target_key: String,
    /// Kind of relationship; `DocumentFlow` enables the date-order check.
    pub relationship_type: TableRelationship,
    /// Whether to compare the amounts of linked records.
    pub validate_amounts: bool,
    /// Whether to check date ordering (applied only to `DocumentFlow` relationships).
    pub validate_dates: bool,
}
225
/// An anomaly attached to a specific record, used as the origin of a cascade trace.
#[derive(Debug, Clone)]
pub struct AnomalyRecord {
    /// Anomaly identifier.
    pub anomaly_id: String,
    /// Anomaly type label.
    pub anomaly_type: String,
    /// Id of the record the anomaly is attached to.
    pub source_record_id: String,
    /// Table of the record the anomaly is attached to.
    pub source_table: String,
    /// Severity of the anomaly; not read by the evaluator in this module.
    pub severity: u8,
    /// Monetary impact, copied into the resulting cascade path.
    pub monetary_impact: Option<f64>,
}
242
/// Evaluates referential and value consistency across related tables and traces how
/// anomalies cascade through record references.
pub struct MultiTableConsistencyEvaluator {
    /// Minimum acceptable consistency score, applied per relationship and to the overall score.
    min_consistency_score: f64,
    /// Maximum acceptable orphan rate per relationship.
    max_orphan_rate: f64,
    /// Limit on how many reference hops cascade tracing follows.
    max_cascade_depth: usize,
}
252
253impl MultiTableConsistencyEvaluator {
254 pub fn new(min_consistency_score: f64, max_orphan_rate: f64, max_cascade_depth: usize) -> Self {
256 Self {
257 min_consistency_score,
258 max_orphan_rate,
259 max_cascade_depth,
260 }
261 }
262
263 pub fn evaluate(&self, data: &MultiTableData) -> EvalResult<MultiTableEvaluation> {
265 let mut evaluation = MultiTableEvaluation::default();
266
267 for rel_def in &data.relationships {
269 let result = self.evaluate_relationship(data, rel_def);
270 evaluation.table_consistency.push(result);
271 }
272
273 evaluation.cascade_analysis = self.analyze_cascades(data);
275
276 self.calculate_overall_metrics(&mut evaluation);
278
279 Ok(evaluation)
280 }
281
282 fn evaluate_relationship(
284 &self,
285 data: &MultiTableData,
286 rel_def: &TableRelationshipDef,
287 ) -> TableConsistencyResult {
288 let mut result = TableConsistencyResult {
289 source_table: rel_def.source_table.clone(),
290 target_table: rel_def.target_table.clone(),
291 relationship: rel_def.relationship_type.clone(),
292 ..Default::default()
293 };
294
295 let source_records = data.tables.get(&rel_def.source_table);
296 let target_records = data.tables.get(&rel_def.target_table);
297
298 let (source_records, target_records) = match (source_records, target_records) {
299 (Some(s), Some(t)) => (s, t),
300 _ => return result,
301 };
302
303 let target_index: HashMap<_, _> =
305 target_records.iter().map(|r| (r.id.clone(), r)).collect();
306
307 let mut referenced_targets = HashSet::new();
309
310 for source_record in source_records {
311 result.records_checked += 1;
312
313 if let Some(target_id) = source_record.references.get(&rel_def.target_table) {
315 if let Some(target_record) = target_index.get(target_id) {
316 let violations =
318 self.check_value_consistency(source_record, target_record, rel_def);
319 if violations.is_empty() {
320 result.matching_records += 1;
321 } else {
322 result.mismatched_records += 1;
323 result.violations.extend(violations);
324 }
325 referenced_targets.insert(target_id.clone());
326 } else {
327 result.orphaned_source += 1;
329 result.violations.push(ConsistencyViolation {
330 violation_type: ViolationType::MissingReference,
331 source_record_id: source_record.id.clone(),
332 target_record_id: Some(target_id.clone()),
333 description: format!(
334 "Source record {} references non-existent target {}",
335 source_record.id, target_id
336 ),
337 severity: 3,
338 });
339 }
340 }
341 }
342
343 result.orphaned_target = target_records
345 .iter()
346 .filter(|r| !referenced_targets.contains(&r.id))
347 .count();
348
349 let total = result.matching_records + result.mismatched_records + result.orphaned_source;
351 result.consistency_score = if total > 0 {
352 result.matching_records as f64 / total as f64
353 } else {
354 1.0
355 };
356
357 result
358 }
359
360 fn check_value_consistency(
362 &self,
363 source: &TableRecord,
364 target: &TableRecord,
365 rel_def: &TableRelationshipDef,
366 ) -> Vec<ConsistencyViolation> {
367 let mut violations = Vec::new();
368
369 if rel_def.validate_amounts {
371 if let (Some(s_amt), Some(t_amt)) = (source.amount, target.amount) {
372 if (s_amt - t_amt).abs() > 0.01 {
374 violations.push(ConsistencyViolation {
375 violation_type: ViolationType::AmountInconsistency,
376 source_record_id: source.id.clone(),
377 target_record_id: Some(target.id.clone()),
378 description: format!("Amount mismatch: source={}, target={}", s_amt, t_amt),
379 severity: 3,
380 });
381 }
382 }
383 }
384
385 if rel_def.validate_dates {
387 if let (Some(ref s_date), Some(ref t_date)) = (&source.date, &target.date) {
388 if rel_def.relationship_type == TableRelationship::DocumentFlow && t_date < s_date {
390 violations.push(ConsistencyViolation {
391 violation_type: ViolationType::DateInconsistency,
392 source_record_id: source.id.clone(),
393 target_record_id: Some(target.id.clone()),
394 description: format!(
395 "Date inconsistency: target date {} before source date {}",
396 t_date, s_date
397 ),
398 severity: 2,
399 });
400 }
401 }
402 }
403
404 violations
405 }
406
407 fn analyze_cascades(&self, data: &MultiTableData) -> CascadeAnomalyAnalysis {
409 let mut analysis = CascadeAnomalyAnalysis::default();
410 analysis.total_anomalies = data.anomalies.len();
411
412 let mut reverse_refs: HashMap<(String, String), Vec<(String, String)>> = HashMap::new();
414 for (table_name, records) in &data.tables {
415 for record in records {
416 for (ref_table, ref_id) in &record.references {
417 reverse_refs
418 .entry((ref_table.clone(), ref_id.clone()))
419 .or_default()
420 .push((table_name.clone(), record.id.clone()));
421 }
422 }
423 }
424
425 for anomaly in &data.anomalies {
427 let cascade_path = self.trace_cascade(
428 data,
429 &reverse_refs,
430 &anomaly.source_table,
431 &anomaly.source_record_id,
432 &anomaly.anomaly_id,
433 &anomaly.anomaly_type,
434 anomaly.monetary_impact,
435 );
436
437 if cascade_path.depth > 0 {
438 analysis.anomalies_with_cascades += 1;
439
440 for table in &cascade_path.affected_tables {
442 *analysis.tables_affected.entry(table.clone()).or_insert(0) += 1;
443 }
444
445 if cascade_path.depth > analysis.max_cascade_depth {
446 analysis.max_cascade_depth = cascade_path.depth;
447 }
448
449 analysis.cascade_paths.push(cascade_path);
450 }
451 }
452
453 if !analysis.cascade_paths.is_empty() {
455 analysis.average_cascade_depth = analysis
456 .cascade_paths
457 .iter()
458 .map(|p| p.depth as f64)
459 .sum::<f64>()
460 / analysis.cascade_paths.len() as f64;
461 }
462
463 analysis
464 }
465
466 fn trace_cascade(
468 &self,
469 data: &MultiTableData,
470 reverse_refs: &HashMap<(String, String), Vec<(String, String)>>,
471 source_table: &str,
472 source_id: &str,
473 anomaly_id: &str,
474 anomaly_type: &str,
475 monetary_impact: Option<f64>,
476 ) -> CascadePath {
477 let mut path = CascadePath {
478 anomaly_id: anomaly_id.to_string(),
479 anomaly_type: anomaly_type.to_string(),
480 source_id: source_id.to_string(),
481 source_table: source_table.to_string(),
482 affected_tables: Vec::new(),
483 records_affected: 0,
484 depth: 0,
485 monetary_impact,
486 };
487
488 let mut visited = HashSet::new();
489 let mut to_visit = vec![(source_table.to_string(), source_id.to_string(), 0usize)];
490
491 while let Some((table, id, depth)) = to_visit.pop() {
492 if depth > self.max_cascade_depth {
493 continue;
494 }
495 if visited.contains(&(table.clone(), id.clone())) {
496 continue;
497 }
498 visited.insert((table.clone(), id.clone()));
499
500 if let Some(refs) = reverse_refs.get(&(table.clone(), id.clone())) {
502 for (ref_table, ref_id) in refs {
503 if !visited.contains(&(ref_table.clone(), ref_id.clone())) {
504 if !path.affected_tables.contains(ref_table) {
505 path.affected_tables.push(ref_table.clone());
506 }
507 path.records_affected += 1;
508 path.depth = path.depth.max(depth + 1);
509 to_visit.push((ref_table.clone(), ref_id.clone(), depth + 1));
510 }
511 }
512 }
513
514 if let Some(records) = data.tables.get(&table) {
516 if let Some(record) = records.iter().find(|r| r.id == id) {
517 for (ref_table, ref_id) in &record.references {
518 if !visited.contains(&(ref_table.clone(), ref_id.clone())) {
519 if !path.affected_tables.contains(ref_table) {
520 path.affected_tables.push(ref_table.clone());
521 }
522 path.records_affected += 1;
523 path.depth = path.depth.max(depth + 1);
524 to_visit.push((ref_table.clone(), ref_id.clone(), depth + 1));
525 }
526 }
527 }
528 }
529 }
530
531 path
532 }
533
534 fn calculate_overall_metrics(&self, evaluation: &mut MultiTableEvaluation) {
536 let total_records: usize = evaluation
538 .table_consistency
539 .iter()
540 .map(|r| r.records_checked)
541 .sum();
542
543 let total_matching: usize = evaluation
544 .table_consistency
545 .iter()
546 .map(|r| r.matching_records)
547 .sum();
548
549 evaluation.overall_consistency_score = if total_records > 0 {
550 total_matching as f64 / total_records as f64
551 } else {
552 1.0
553 };
554
555 evaluation.total_violations = evaluation
557 .table_consistency
558 .iter()
559 .map(|r| r.violations.len())
560 .sum();
561
562 for result in &evaluation.table_consistency {
564 if result.consistency_score < self.min_consistency_score {
565 evaluation.issues.push(format!(
566 "{}->{}: consistency {:.2}% below threshold {:.2}%",
567 result.source_table,
568 result.target_table,
569 result.consistency_score * 100.0,
570 self.min_consistency_score * 100.0
571 ));
572 }
573
574 let orphan_rate = if result.records_checked > 0 {
575 (result.orphaned_source + result.orphaned_target) as f64
576 / result.records_checked as f64
577 } else {
578 0.0
579 };
580
581 if orphan_rate > self.max_orphan_rate {
582 evaluation.issues.push(format!(
583 "{}->{}: orphan rate {:.2}% exceeds threshold {:.2}%",
584 result.source_table,
585 result.target_table,
586 orphan_rate * 100.0,
587 self.max_orphan_rate * 100.0
588 ));
589 }
590 }
591
592 if evaluation.cascade_analysis.max_cascade_depth > 3 {
594 evaluation.issues.push(format!(
595 "High cascade depth detected: {} tables deep",
596 evaluation.cascade_analysis.max_cascade_depth
597 ));
598 }
599
600 evaluation.passes = evaluation.issues.is_empty()
601 && evaluation.overall_consistency_score >= self.min_consistency_score;
602 }
603}
604
605impl Default for MultiTableConsistencyEvaluator {
606 fn default() -> Self {
607 Self::new(0.95, 0.10, 5) }
609}
610
611pub fn get_p2p_flow_relationships() -> Vec<TableRelationshipDef> {
613 vec![
614 TableRelationshipDef {
615 source_table: "purchase_orders".to_string(),
616 target_table: "goods_receipts".to_string(),
617 source_key: "po_number".to_string(),
618 target_key: "po_number".to_string(),
619 relationship_type: TableRelationship::DocumentFlow,
620 validate_amounts: false,
621 validate_dates: true,
622 },
623 TableRelationshipDef {
624 source_table: "goods_receipts".to_string(),
625 target_table: "vendor_invoices".to_string(),
626 source_key: "gr_number".to_string(),
627 target_key: "gr_number".to_string(),
628 relationship_type: TableRelationship::DocumentFlow,
629 validate_amounts: true,
630 validate_dates: true,
631 },
632 TableRelationshipDef {
633 source_table: "vendor_invoices".to_string(),
634 target_table: "payments".to_string(),
635 source_key: "invoice_number".to_string(),
636 target_key: "invoice_number".to_string(),
637 relationship_type: TableRelationship::DocumentFlow,
638 validate_amounts: true,
639 validate_dates: true,
640 },
641 TableRelationshipDef {
642 source_table: "vendor_invoices".to_string(),
643 target_table: "journal_entries".to_string(),
644 source_key: "invoice_number".to_string(),
645 target_key: "source_document_id".to_string(),
646 relationship_type: TableRelationship::OneToMany,
647 validate_amounts: true,
648 validate_dates: true,
649 },
650 ]
651}
652
653pub fn get_o2c_flow_relationships() -> Vec<TableRelationshipDef> {
655 vec![
656 TableRelationshipDef {
657 source_table: "sales_orders".to_string(),
658 target_table: "deliveries".to_string(),
659 source_key: "so_number".to_string(),
660 target_key: "so_number".to_string(),
661 relationship_type: TableRelationship::DocumentFlow,
662 validate_amounts: false,
663 validate_dates: true,
664 },
665 TableRelationshipDef {
666 source_table: "deliveries".to_string(),
667 target_table: "customer_invoices".to_string(),
668 source_key: "delivery_number".to_string(),
669 target_key: "delivery_number".to_string(),
670 relationship_type: TableRelationship::DocumentFlow,
671 validate_amounts: true,
672 validate_dates: true,
673 },
674 TableRelationshipDef {
675 source_table: "customer_invoices".to_string(),
676 target_table: "customer_receipts".to_string(),
677 source_key: "invoice_number".to_string(),
678 target_key: "invoice_number".to_string(),
679 relationship_type: TableRelationship::DocumentFlow,
680 validate_amounts: true,
681 validate_dates: true,
682 },
683 TableRelationshipDef {
684 source_table: "customer_invoices".to_string(),
685 target_table: "journal_entries".to_string(),
686 source_key: "invoice_number".to_string(),
687 target_key: "source_document_id".to_string(),
688 relationship_type: TableRelationship::OneToMany,
689 validate_amounts: true,
690 validate_dates: true,
691 },
692 ]
693}
694
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an unreferencing record of `table` with an id, amount and ISO-8601 date.
    fn record(table: &str, id: &str, amount: f64, date: &str) -> TableRecord {
        TableRecord {
            id: id.to_string(),
            table: table.to_string(),
            amount: Some(amount),
            date: Some(date.to_string()),
            ..Default::default()
        }
    }

    /// Like [`record`], but also referencing record `ref_id` of table `ref_table`.
    fn linked_record(
        table: &str,
        id: &str,
        amount: f64,
        date: &str,
        ref_table: &str,
        ref_id: &str,
    ) -> TableRecord {
        let mut linked = record(table, id, amount, date);
        linked
            .references
            .insert(ref_table.to_string(), ref_id.to_string());
        linked
    }

    /// Document-flow relationship that validates amounts and joins on record ids.
    fn id_flow(
        source_table: &str,
        target_table: &str,
        source_key: &str,
        validate_dates: bool,
    ) -> TableRelationshipDef {
        TableRelationshipDef {
            source_table: source_table.to_string(),
            target_table: target_table.to_string(),
            source_key: source_key.to_string(),
            target_key: "id".to_string(),
            relationship_type: TableRelationship::DocumentFlow,
            validate_amounts: true,
            validate_dates,
        }
    }

    /// Two purchase orders (PO002 anomalous), one goods receipt per order, and one invoice
    /// for GR001. Downstream documents reference upstream ones.
    fn create_test_data() -> MultiTableData {
        let mut data = MultiTableData::default();

        let po2 = TableRecord {
            is_anomalous: true,
            anomaly_id: Some("ANO001".to_string()),
            ..record("purchase_orders", "PO002", 2000.0, "2024-01-02")
        };
        data.tables.insert(
            "purchase_orders".to_string(),
            vec![record("purchase_orders", "PO001", 1000.0, "2024-01-01"), po2],
        );

        data.tables.insert(
            "goods_receipts".to_string(),
            vec![
                linked_record(
                    "goods_receipts",
                    "GR001",
                    1000.0,
                    "2024-01-05",
                    "purchase_orders",
                    "PO001",
                ),
                linked_record(
                    "goods_receipts",
                    "GR002",
                    2000.0,
                    "2024-01-06",
                    "purchase_orders",
                    "PO002",
                ),
            ],
        );

        data.tables.insert(
            "vendor_invoices".to_string(),
            vec![linked_record(
                "vendor_invoices",
                "INV001",
                1000.0,
                "2024-01-10",
                "goods_receipts",
                "GR001",
            )],
        );

        data.relationships = vec![
            id_flow("goods_receipts", "purchase_orders", "po_number", true),
            id_flow("vendor_invoices", "goods_receipts", "gr_number", true),
        ];

        data.anomalies.push(AnomalyRecord {
            anomaly_id: "ANO001".to_string(),
            anomaly_type: "Fraud".to_string(),
            source_record_id: "PO002".to_string(),
            source_table: "purchase_orders".to_string(),
            severity: 4,
            monetary_impact: Some(2000.0),
        });

        data
    }

    #[test]
    fn test_basic_consistency_evaluation() {
        let data = create_test_data();
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();

        // One result per relationship, in definition order.
        assert_eq!(result.table_consistency.len(), 2);

        let gr_po = &result.table_consistency[0];
        assert_eq!(gr_po.source_table, "goods_receipts");
        assert_eq!(gr_po.target_table, "purchase_orders");
        assert_eq!(gr_po.records_checked, 2);
        assert_eq!(gr_po.orphaned_source, 0);
        assert_eq!(gr_po.orphaned_target, 0);
        assert_eq!(gr_po.matching_records + gr_po.mismatched_records, 2);

        let inv_gr = &result.table_consistency[1];
        assert_eq!(inv_gr.source_table, "vendor_invoices");
        assert_eq!(inv_gr.target_table, "goods_receipts");
        assert_eq!(inv_gr.records_checked, 1);
        assert_eq!(inv_gr.orphaned_source, 0);
        // GR002 is never referenced by an invoice.
        assert_eq!(inv_gr.orphaned_target, 1);
        assert_eq!(inv_gr.matching_records + inv_gr.mismatched_records, 1);

        let violation_sum: usize = result
            .table_consistency
            .iter()
            .map(|r| r.violations.len())
            .sum();
        assert_eq!(result.total_violations, violation_sum);
    }

    #[test]
    fn test_cascade_analysis() {
        let data = create_test_data();
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();

        let analysis = &result.cascade_analysis;
        assert_eq!(analysis.total_anomalies, 1);

        // PO002 is referenced only by GR002, so the anomaly cascades exactly one hop.
        assert_eq!(analysis.anomalies_with_cascades, 1);
        assert_eq!(analysis.max_cascade_depth, 1);
        assert_eq!(analysis.cascade_paths.len(), 1);

        let path = &analysis.cascade_paths[0];
        assert_eq!(path.source_id, "PO002");
        assert_eq!(path.depth, 1);
        assert_eq!(path.records_affected, 1);
        assert_eq!(path.affected_tables, vec!["goods_receipts".to_string()]);
        assert_eq!(analysis.tables_affected.get("goods_receipts"), Some(&1));
    }

    #[test]
    fn test_empty_data() {
        let data = MultiTableData::default();
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();

        assert!(result.passes);
        assert!(result.table_consistency.is_empty());
        assert!(result.issues.is_empty());
        assert_eq!(result.total_violations, 0);
        assert_eq!(result.overall_consistency_score, 1.0);
    }

    #[test]
    fn test_missing_reference() {
        let mut data = MultiTableData::default();

        let mut inv = TableRecord {
            id: "INV001".to_string(),
            table: "vendor_invoices".to_string(),
            amount: Some(1000.0),
            ..Default::default()
        };
        inv.references
            .insert("goods_receipts".to_string(), "GR999".to_string());

        data.tables.insert("vendor_invoices".to_string(), vec![inv]);
        data.tables.insert("goods_receipts".to_string(), Vec::new());
        data.relationships
            .push(id_flow("vendor_invoices", "goods_receipts", "gr_number", false));

        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();

        assert!(!result.table_consistency.is_empty());
        let inv_gr = &result.table_consistency[0];
        assert_eq!(inv_gr.orphaned_source, 1);
        assert_eq!(inv_gr.consistency_score, 0.0);
        assert_eq!(inv_gr.violations.len(), 1);
        assert_eq!(
            inv_gr.violations[0].violation_type,
            ViolationType::MissingReference
        );
        assert_eq!(
            inv_gr.violations[0].target_record_id.as_deref(),
            Some("GR999")
        );
        assert!(!result.passes);
    }
}