Skip to main content

zer_blocking/
blocker.rs

1use std::collections::HashMap;
2
3use zer_core::{
4    record::{Record, RecordId},
5    schema::Schema,
6    traits::{BlockIndex, Blocker},
7};
8
9use crate::keys::BlockingKey;
10
11/// Composite blocker that applies multiple blocking keys.
12///
13/// For cross-schema linkage, `source_remaps` stores per-source field-name
14/// translations (b_field to a_field).  Before key extraction, any record
15/// whose `source` label has an entry in `source_remaps` gets its fields
16/// renamed to the canonical (A-side) names so the existing `BlockingKey`
17/// implementations can extract values without knowing about schema differences.
18pub struct CompositeBlocker {
19    keys:          Vec<Box<dyn BlockingKey>>,
20    source_remaps: HashMap<String, HashMap<String, String>>,
21}
22
23impl CompositeBlocker {
24    pub fn new() -> Self {
25        Self { keys: vec![], source_remaps: HashMap::new() }
26    }
27
28    pub fn add(mut self, key: impl BlockingKey + 'static) -> Self {
29        self.keys.push(Box::new(key));
30        self
31    }
32
33    pub fn add_boxed(mut self, key: Box<dyn BlockingKey>) -> Self {
34        self.keys.push(key);
35        self
36    }
37
38    /// Register a field-name remap for records from `source`.
39    ///
40    /// `remap` maps b_field to a_field so that the source-B fields are
41    /// visible under canonical source-A names during blocking key extraction.
42    pub fn with_source_remap(
43        mut self,
44        source: impl Into<String>,
45        remap:  HashMap<String, String>,
46    ) -> Self {
47        self.source_remaps.insert(source.into(), remap);
48        self
49    }
50
51    fn effective_record<'r>(&self, record: &'r Record) -> Option<Record> {
52        let src = record.source.as_deref()?;
53        let remap = self.source_remaps.get(src)?;
54        let mut new_rec = Record::new(record.id);
55        if let Some(s) = &record.source {
56            new_rec = new_rec.with_source(s);
57        }
58        for (field_name, value) in &record.fields {
59            let canonical = remap.get(field_name).cloned()
60                .unwrap_or_else(|| field_name.clone());
61            new_rec.fields.insert(canonical, value.clone());
62        }
63        Some(new_rec)
64    }
65}
66
67impl Default for CompositeBlocker {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl Blocker for CompositeBlocker {
74    fn blocking_keys(&self, record: &Record, schema: &Schema) -> Vec<String> {
75        let remapped = self.effective_record(record);
76        let effective = remapped.as_ref().unwrap_or(record);
77        self.keys
78            .iter()
79            .flat_map(|k| {
80                k.extract(effective, schema)
81                    .into_iter()
82                    .map(|val| format!("{}:{}", k.name(), val))
83            })
84            .collect()
85    }
86
87    fn index_record(&self, record: &Record, schema: &Schema, index: &mut dyn BlockIndex) {
88        let keys = self.blocking_keys(record, schema);
89        index.insert(record.id, keys);
90    }
91
92    fn candidates(&self, record: &Record, schema: &Schema, index: &dyn BlockIndex) -> Vec<RecordId> {
93        let keys = self.blocking_keys(record, schema);
94        index.lookup_union(&keys, record.id)
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{index::InvertedIndex, keys::ExactFieldKey};
    use zer_core::{
        record::FieldValue,
        schema::{FieldKind, SchemaBuilder},
    };

    /// Schema with a single categorical field named `category`.
    fn schema() -> Schema {
        SchemaBuilder::new()
            .field("category", FieldKind::Categorical)
            .build()
            .unwrap()
    }

    /// Record `id` whose `category` field holds the text `cat`.
    fn categorised(id: RecordId, cat: &'static str) -> Record {
        Record::new(id).insert("category", FieldValue::Text(cat.into()))
    }

    #[test]
    fn index_and_candidates_round_trip() {
        let schema = schema();
        let blocker = CompositeBlocker::new().add(ExactFieldKey::new("category"));
        let mut index = InvertedIndex::new();

        let records = [
            categorised(1, "TypeA"),
            categorised(2, "TypeA"),
            categorised(3, "TypeB"),
        ];
        for rec in &records {
            blocker.index_record(rec, &schema, &mut index);
        }

        let found = blocker.candidates(&records[0], &schema, &index);
        assert!(found.contains(&2), "r2 should be a candidate for r1");
        assert!(!found.contains(&1), "r1 should not be its own candidate");
        assert!(!found.contains(&3), "r3 should not match r1 (different category)");
    }

    #[test]
    fn no_self_candidates() {
        let schema = schema();
        let blocker = CompositeBlocker::new().add(ExactFieldKey::new("category"));
        let mut index = InvertedIndex::new();

        let only = categorised(1, "X");
        blocker.index_record(&only, &schema, &mut index);

        assert!(!blocker.candidates(&only, &schema, &index).contains(&1));
    }
}