use std::collections::{HashMap, HashSet};

use crate::fingerprint::{FieldStats, SchemaFingerprint};
4
/// Distance that [`fingerprint_distance`] reports for two fingerprints whose
/// schema hashes are equal.
pub const EXACT_MATCH_THRESHOLD: f32 = 0.0;
/// Upper bound on [`fingerprint_distance`] for a pair of fingerprints to count
/// as warm-start eligible.
///
/// The tests in this file treat `dist <= WARM_START_THRESHOLD` as warm-start
/// range and `dist > WARM_START_THRESHOLD` as a cold start.
pub const WARM_START_THRESHOLD: f32 = 0.25;
14fn field_set(stats: &[FieldStats]) -> HashSet<String> {
18 stats.iter().map(|f| format!("{}:{:?}", f.name, f.kind)).collect()
19}
21fn jaccard_field_sets(a: &SchemaFingerprint, b: &SchemaFingerprint) -> f32 {
24 let a_set = field_set(&a.field_stats);
25 let b_set = field_set(&b.field_stats);
26
27 if a_set.is_empty() && b_set.is_empty() {
28 return 1.0;
29 }
30
31 let intersection = a_set.intersection(&b_set).count();
32 let union = a_set.union(&b_set).count();
33
34 if union == 0 {
35 return 1.0;
36 }
37
38 intersection as f32 / union as f32
39}
41fn matching_field_stat_similarity(a: &SchemaFingerprint, b: &SchemaFingerprint) -> f32 {
47 if a.record_count == 0 || b.record_count == 0 {
49 return 1.0;
50 }
51
52 let a_map: HashMap<&str, &FieldStats> =
53 a.field_stats.iter().map(|f| (f.name.as_str(), f)).collect();
54 let b_map: HashMap<&str, &FieldStats> =
55 b.field_stats.iter().map(|f| (f.name.as_str(), f)).collect();
56
57 let matching: Vec<&str> = a_map
58 .keys()
59 .copied()
60 .filter(|name| b_map.contains_key(name))
61 .collect();
62
63 if matching.is_empty() {
64 return 0.0;
65 }
66
67 let total_sim: f32 = matching
68 .iter()
69 .map(|name| {
70 let fa = a_map[name];
71 let fb = b_map[name];
72
73 let null_sim = 1.0 - (fa.null_rate - fb.null_rate).abs().min(1.0);
75
76 let card_sim = if fa.cardinality == 0 && fb.cardinality == 0 {
78 1.0
79 } else {
80 let max_c = fa.cardinality.max(fb.cardinality) as f32;
81 1.0 - (fa.cardinality as f32 - fb.cardinality as f32).abs() / max_c
82 };
83
84 (null_sim + card_sim) / 2.0
85 })
86 .sum();
87
88 total_sim / matching.len() as f32
89}
91pub fn fingerprint_distance(a: &SchemaFingerprint, b: &SchemaFingerprint) -> f32 {
104 if a.schema_hash == b.schema_hash {
106 return EXACT_MATCH_THRESHOLD;
107 }
108
109 let jaccard = jaccard_field_sets(a, b);
110 let stat_sim = matching_field_stat_similarity(a, b);
111
112 (1.0 - jaccard) * 0.7 + (1.0 - stat_sim) * 0.3
113}
#[cfg(test)]
mod tests {
    use super::*;
    use zer_core::schema::{FieldKind, SchemaBuilder};

    use crate::fingerprint::SchemaFingerprint;

    /// Ten-field person/address schema used as the baseline in several tests.
    fn brp_schema() -> zer_core::schema::Schema {
        SchemaBuilder::new()
            .field("voornamen", FieldKind::Name)
            .field("achternaam", FieldKind::Name)
            .field("tussenvoegsel", FieldKind::Categorical)
            .field("geboortedatum", FieldKind::Date)
            .field("geboorteland", FieldKind::Categorical)
            .field("nationaliteit", FieldKind::Categorical)
            .field("straatnaam", FieldKind::Address)
            .field("huisnummer", FieldKind::Address)
            .field("postcode", FieldKind::Id)
            .field("woonplaats", FieldKind::Address)
            .build()
            .unwrap()
    }

    /// Two fingerprints built from the same schema are at distance zero.
    #[test]
    fn identical_fingerprints_zero_distance() {
        let schema = brp_schema();
        let fp1 = SchemaFingerprint::from_schema(&schema);
        let fp2 = SchemaFingerprint::from_schema(&schema);
        assert_eq!(
            fingerprint_distance(&fp1, &fp2),
            0.0,
            "identical fingerprints must have distance 0.0"
        );
    }

