// cortexai_data/matcher.rs

//! Cross-source data matching and entity resolution
//!
//! Consolidates records from multiple data sources into unified entities, matching by CPF first and by name similarity otherwise.
4
5use crate::{cpf::CpfMatcher, name::NameMatcher, types::*};
6use std::collections::HashMap;
7use std::time::Instant;
8use tracing::{debug, info};
9
10/// Data matcher for cross-source entity resolution
11#[derive(Debug, Clone)]
12pub struct DataMatcher {
13    /// CPF matcher
14    cpf_matcher: CpfMatcher,
15    /// Name matcher
16    name_matcher: NameMatcher,
17    /// Minimum confidence for CPF match
18    cpf_confidence_threshold: f64,
19    /// Minimum confidence for name match
20    name_confidence_threshold: f64,
21}
22
23impl Default for DataMatcher {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl DataMatcher {
30    /// Create a new data matcher
31    pub fn new() -> Self {
32        Self {
33            cpf_matcher: CpfMatcher::new(),
34            name_matcher: NameMatcher::new(),
35            cpf_confidence_threshold: 0.99,
36            name_confidence_threshold: 0.85,
37        }
38    }
39
40    /// Set CPF confidence threshold
41    pub fn with_cpf_threshold(mut self, threshold: f64) -> Self {
42        self.cpf_confidence_threshold = threshold;
43        self
44    }
45
46    /// Set name confidence threshold
47    pub fn with_name_threshold(mut self, threshold: f64) -> Self {
48        self.name_confidence_threshold = threshold;
49        self.name_matcher = self.name_matcher.with_threshold(threshold);
50        self
51    }
52
53    /// Match across multiple data sources
54    ///
55    /// Returns consolidated results grouping records that refer to the same entity
56    pub fn match_across_sources(
57        &self,
58        sources: &[DataSource],
59        query_name: &str,
60        query_cpf: Option<&str>,
61    ) -> Vec<MatchResult> {
62        let start = Instant::now();
63        let mut records_scanned = 0;
64        let mut candidates: Vec<(DataRecord, &DataSource, f64, MatchType)> = Vec::new();
65
66        // Normalize query CPF if provided
67        let normalized_cpf = query_cpf.and_then(|cpf| self.cpf_matcher.normalize(cpf));
68
69        info!(
70            query_name = query_name,
71            query_cpf = ?query_cpf,
72            sources_count = sources.len(),
73            "Starting cross-source match"
74        );
75
76        // Phase 1: Find all matching records
77        for source in sources {
78            for record in &source.records {
79                records_scanned += 1;
80
81                // Try CPF match first (highest confidence)
82                if let Some(ref cpf) = normalized_cpf {
83                    if let Some(record_cpf) = record.get_cpf_field() {
84                        if self.cpf_matcher.matches(cpf, record_cpf) {
85                            candidates.push((record.clone(), source, 1.0, MatchType::ExactId));
86                            continue;
87                        }
88                    }
89                }
90
91                // Try name match
92                if let Some(record_name) = record.get_name_field() {
93                    let similarity = self.name_matcher.similarity(query_name, record_name);
94
95                    if similarity >= self.name_confidence_threshold {
96                        let match_type = if similarity >= 0.99 {
97                            MatchType::ExactText
98                        } else {
99                            MatchType::Fuzzy
100                        };
101
102                        candidates.push((record.clone(), source, similarity, match_type));
103                    }
104                }
105            }
106        }
107
108        debug!(
109            candidates_found = candidates.len(),
110            records_scanned, "Initial candidate search complete"
111        );
112
113        // Phase 2: Group candidates by entity
114        let grouped = self.group_candidates(candidates, query_name);
115
116        // Phase 3: Create match results
117        let mut results: Vec<MatchResult> = grouped
118            .into_iter()
119            .map(|(entity_id, matches)| {
120                let mut result = MatchResult::new(&entity_id);
121
122                for (record, source, score, match_type) in matches {
123                    // Add fields to consolidated
124                    for (key, value) in &record.fields {
125                        result
126                            .consolidated_fields
127                            .entry(key.clone())
128                            .or_insert_with(|| value.clone());
129                    }
130
131                    result.add_source(SourceMatch {
132                        source_id: source.id.clone(),
133                        source_name: source.name.clone(),
134                        score,
135                        record,
136                        match_type,
137                    });
138                }
139
140                result.calculate_confidence();
141                result.metadata = MatchMetadata {
142                    match_time_ms: start.elapsed().as_millis() as u64,
143                    records_scanned,
144                    criteria: self.get_criteria_used(&result),
145                    warnings: Vec::new(),
146                };
147
148                result
149            })
150            .collect();
151
152        // Sort by confidence
153        results.sort_by(|a, b| b.confidence.partial_cmp(&a.confidence).unwrap());
154
155        info!(
156            results_count = results.len(),
157            elapsed_ms = start.elapsed().as_millis(),
158            "Cross-source match complete"
159        );
160
161        results
162    }
163
164    /// Group candidates by probable entity
165    fn group_candidates<'a>(
166        &self,
167        candidates: Vec<(DataRecord, &'a DataSource, f64, MatchType)>,
168        query_name: &str,
169    ) -> HashMap<String, Vec<(DataRecord, &'a DataSource, f64, MatchType)>> {
170        let mut groups: HashMap<String, Vec<(DataRecord, &'a DataSource, f64, MatchType)>> =
171            HashMap::new();
172
173        for candidate in candidates {
174            // Determine entity ID
175            let entity_id = self.determine_entity_id(&candidate.0, query_name);
176
177            groups.entry(entity_id).or_default().push(candidate);
178        }
179
180        // Merge groups that likely refer to the same entity
181        self.merge_similar_groups(groups)
182    }
183
184    /// Determine entity ID from a record
185    fn determine_entity_id(&self, record: &DataRecord, query_name: &str) -> String {
186        // If record has CPF, use that as primary ID
187        if let Some(cpf) = record.get_cpf_field() {
188            if let Some(normalized) = self.cpf_matcher.normalize(cpf) {
189                return format!("cpf_{}", normalized);
190            }
191        }
192
193        // Otherwise use normalized name
194        if let Some(name) = record.get_name_field() {
195            return self.name_matcher.to_entity_id(name);
196        }
197
198        // Fallback to query name
199        self.name_matcher.to_entity_id(query_name)
200    }
201
202    /// Merge groups that likely refer to the same entity
203    fn merge_similar_groups<'a>(
204        &self,
205        mut groups: HashMap<String, Vec<(DataRecord, &'a DataSource, f64, MatchType)>>,
206    ) -> HashMap<String, Vec<(DataRecord, &'a DataSource, f64, MatchType)>> {
207        let entity_ids: Vec<String> = groups.keys().cloned().collect();
208
209        // Find CPF-based groups
210        let cpf_groups: Vec<_> = entity_ids
211            .iter()
212            .filter(|id| id.starts_with("cpf_"))
213            .cloned()
214            .collect();
215
216        // For each CPF group, absorb name-based groups that match
217        for cpf_id in &cpf_groups {
218            let cpf_records = groups.get(cpf_id).cloned().unwrap_or_default();
219
220            // Get names from CPF group
221            let cpf_names: Vec<String> = cpf_records
222                .iter()
223                .filter_map(|(r, _, _, _)| r.get_name_field().map(String::from))
224                .collect();
225
226            // Find matching name groups
227            let mut to_merge = Vec::new();
228            for name_id in entity_ids.iter().filter(|id| !id.starts_with("cpf_")) {
229                for cpf_name in &cpf_names {
230                    let name_from_id = name_id.replace('_', " ");
231                    if self.name_matcher.similarity(cpf_name, &name_from_id)
232                        >= self.name_confidence_threshold
233                    {
234                        to_merge.push(name_id.clone());
235                        break;
236                    }
237                }
238            }
239
240            // Merge
241            for merge_id in to_merge {
242                if let Some(records) = groups.remove(&merge_id) {
243                    groups.entry(cpf_id.clone()).or_default().extend(records);
244                }
245            }
246        }
247
248        groups
249    }
250
251    /// Get criteria used for matching
252    fn get_criteria_used(&self, result: &MatchResult) -> Vec<String> {
253        let mut criteria = Vec::new();
254
255        let has_cpf_match = result
256            .sources
257            .iter()
258            .any(|s| matches!(s.match_type, MatchType::ExactId));
259
260        let has_fuzzy_match = result
261            .sources
262            .iter()
263            .any(|s| matches!(s.match_type, MatchType::Fuzzy));
264
265        if has_cpf_match {
266            criteria.push("CPF match".to_string());
267        }
268        if has_fuzzy_match {
269            criteria.push("Fuzzy name match".to_string());
270        }
271        if result.sources.len() > 1 {
272            criteria.push(format!("Cross-source ({} sources)", result.sources.len()));
273        }
274
275        criteria
276    }
277
278    /// Find a single best match
279    pub fn find_best_match(
280        &self,
281        sources: &[DataSource],
282        query_name: &str,
283        query_cpf: Option<&str>,
284    ) -> Option<MatchResult> {
285        self.match_across_sources(sources, query_name, query_cpf)
286            .into_iter()
287            .next()
288    }
289
290    /// Check if an entity exists in sources
291    pub fn entity_exists(
292        &self,
293        sources: &[DataSource],
294        query_name: &str,
295        query_cpf: Option<&str>,
296    ) -> bool {
297        !self
298            .match_across_sources(sources, query_name, query_cpf)
299            .is_empty()
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a text-valued record field.
    fn text(value: &str) -> FieldValue {
        FieldValue::Text(value.to_string())
    }

    /// Two sources whose records overlap.
    ///
    /// Lucas shares a CPF across sources, but it is stored under different
    /// field names and formats. Ana appears under name variants. Field names
    /// differ between sources (`nome` vs `nome_completo`, `cpf` vs
    /// `documento`).
    fn create_test_sources() -> Vec<DataSource> {
        let fonte_a = DataSource {
            id: "source1".to_string(),
            name: "Fonte A".to_string(),
            schema: DataSchema::default(),
            records: vec![
                DataRecord::new("source1")
                    .with_field("nome", text("Lucas Melo de Oliveira"))
                    .with_field("cpf", text("123.456.789-00"))
                    .with_confidence(1.0),
                DataRecord::new("source1")
                    .with_field("nome", text("Ana Clara Silva"))
                    .with_confidence(0.9),
            ],
        };

        let fonte_b = DataSource {
            id: "source2".to_string(),
            name: "Fonte B".to_string(),
            schema: DataSchema::default(),
            records: vec![
                DataRecord::new("source2")
                    .with_field("nome_completo", text("Lucas M. Oliveira"))
                    .with_field("documento", text("12345678900"))
                    .with_confidence(1.0),
                DataRecord::new("source2")
                    .with_field("nome", text("Ana C Silva"))
                    .with_field("cpf", text("987.654.321-00"))
                    .with_confidence(0.95),
            ],
        };

        vec![fonte_a, fonte_b]
    }

    /// Runs a single query against the fixture with a default matcher.
    fn run(query_name: &str, query_cpf: Option<&str>) -> Vec<MatchResult> {
        DataMatcher::new().match_across_sources(&create_test_sources(), query_name, query_cpf)
    }

    #[test]
    fn test_exact_cpf_match() {
        let results = run("qualquer nome", Some("123.456.789-00"));

        assert!(!results.is_empty(), "Should find CPF match");
        let top = &results[0];
        assert_eq!(top.sources.len(), 2, "Should find in both sources");
        assert!(top.confidence > 0.95);
    }

    #[test]
    fn test_cpf_variations() {
        let matcher = DataMatcher::new();
        let sources = create_test_sources();

        for cpf in ["12345678900", "123.456.789-00", " 123 456 789 00 "] {
            let found = matcher.match_across_sources(&sources, "x", Some(cpf));
            assert!(!found.is_empty(), "Should match CPF: {}", cpf);
        }
    }

    #[test]
    fn test_fuzzy_name_matching() {
        let results = run("Lucas Oliveira", None);

        assert!(!results.is_empty(), "Should find name match");
        assert!(results[0].confidence >= 0.85);
    }

    #[test]
    fn test_cross_source_consolidation() {
        // Ana appears in both sources with slightly different names.
        let results = run("Ana Silva", None);

        assert!(!results.is_empty(), "Should find Ana");
        assert!(!results[0].sources.is_empty());
    }

    #[test]
    fn test_no_match() {
        let results = run("José Inexistente", Some("000.000.000-00"));

        assert!(results.is_empty(), "Should not find non-existent person");
    }

    #[test]
    fn test_find_best_match() {
        let sources = create_test_sources();
        let best = DataMatcher::new().find_best_match(&sources, "Lucas", Some("123.456.789-00"));

        assert!(best.is_some());
        assert!(best.unwrap().confidence > 0.9);
    }

    #[test]
    fn test_entity_exists() {
        let matcher = DataMatcher::new();
        let sources = create_test_sources();

        let lucas_found = matcher.entity_exists(&sources, "Lucas", Some("123.456.789-00"));
        let nobody_found = matcher.entity_exists(&sources, "Nobody", Some("000.000.000-00"));

        assert!(lucas_found);
        assert!(!nobody_found);
    }
}