use crate::error::EvalResult;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
/// Aggregated result of evaluating referential and value consistency across
/// a set of related tables, plus analysis of how anomalies cascade.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiTableEvaluation {
    /// One consistency result per relationship definition evaluated.
    pub table_consistency: Vec<TableConsistencyResult>,
    /// How known anomalies propagate through table references.
    pub cascade_analysis: CascadeAnomalyAnalysis,
    /// Matching records divided by all checked records, over all
    /// relationships (1.0 when nothing was checked).
    pub overall_consistency_score: f64,
    /// Total violation count summed over all table-consistency results.
    pub total_violations: usize,
    /// True when no issues were raised and the overall score meets the threshold.
    pub passes: bool,
    /// Human-readable descriptions of threshold breaches.
    pub issues: Vec<String>,
}
impl Default for MultiTableEvaluation {
fn default() -> Self {
Self {
table_consistency: Vec::new(),
cascade_analysis: CascadeAnomalyAnalysis::default(),
overall_consistency_score: 1.0,
total_violations: 0,
passes: true,
issues: Vec::new(),
}
}
}
/// Consistency outcome for a single source -> target relationship.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableConsistencyResult {
    /// Table whose records hold the references.
    pub source_table: String,
    /// Table being referenced.
    pub target_table: String,
    /// Kind of relationship that was checked.
    pub relationship: TableRelationship,
    /// Number of source records examined (with or without a reference).
    pub records_checked: usize,
    /// Source records whose referenced target exists and passed value checks.
    pub matching_records: usize,
    /// Source records whose referenced target exists but failed value checks.
    pub mismatched_records: usize,
    /// Source records referencing a target id that does not exist.
    pub orphaned_source: usize,
    /// Target records never referenced by any source record.
    pub orphaned_target: usize,
    /// matching / (matching + mismatched + orphaned_source);
    /// 1.0 when no references were checked.
    pub consistency_score: f64,
    /// Individual violations found while checking this relationship.
    pub violations: Vec<ConsistencyViolation>,
}
impl Default for TableConsistencyResult {
fn default() -> Self {
Self {
source_table: String::new(),
target_table: String::new(),
relationship: TableRelationship::OneToMany,
records_checked: 0,
matching_records: 0,
mismatched_records: 0,
orphaned_source: 0,
orphaned_target: 0,
consistency_score: 1.0,
violations: Vec::new(),
}
}
}
/// Cardinality / semantics of a checked table relationship.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TableRelationship {
    /// Exactly one target record per source record.
    OneToOne,
    /// One source record related to many target records
    /// (used as the default relationship kind).
    OneToMany,
    /// Many source records related to many target records.
    ManyToMany,
    /// Parent/child hierarchy.
    Hierarchical,
    /// Business document flow (e.g. PO -> receipt -> invoice); this is the
    /// only kind for which date-ordering is validated.
    DocumentFlow,
}
/// One concrete consistency problem found between a source record and
/// (optionally) a target record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyViolation {
    /// Category of the violation.
    pub violation_type: ViolationType,
    /// Id of the source record involved.
    pub source_record_id: String,
    /// Id of the referenced target record, when one was named.
    pub target_record_id: Option<String>,
    /// Human-readable explanation of the violation.
    pub description: String,
    /// Severity rating; higher is worse (this evaluator emits 2 for date
    /// issues and 3 for missing references / amount mismatches).
    pub severity: u8,
}
/// Categories of cross-table consistency violations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ViolationType {
    /// A reference points at a record id absent from the target table.
    MissingReference,
    /// Generic value disagreement between linked records.
    ValueMismatch,
    /// A record not connected the way the relationship requires.
    OrphanedRecord,
    /// Records referencing each other in a cycle.
    CircularReference,
    /// Monetary amounts differ beyond tolerance between linked records.
    AmountInconsistency,
    /// Dates violate the expected ordering between linked records.
    DateInconsistency,
    /// Status fields disagree between linked records.
    StatusInconsistency,
    /// A link in a document-flow chain is broken.
    DocumentChainBreak,
}
/// Summary of how known anomalies propagate ("cascade") through references.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CascadeAnomalyAnalysis {
    /// Total anomalies present in the input data.
    pub total_anomalies: usize,
    /// Anomalies whose trace reached at least one other record (depth > 0).
    pub anomalies_with_cascades: usize,
    /// Mean depth over all recorded cascade paths (0.0 when there are none).
    pub average_cascade_depth: f64,
    /// Deepest cascade observed across all anomalies.
    pub max_cascade_depth: usize,
    /// One traced path per cascading anomaly.
    pub cascade_paths: Vec<CascadePath>,
    /// Table name -> number of cascades that touched that table.
    pub tables_affected: HashMap<String, usize>,
}
/// The traced footprint of a single anomaly through the reference graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CascadePath {
    /// Identifier of the originating anomaly.
    pub anomaly_id: String,
    /// Free-form anomaly category (e.g. "Fraud").
    pub anomaly_type: String,
    /// Id of the record where the anomaly originates.
    pub source_id: String,
    /// Table containing the source record.
    pub source_table: String,
    /// Distinct tables touched during the trace.
    pub affected_tables: Vec<String>,
    /// Count of reference hops followed during the trace.
    pub records_affected: usize,
    /// Maximum hop distance reached from the source record.
    pub depth: usize,
    /// Monetary impact carried over from the anomaly record, if known.
    pub monetary_impact: Option<f64>,
}
/// Input bundle for a multi-table evaluation: table contents, the
/// relationships to check, and the known anomalies to trace.
#[derive(Debug, Clone, Default)]
pub struct MultiTableData {
    /// Table name -> records in that table.
    pub tables: HashMap<String, Vec<TableRecord>>,
    /// Relationship definitions to evaluate.
    pub relationships: Vec<TableRelationshipDef>,
    /// Known anomalies whose cascades should be analyzed.
    pub anomalies: Vec<AnomalyRecord>,
}
/// A single row from one of the evaluated tables.
///
/// (Fix: the original declared `references` and `key_values` jammed on one
/// line; split per rustfmt convention.)
#[derive(Debug, Clone, Default)]
pub struct TableRecord {
    /// Unique record identifier within its table.
    pub id: String,
    /// Name of the table this record belongs to.
    pub table: String,
    /// Outbound references: target table name -> referenced record id.
    pub references: HashMap<String, String>,
    /// Arbitrary key/value attributes carried by the record.
    pub key_values: HashMap<String, String>,
    /// Monetary amount, if this record carries one.
    pub amount: Option<f64>,
    /// Date string, if present (e.g. "2024-01-01" in the tests).
    pub date: Option<String>,
    /// Status field, if present.
    pub status: Option<String>,
    /// Whether this record is a known anomaly.
    pub is_anomalous: bool,
    /// Id of the associated anomaly, when `is_anomalous` is set.
    pub anomaly_id: Option<String>,
}
/// Declares one source -> target relationship to validate.
#[derive(Debug, Clone)]
pub struct TableRelationshipDef {
    /// Table whose records carry the reference.
    pub source_table: String,
    /// Table being referenced.
    pub target_table: String,
    /// Key field name on the source side (informational: the evaluator's
    /// lookups use `TableRecord::references`, not this field).
    pub source_key: String,
    /// Key field name on the target side (informational: lookups match on
    /// target record ids, not this field).
    pub target_key: String,
    /// Cardinality / semantics of the relationship.
    pub relationship_type: TableRelationship,
    /// Whether to compare amounts of linked records.
    pub validate_amounts: bool,
    /// Whether to check date ordering of linked records
    /// (only applied for `DocumentFlow` relationships).
    pub validate_dates: bool,
}
/// A known anomaly whose downstream impact should be traced.
#[derive(Debug, Clone)]
pub struct AnomalyRecord {
    /// Unique anomaly identifier.
    pub anomaly_id: String,
    /// Free-form anomaly category (e.g. "Fraud").
    pub anomaly_type: String,
    /// Id of the record the anomaly originates from.
    pub source_record_id: String,
    /// Table containing that record.
    pub source_table: String,
    /// Severity rating; higher is worse.
    pub severity: u8,
    /// Monetary impact, if quantified.
    pub monetary_impact: Option<f64>,
}
/// Evaluates referential and value consistency across related tables and
/// traces anomaly cascades, judging results against configurable thresholds.
pub struct MultiTableConsistencyEvaluator {
    // Minimum acceptable consistency score (applied per relationship and overall).
    min_consistency_score: f64,
    // Maximum acceptable orphaned-record rate per relationship.
    max_orphan_rate: f64,
    // Depth cutoff for cascade tracing.
    max_cascade_depth: usize,
}
impl MultiTableConsistencyEvaluator {
/// Creates an evaluator with explicit thresholds.
///
/// * `min_consistency_score` - minimum acceptable match ratio (0.0..=1.0),
///   checked per relationship and for the overall score.
/// * `max_orphan_rate` - maximum acceptable orphaned-record ratio per relationship.
/// * `max_cascade_depth` - depth at which cascade tracing stops following references.
pub fn new(min_consistency_score: f64, max_orphan_rate: f64, max_cascade_depth: usize) -> Self {
    Self {
        min_consistency_score,
        max_orphan_rate,
        max_cascade_depth,
    }
}
/// Runs the full pipeline over `data`: per-relationship consistency checks,
/// cascade analysis, then overall metrics and the pass/fail verdict.
pub fn evaluate(&self, data: &MultiTableData) -> EvalResult<MultiTableEvaluation> {
    let mut evaluation = MultiTableEvaluation::default();
    evaluation.table_consistency = data
        .relationships
        .iter()
        .map(|rel_def| self.evaluate_relationship(data, rel_def))
        .collect();
    evaluation.cascade_analysis = self.analyze_cascades(data);
    self.calculate_overall_metrics(&mut evaluation);
    Ok(evaluation)
}
/// Checks one relationship definition: every source record that holds a
/// reference into the target table is resolved against an id index of the
/// target table, value-checked, and tallied.
fn evaluate_relationship(
    &self,
    data: &MultiTableData,
    rel_def: &TableRelationshipDef,
) -> TableConsistencyResult {
    let mut result = TableConsistencyResult {
        source_table: rel_def.source_table.clone(),
        target_table: rel_def.target_table.clone(),
        relationship: rel_def.relationship_type.clone(),
        ..Default::default()
    };
    let source_records = data.tables.get(&rel_def.source_table);
    let target_records = data.tables.get(&rel_def.target_table);
    // If either table is absent, the relationship is skipped entirely and
    // keeps its default (perfect) consistency score of 1.0.
    let (source_records, target_records) = match (source_records, target_records) {
        (Some(s), Some(t)) => (s, t),
        _ => return result,
    };
    // Index target records by id for O(1) reference resolution.
    let target_index: HashMap<_, _> =
        target_records.iter().map(|r| (r.id.clone(), r)).collect();
    let mut referenced_targets = HashSet::new();
    for source_record in source_records {
        result.records_checked += 1;
        // References are keyed by target TABLE name. Source records without
        // a reference to this target count as "checked" but contribute to
        // neither matches nor mismatches.
        if let Some(target_id) = source_record.references.get(&rel_def.target_table) {
            if let Some(target_record) = target_index.get(target_id) {
                let violations =
                    self.check_value_consistency(source_record, target_record, rel_def);
                if violations.is_empty() {
                    result.matching_records += 1;
                } else {
                    result.mismatched_records += 1;
                    result.violations.extend(violations);
                }
                // Track which targets were reached, even by mismatched links,
                // so they are not reported as orphaned below.
                referenced_targets.insert(target_id.clone());
            } else {
                // Dangling reference: the named target id does not exist.
                result.orphaned_source += 1;
                result.violations.push(ConsistencyViolation {
                    violation_type: ViolationType::MissingReference,
                    source_record_id: source_record.id.clone(),
                    target_record_id: Some(target_id.clone()),
                    description: format!(
                        "Source record {} references non-existent target {}",
                        source_record.id, target_id
                    ),
                    severity: 3,
                });
            }
        }
    }
    // Targets never reached by any source reference are orphans (counted
    // only; no per-record violation is emitted for them).
    result.orphaned_target = target_records
        .iter()
        .filter(|r| !referenced_targets.contains(&r.id))
        .count();
    // Score over records that actually carried a reference; source records
    // with no reference are deliberately excluded from this denominator.
    let total = result.matching_records + result.mismatched_records + result.orphaned_source;
    result.consistency_score = if total > 0 {
        result.matching_records as f64 / total as f64
    } else {
        1.0
    };
    result
}
/// Compares the amount and date fields of a linked source/target pair,
/// returning a violation for each configured check that fails.
///
/// Amounts are compared with a fixed 0.01 tolerance; date ordering is only
/// enforced for `DocumentFlow` relationships (target must not predate source).
fn check_value_consistency(
    &self,
    source: &TableRecord,
    target: &TableRecord,
    rel_def: &TableRelationshipDef,
) -> Vec<ConsistencyViolation> {
    let mut found = Vec::new();
    if rel_def.validate_amounts {
        match (source.amount, target.amount) {
            (Some(s_amt), Some(t_amt)) if (s_amt - t_amt).abs() > 0.01 => {
                found.push(ConsistencyViolation {
                    violation_type: ViolationType::AmountInconsistency,
                    source_record_id: source.id.clone(),
                    target_record_id: Some(target.id.clone()),
                    description: format!("Amount mismatch: source={s_amt}, target={t_amt}"),
                    severity: 3,
                });
            }
            // Missing amounts on either side are not treated as violations.
            _ => {}
        }
    }
    if rel_def.validate_dates && rel_def.relationship_type == TableRelationship::DocumentFlow {
        if let (Some(s_date), Some(t_date)) = (&source.date, &target.date) {
            if t_date < s_date {
                found.push(ConsistencyViolation {
                    violation_type: ViolationType::DateInconsistency,
                    source_record_id: source.id.clone(),
                    target_record_id: Some(target.id.clone()),
                    description: format!(
                        "Date inconsistency: target date {t_date} before source date {s_date}"
                    ),
                    severity: 2,
                });
            }
        }
    }
    found
}
fn analyze_cascades(&self, data: &MultiTableData) -> CascadeAnomalyAnalysis {
let mut analysis = CascadeAnomalyAnalysis::default();
analysis.total_anomalies = data.anomalies.len();
let mut reverse_refs: HashMap<(String, String), Vec<(String, String)>> = HashMap::new();
for (table_name, records) in &data.tables {
for record in records {
for (ref_table, ref_id) in &record.references {
reverse_refs
.entry((ref_table.clone(), ref_id.clone()))
.or_default()
.push((table_name.clone(), record.id.clone()));
}
}
}
for anomaly in &data.anomalies {
let cascade_path = self.trace_cascade(
data,
&reverse_refs,
&anomaly.source_table,
&anomaly.source_record_id,
&anomaly.anomaly_id,
&anomaly.anomaly_type,
anomaly.monetary_impact,
);
if cascade_path.depth > 0 {
analysis.anomalies_with_cascades += 1;
for table in &cascade_path.affected_tables {
*analysis.tables_affected.entry(table.clone()).or_insert(0) += 1;
}
if cascade_path.depth > analysis.max_cascade_depth {
analysis.max_cascade_depth = cascade_path.depth;
}
analysis.cascade_paths.push(cascade_path);
}
}
if !analysis.cascade_paths.is_empty() {
analysis.average_cascade_depth = analysis
.cascade_paths
.iter()
.map(|p| p.depth as f64)
.sum::<f64>()
/ analysis.cascade_paths.len() as f64;
}
analysis
}
/// Traces how a single anomaly propagates through the reference graph,
/// starting from its source record.
///
/// The traversal follows edges in BOTH directions: `reverse_refs` yields
/// records that point AT the current record, and the current record's own
/// `references` map yields records it points TO. Each (table, id) pair is
/// expanded at most once.
///
/// NOTE(review): a neighbor can be pushed — and counted in
/// `records_affected` / `depth` — by two different predecessors before it is
/// ever popped, because the visited check happens on pop; possible
/// double-counting, confirm whether intended.
/// NOTE(review): neighbors at depth `max_cascade_depth + 1` are counted and
/// recorded in `path.depth` before the cutoff skips their expansion, so the
/// reported depth can exceed the configured maximum by one.
fn trace_cascade(
    &self,
    data: &MultiTableData,
    reverse_refs: &HashMap<(String, String), Vec<(String, String)>>,
    source_table: &str,
    source_id: &str,
    anomaly_id: &str,
    anomaly_type: &str,
    monetary_impact: Option<f64>,
) -> CascadePath {
    let mut path = CascadePath {
        anomaly_id: anomaly_id.to_string(),
        anomaly_type: anomaly_type.to_string(),
        source_id: source_id.to_string(),
        source_table: source_table.to_string(),
        affected_tables: Vec::new(),
        records_affected: 0,
        depth: 0,
        monetary_impact,
    };
    let mut visited = HashSet::new();
    // Depth-first worklist of (table, record id, hop distance from source).
    let mut to_visit = vec![(source_table.to_string(), source_id.to_string(), 0usize)];
    while let Some((table, id, depth)) = to_visit.pop() {
        if depth > self.max_cascade_depth {
            continue;
        }
        if visited.contains(&(table.clone(), id.clone())) {
            continue;
        }
        visited.insert((table.clone(), id.clone()));
        // Downstream direction: records that reference the current one.
        if let Some(refs) = reverse_refs.get(&(table.clone(), id.clone())) {
            for (ref_table, ref_id) in refs {
                if !visited.contains(&(ref_table.clone(), ref_id.clone())) {
                    if !path.affected_tables.contains(ref_table) {
                        path.affected_tables.push(ref_table.clone());
                    }
                    path.records_affected += 1;
                    path.depth = path.depth.max(depth + 1);
                    to_visit.push((ref_table.clone(), ref_id.clone(), depth + 1));
                }
            }
        }
        // Upstream direction: records the current one references itself.
        if let Some(records) = data.tables.get(&table) {
            if let Some(record) = records.iter().find(|r| r.id == id) {
                for (ref_table, ref_id) in &record.references {
                    if !visited.contains(&(ref_table.clone(), ref_id.clone())) {
                        if !path.affected_tables.contains(ref_table) {
                            path.affected_tables.push(ref_table.clone());
                        }
                        path.records_affected += 1;
                        path.depth = path.depth.max(depth + 1);
                        to_visit.push((ref_table.clone(), ref_id.clone(), depth + 1));
                    }
                }
            }
        }
    }
    path
}
/// Aggregates per-relationship results into the overall score, violation
/// count, issue list, and final pass/fail verdict.
fn calculate_overall_metrics(&self, evaluation: &mut MultiTableEvaluation) {
    let total_records: usize = evaluation
        .table_consistency
        .iter()
        .map(|r| r.records_checked)
        .sum();
    let total_matching: usize = evaluation
        .table_consistency
        .iter()
        .map(|r| r.matching_records)
        .sum();
    // NOTE(review): unlike the per-relationship score (matching divided by
    // matching + mismatched + orphaned_source), this divides by ALL checked
    // records, so source records carrying no reference depress the overall
    // score — confirm the asymmetry is intended.
    evaluation.overall_consistency_score = if total_records > 0 {
        total_matching as f64 / total_records as f64
    } else {
        1.0
    };
    evaluation.total_violations = evaluation
        .table_consistency
        .iter()
        .map(|r| r.violations.len())
        .sum();
    for result in &evaluation.table_consistency {
        if result.consistency_score < self.min_consistency_score {
            evaluation.issues.push(format!(
                "{}->{}: consistency {:.2}% below threshold {:.2}%",
                result.source_table,
                result.target_table,
                result.consistency_score * 100.0,
                self.min_consistency_score * 100.0
            ));
        }
        // NOTE(review): the denominator counts SOURCE records only, while
        // the numerator includes orphaned TARGET records, so this rate can
        // exceed 1.0 — verify against the threshold's intent.
        let orphan_rate = if result.records_checked > 0 {
            (result.orphaned_source + result.orphaned_target) as f64
                / result.records_checked as f64
        } else {
            0.0
        };
        if orphan_rate > self.max_orphan_rate {
            evaluation.issues.push(format!(
                "{}->{}: orphan rate {:.2}% exceeds threshold {:.2}%",
                result.source_table,
                result.target_table,
                orphan_rate * 100.0,
                self.max_orphan_rate * 100.0
            ));
        }
    }
    // Depth warning uses a hard-coded limit of 3, independent of the
    // configurable `max_cascade_depth` used for tracing.
    if evaluation.cascade_analysis.max_cascade_depth > 3 {
        evaluation.issues.push(format!(
            "High cascade depth detected: {} tables deep",
            evaluation.cascade_analysis.max_cascade_depth
        ));
    }
    // Passing requires a clean issue list AND meeting the overall threshold.
    evaluation.passes = evaluation.issues.is_empty()
        && evaluation.overall_consistency_score >= self.min_consistency_score;
}
}
// Default thresholds: 95% minimum consistency, 10% maximum orphan rate, and
// a cascade-tracing cutoff of 5 levels.
impl Default for MultiTableConsistencyEvaluator {
    fn default() -> Self {
        Self::new(0.95, 0.10, 5)
    }
}
pub fn get_p2p_flow_relationships() -> Vec<TableRelationshipDef> {
vec![
TableRelationshipDef {
source_table: "purchase_orders".to_string(),
target_table: "goods_receipts".to_string(),
source_key: "po_number".to_string(),
target_key: "po_number".to_string(),
relationship_type: TableRelationship::DocumentFlow,
validate_amounts: false,
validate_dates: true,
},
TableRelationshipDef {
source_table: "goods_receipts".to_string(),
target_table: "vendor_invoices".to_string(),
source_key: "gr_number".to_string(),
target_key: "gr_number".to_string(),
relationship_type: TableRelationship::DocumentFlow,
validate_amounts: true,
validate_dates: true,
},
TableRelationshipDef {
source_table: "vendor_invoices".to_string(),
target_table: "payments".to_string(),
source_key: "invoice_number".to_string(),
target_key: "invoice_number".to_string(),
relationship_type: TableRelationship::DocumentFlow,
validate_amounts: true,
validate_dates: true,
},
TableRelationshipDef {
source_table: "vendor_invoices".to_string(),
target_table: "journal_entries".to_string(),
source_key: "invoice_number".to_string(),
target_key: "source_document_id".to_string(),
relationship_type: TableRelationship::OneToMany,
validate_amounts: true,
validate_dates: true,
},
]
}
pub fn get_o2c_flow_relationships() -> Vec<TableRelationshipDef> {
vec![
TableRelationshipDef {
source_table: "sales_orders".to_string(),
target_table: "deliveries".to_string(),
source_key: "so_number".to_string(),
target_key: "so_number".to_string(),
relationship_type: TableRelationship::DocumentFlow,
validate_amounts: false,
validate_dates: true,
},
TableRelationshipDef {
source_table: "deliveries".to_string(),
target_table: "customer_invoices".to_string(),
source_key: "delivery_number".to_string(),
target_key: "delivery_number".to_string(),
relationship_type: TableRelationship::DocumentFlow,
validate_amounts: true,
validate_dates: true,
},
TableRelationshipDef {
source_table: "customer_invoices".to_string(),
target_table: "customer_receipts".to_string(),
source_key: "invoice_number".to_string(),
target_key: "invoice_number".to_string(),
relationship_type: TableRelationship::DocumentFlow,
validate_amounts: true,
validate_dates: true,
},
TableRelationshipDef {
source_table: "customer_invoices".to_string(),
target_table: "journal_entries".to_string(),
source_key: "invoice_number".to_string(),
target_key: "source_document_id".to_string(),
relationship_type: TableRelationship::OneToMany,
validate_amounts: true,
validate_dates: true,
},
]
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a small P2P dataset: two purchase orders (PO002 anomalous),
    /// two goods receipts referencing them, one invoice referencing GR001,
    /// the two relationships between those tables, and one anomaly record.
    ///
    /// (Fix: records are built with struct literals + `..Default::default()`
    /// instead of mutating `default()` field by field — clippy
    /// `field_reassign_with_default` — and the statements jammed on one line
    /// in `test_missing_reference` are split.)
    fn create_test_data() -> MultiTableData {
        let mut data = MultiTableData::default();
        data.tables.insert(
            "purchase_orders".to_string(),
            vec![
                TableRecord {
                    id: "PO001".to_string(),
                    table: "purchase_orders".to_string(),
                    amount: Some(1000.0),
                    date: Some("2024-01-01".to_string()),
                    ..Default::default()
                },
                TableRecord {
                    id: "PO002".to_string(),
                    table: "purchase_orders".to_string(),
                    amount: Some(2000.0),
                    date: Some("2024-01-02".to_string()),
                    is_anomalous: true,
                    anomaly_id: Some("ANO001".to_string()),
                    ..Default::default()
                },
            ],
        );
        data.tables.insert(
            "goods_receipts".to_string(),
            vec![
                TableRecord {
                    id: "GR001".to_string(),
                    table: "goods_receipts".to_string(),
                    references: HashMap::from([(
                        "purchase_orders".to_string(),
                        "PO001".to_string(),
                    )]),
                    amount: Some(1000.0),
                    date: Some("2024-01-05".to_string()),
                    ..Default::default()
                },
                TableRecord {
                    id: "GR002".to_string(),
                    table: "goods_receipts".to_string(),
                    references: HashMap::from([(
                        "purchase_orders".to_string(),
                        "PO002".to_string(),
                    )]),
                    amount: Some(2000.0),
                    date: Some("2024-01-06".to_string()),
                    ..Default::default()
                },
            ],
        );
        data.tables.insert(
            "vendor_invoices".to_string(),
            vec![TableRecord {
                id: "INV001".to_string(),
                table: "vendor_invoices".to_string(),
                references: HashMap::from([(
                    "goods_receipts".to_string(),
                    "GR001".to_string(),
                )]),
                amount: Some(1000.0),
                date: Some("2024-01-10".to_string()),
                ..Default::default()
            }],
        );
        data.relationships = vec![
            TableRelationshipDef {
                source_table: "goods_receipts".to_string(),
                target_table: "purchase_orders".to_string(),
                source_key: "po_number".to_string(),
                target_key: "id".to_string(),
                relationship_type: TableRelationship::DocumentFlow,
                validate_amounts: true,
                validate_dates: true,
            },
            TableRelationshipDef {
                source_table: "vendor_invoices".to_string(),
                target_table: "goods_receipts".to_string(),
                source_key: "gr_number".to_string(),
                target_key: "id".to_string(),
                relationship_type: TableRelationship::DocumentFlow,
                validate_amounts: true,
                validate_dates: true,
            },
        ];
        data.anomalies.push(AnomalyRecord {
            anomaly_id: "ANO001".to_string(),
            anomaly_type: "Fraud".to_string(),
            source_record_id: "PO002".to_string(),
            source_table: "purchase_orders".to_string(),
            severity: 4,
            monetary_impact: Some(2000.0),
        });
        data
    }

    #[test]
    fn test_basic_consistency_evaluation() {
        let data = create_test_data();
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();
        assert_eq!(result.table_consistency.len(), 2);
        for table_result in &result.table_consistency {
            println!(
                "{}->{}: checked={}, matching={}, orphaned_source={}",
                table_result.source_table,
                table_result.target_table,
                table_result.records_checked,
                table_result.matching_records,
                table_result.orphaned_source
            );
        }
    }

    #[test]
    fn test_cascade_analysis() {
        let data = create_test_data();
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();
        assert_eq!(result.cascade_analysis.total_anomalies, 1);
    }

    #[test]
    fn test_empty_data() {
        let data = MultiTableData::default();
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();
        assert!(result.passes);
        assert_eq!(result.total_violations, 0);
    }

    #[test]
    fn test_missing_reference() {
        let mut data = MultiTableData::default();
        data.tables.insert(
            "vendor_invoices".to_string(),
            vec![TableRecord {
                id: "INV001".to_string(),
                table: "vendor_invoices".to_string(),
                references: HashMap::from([(
                    "goods_receipts".to_string(),
                    "GR999".to_string(),
                )]),
                amount: Some(1000.0),
                ..Default::default()
            }],
        );
        data.tables.insert("goods_receipts".to_string(), Vec::new());
        data.relationships.push(TableRelationshipDef {
            source_table: "vendor_invoices".to_string(),
            target_table: "goods_receipts".to_string(),
            source_key: "gr_number".to_string(),
            target_key: "id".to_string(),
            relationship_type: TableRelationship::DocumentFlow,
            validate_amounts: true,
            validate_dates: false,
        });
        let evaluator = MultiTableConsistencyEvaluator::default();
        let result = evaluator.evaluate(&data).unwrap();
        assert!(!result.table_consistency.is_empty());
        let inv_gr = &result.table_consistency[0];
        assert_eq!(inv_gr.orphaned_source, 1);
        assert_eq!(inv_gr.violations.len(), 1);
        assert_eq!(
            inv_gr.violations[0].violation_type,
            ViolationType::MissingReference
        );
    }
}