    /// Adding a single field to the baseline keeps the pair within the
    /// warm-start threshold while still being a non-zero distance.
    #[test]
    fn one_extra_field_is_warm_start_range() {
        let base = brp_schema();

        // Same ten fields as `brp_schema`, plus one extra categorical field.
        let extended = SchemaBuilder::new()
            .field("voornamen", FieldKind::Name)
            .field("achternaam", FieldKind::Name)
            .field("tussenvoegsel", FieldKind::Categorical)
            .field("geboortedatum", FieldKind::Date)
            .field("geboorteland", FieldKind::Categorical)
            .field("nationaliteit", FieldKind::Categorical)
            .field("straatnaam", FieldKind::Address)
            .field("huisnummer", FieldKind::Address)
            .field("postcode", FieldKind::Id)
            .field("woonplaats", FieldKind::Address)
            .field("verblijfstitel", FieldKind::Categorical)
            .build()
            .unwrap();

        let fp_base = SchemaFingerprint::from_schema(&base);
        let fp_ext = SchemaFingerprint::from_schema(&extended);
        let dist = fingerprint_distance(&fp_base, &fp_ext);

        assert!(
            dist > EXACT_MATCH_THRESHOLD,
            "schemas differ, distance must be > 0"
        );
        assert!(
            dist <= WARM_START_THRESHOLD,
            "one extra field out of 11 should be warm-start eligible, got dist={dist:.4}"
        );
    }

    /// A largely different schema must land above the warm-start threshold.
    #[test]
    fn completely_different_schema_is_cold_start() {
        // Shares four name/kind pairs with the BRP schema (voornamen,
        // achternaam, geboortedatum, nationaliteit); the rest are distinct.
        let sim = SchemaBuilder::new()
            .field("sim_id", FieldKind::Id)
            .field("msisdn", FieldKind::Phone)
            .field("imsi", FieldKind::Id)
            .field("iccid", FieldKind::Id)
            .field("carrier", FieldKind::Categorical)
            .field("contract_type", FieldKind::Categorical)
            .field("activatiedatum", FieldKind::Date)
            .field("voornamen", FieldKind::Name)
            .field("achternaam", FieldKind::Name)
            .field("geboortedatum", FieldKind::Date)
            .field("nationaliteit", FieldKind::Categorical)
            .field("document_type", FieldKind::Categorical)
            .field("document_nummer", FieldKind::Id)
            .field("bsn", FieldKind::Id)
            .build()
            .unwrap();

        let brp = brp_schema();
        let fp_brp = SchemaFingerprint::from_schema(&brp);
        let fp_sim = SchemaFingerprint::from_schema(&sim);
        let dist = fingerprint_distance(&fp_brp, &fp_sim);

        assert!(
            dist > WARM_START_THRESHOLD,
            "BRP vs SIM should exceed warm-start threshold, got dist={dist:.4}"
        );
    }

    /// Field declaration order must not influence the distance.
    #[test]
    fn reordered_fields_same_schema_zero_distance() {
        let s1 = SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .field("beta", FieldKind::Date)
            .build()
            .unwrap();
        let s2 = SchemaBuilder::new()
            .field("beta", FieldKind::Date)
            .field("alpha", FieldKind::Name)
            .build()
            .unwrap();

        let fp1 = SchemaFingerprint::from_schema(&s1);
        let fp2 = SchemaFingerprint::from_schema(&s2);
        assert_eq!(
            fingerprint_distance(&fp1, &fp2),
            0.0,
            "reordered fields must produce identical fingerprints (distance = 0)"
        );
    }

    /// Swapping the arguments must give the same distance.
    #[test]
    fn distance_is_symmetric() {
        let brp = brp_schema();
        let sim = SchemaBuilder::new()
            .field("msisdn", FieldKind::Phone)
            .field("voornamen", FieldKind::Name)
            .field("achternaam", FieldKind::Name)
            .build()
            .unwrap();

        let fp_brp = SchemaFingerprint::from_schema(&brp);
        let fp_sim = SchemaFingerprint::from_schema(&sim);

        let d_ab = fingerprint_distance(&fp_brp, &fp_sim);
        let d_ba = fingerprint_distance(&fp_sim, &fp_brp);

        assert!(
            (d_ab - d_ba).abs() < 1e-6,
            "distance must be symmetric: d(a,b)={d_ab} d(b,a)={d_ba}"
        );
    }

    /// Even fully disjoint single-field schemas stay within `[0, 1]`.
    #[test]
    fn distance_bounded_zero_to_one() {
        let s1 = SchemaBuilder::new()
            .field("x", FieldKind::Name)
            .build()
            .unwrap();
        let s2 = SchemaBuilder::new()
            .field("y", FieldKind::Date)
            .build()
            .unwrap();

        let fp1 = SchemaFingerprint::from_schema(&s1);
        let fp2 = SchemaFingerprint::from_schema(&s2);
        let d = fingerprint_distance(&fp1, &fp2);
        assert!(
            (0.0..=1.0).contains(&d),
            "distance must be in [0, 1], got {d}"
        );
    }
}