Skip to main content

zer_pipeline/
config.rs

1use std::path::PathBuf;
2
3use zer_cluster::ClusterConfig;
4use zer_core::field_mapping::FieldMapping;
5
6/// How the pipeline started relative to its stored schema artifact.
7///
8/// Mirrors the variants of `zer_schema::StartupMode` but as a plain `Copy`
9/// enum without any associated data, suitable for storing in [`crate::batch::BatchReport`].
10#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
11#[serde(rename_all = "PascalCase")]
12pub enum BatchStartupMode {
13    /// No prior artifact existed; EM ran from scratch with default priors.
14    ColdStart,
15    /// An artifact with an identical schema fingerprint was found; EM was skipped.
16    WarmLoad,
17    /// An artifact with a similar (but not identical) schema was found; EM ran
18    /// for a few refinement iterations.
19    WarmStart,
20}
21
22impl std::fmt::Display for BatchStartupMode {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            Self::ColdStart => write!(f, "ColdStart"),
26            Self::WarmLoad => write!(f, "WarmLoad"),
27            Self::WarmStart => write!(f, "WarmStart"),
28        }
29    }
30}
31
32/// Controls which record pairs the pipeline generates candidates for.
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
34pub enum LinkMode {
35    /// Find duplicates within a single dataset.  All candidate pairs allowed.
36    #[default]
37    Deduplicate,
38
39    /// Link records across datasets.  Only pairs where the source labels differ
40    /// are generated.  Within-source pairs are never compared or scored.
41    ///
42    /// Use when you have two or more curated datasets (e.g. BRP + KvK) and want
43    /// to find cross-source matches without disturbing each source's internal
44    /// integrity.
45    LinkOnly,
46
47    /// Simultaneously deduplicate within each source and link across sources.
48    /// All candidate pairs are generated regardless of source label.
49    LinkAndDedupe,
50}
51
52impl LinkMode {
53    pub fn as_str(self) -> &'static str {
54        match self {
55            Self::Deduplicate => "deduplicate",
56            Self::LinkOnly => "link-only",
57            Self::LinkAndDedupe => "link-and-dedupe",
58        }
59    }
60}
61
62/// Rate-adaptive threshold configuration.
63#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
64pub struct RateConfig {
65    /// Records/sec below which processing is fully synchronous.
66    pub slow_threshold: f32,
67    /// Records/sec above which the auto-match threshold is widened.
68    pub fast_threshold: f32,
69    /// Threshold divisor applied to `upper_threshold` during bulk load.
70    pub bulk_threshold_multiplier: f32,
71}
72
73impl Default for RateConfig {
74    fn default() -> Self {
75        Self {
76            slow_threshold: 1.0,
77            fast_threshold: 100.0,
78            bulk_threshold_multiplier: 1.05,
79        }
80    }
81}
82
83/// All tunable parameters for a [`crate::pipeline::Pipeline`].
84#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
85pub struct PipelineConfig {
86    /// Path to the `.zsm` schema-registry file.
87    pub registry_path: PathBuf,
88
89    /// Maximum EM iterations for a cold start (no prior artifact).
90    pub em_max_iter_cold: usize,
91
92    /// Maximum EM iterations for a warm start (similar prior artifact).
93    pub em_max_iter_warm: usize,
94
95    /// Clustering shape parameters.
96    pub cluster_config: ClusterConfig,
97
98    /// Minimum pair-count to prefer GPU comparison (when available).
99    pub gpu_min_batch: usize,
100
101    /// Rate-adaptive threshold tuning.
102    pub rate_config: RateConfig,
103
104    /// Override the EM-estimated Fellegi-Sunter upper (auto-match) threshold.
105    ///
106    /// When `Some(t)`, pairs with a match probability ≥ `t` are auto-matched
107    /// regardless of what EM produces.  Use to tighten precision on high-stakes
108    /// pipelines or to force a specific operating point for benchmarking.
109    /// `None` (default) defers entirely to the EM estimate.
110    #[serde(default)]
111    pub upper_threshold: Option<f32>,
112
113    /// Override the EM-estimated Fellegi-Sunter lower (auto-reject) threshold.
114    ///
115    /// When `Some(t)`, pairs with a match probability ≤ `t` are auto-rejected.
116    /// `None` (default) defers entirely to the EM estimate.
117    #[serde(default)]
118    pub lower_threshold: Option<f32>,
119
120    /// Controls which record pairs are generated during blocking.
121    ///
122    /// `Deduplicate` (default) generates all candidate pairs.
123    /// `LinkOnly` skips pairs where both records share the same source label.
124    /// `LinkAndDedupe` is identical to `Deduplicate` at the pair-generation
125    /// level but is reported differently in `BatchReport`.
126    #[serde(default)]
127    pub link_mode: LinkMode,
128
129    /// Maximum number of records allowed in a blocking bucket before it is
130    /// skipped during candidate-pair generation.
131    ///
132    /// Buckets larger than this threshold have poor selectivity (e.g. a common
133    /// birth year-month) and produce O(n²) spurious pairs that exhaust memory.
134    /// Setting to `0` disables the cap entirely (not recommended for datasets
135    /// larger than a few thousand records).
136    ///
137    /// Default: 300.
138    #[serde(default = "default_max_bucket_size")]
139    pub max_bucket_size: usize,
140
141    /// Explicit field-to-field mappings for cross-schema linkage.
142    ///
143    /// When non-empty the pipeline uses `FieldComparator::compare_pair_mapped`
144    /// and `CompositeBlocker` source remaps instead of the standard single-
145    /// schema path.  Leave empty (default) for same-schema dedupe/link runs.
146    #[serde(default)]
147    pub field_mappings: Vec<FieldMapping>,
148}
149
/// Default cap on blocking-bucket size (see `PipelineConfig::max_bucket_size`).
const DEFAULT_MAX_BUCKET_SIZE: usize = 300;

/// Serde default for `PipelineConfig::max_bucket_size`, so configs written
/// before that field existed still deserialize.
const fn default_max_bucket_size() -> usize {
    DEFAULT_MAX_BUCKET_SIZE
}
155
156impl Default for PipelineConfig {
157    fn default() -> Self {
158        Self {
159            registry_path: PathBuf::from("schema.zsm"),
160            em_max_iter_cold: 25,
161            em_max_iter_warm: 3,
162            cluster_config: ClusterConfig::default(),
163            gpu_min_batch: 1_000,
164            rate_config: RateConfig::default(),
165            upper_threshold: None,
166            lower_threshold: None,
167            link_mode: LinkMode::Deduplicate,
168            max_bucket_size: DEFAULT_MAX_BUCKET_SIZE,
169            field_mappings: Vec::new(),
170        }
171    }
172}
173
// ── Unit tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_has_sensible_values() {
        let cfg = PipelineConfig::default();
        assert_eq!(cfg.registry_path, std::path::PathBuf::from("schema.zsm"));
        assert_eq!(cfg.em_max_iter_cold, 25);
        assert_eq!(cfg.em_max_iter_warm, 3);
        assert_eq!(cfg.gpu_min_batch, 1_000);
        assert!(cfg.field_mappings.is_empty());
    }

    #[test]
    fn default_threshold_overrides_are_none() {
        let cfg = PipelineConfig::default();
        assert!(
            cfg.upper_threshold.is_none(),
            "upper_threshold must default to None"
        );
        assert!(
            cfg.lower_threshold.is_none(),
            "lower_threshold must default to None"
        );
    }

    #[test]
    fn threshold_overrides_round_trip_json() {
        let cfg = PipelineConfig {
            upper_threshold: Some(0.92),
            lower_threshold: Some(0.08),
            ..Default::default()
        };
        let json = serde_json::to_string(&cfg).expect("serialize");
        let back: PipelineConfig = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.upper_threshold, Some(0.92));
        assert_eq!(back.lower_threshold, Some(0.08));
    }

    #[test]
    fn threshold_override_none_round_trips_from_json_without_field() {
        // Old configs without the newer fields must deserialize via serde defaults.
        let json = r#"{"registry_path":"schema.zsm","em_max_iter_cold":25,"em_max_iter_warm":3,"cluster_config":{"max_cluster_size":50,"within_cluster_min":0.85},"gpu_min_batch":1000,"rate_config":{"slow_threshold":1.0,"fast_threshold":100.0,"bulk_threshold_multiplier":1.05}}"#;
        let cfg: PipelineConfig = serde_json::from_str(json).expect("deserialize");
        assert!(cfg.upper_threshold.is_none());
        assert!(cfg.lower_threshold.is_none());
        // link_mode must default to Deduplicate when absent from old configs
        assert_eq!(cfg.link_mode, LinkMode::Deduplicate);
        // max_bucket_size must default to 300 when absent from old configs
        assert_eq!(cfg.max_bucket_size, 300);
        // field_mappings must default to empty
        assert!(cfg.field_mappings.is_empty());
    }

    #[test]
    fn max_bucket_size_default_is_300() {
        let cfg = PipelineConfig::default();
        assert_eq!(cfg.max_bucket_size, 300);
    }

    #[test]
    fn max_bucket_size_round_trips_json() {
        let cfg = PipelineConfig {
            max_bucket_size: 500,
            ..Default::default()
        };
        let json = serde_json::to_string(&cfg).expect("serialize");
        let back: PipelineConfig = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.max_bucket_size, 500);
    }

    #[test]
    fn link_mode_default_is_deduplicate() {
        let cfg = PipelineConfig::default();
        assert_eq!(cfg.link_mode, LinkMode::Deduplicate);
    }

    #[test]
    fn link_mode_round_trips_json() {
        let cfg = PipelineConfig {
            link_mode: LinkMode::LinkOnly,
            ..Default::default()
        };
        let json = serde_json::to_string(&cfg).expect("serialize");
        let back: PipelineConfig = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.link_mode, LinkMode::LinkOnly);
    }

    #[test]
    fn link_mode_link_and_dedupe_round_trips_json() {
        let cfg = PipelineConfig {
            link_mode: LinkMode::LinkAndDedupe,
            ..Default::default()
        };
        let json = serde_json::to_string(&cfg).expect("serialize");
        let back: PipelineConfig = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.link_mode, LinkMode::LinkAndDedupe);
    }

    #[test]
    fn link_mode_as_str_labels() {
        assert_eq!(LinkMode::Deduplicate.as_str(), "deduplicate");
        assert_eq!(LinkMode::LinkOnly.as_str(), "link-only");
        assert_eq!(LinkMode::LinkAndDedupe.as_str(), "link-and-dedupe");
    }

    #[test]
    fn link_mode_serializes_as_variant_name() {
        // The serde wire format uses the Rust variant name, not `as_str()`.
        let json = serde_json::to_string(&LinkMode::LinkAndDedupe).expect("serialize");
        assert_eq!(json, "\"LinkAndDedupe\"");
    }

    #[test]
    fn batch_startup_mode_display_and_json_agree() {
        for mode in [
            BatchStartupMode::ColdStart,
            BatchStartupMode::WarmLoad,
            BatchStartupMode::WarmStart,
        ] {
            let json = serde_json::to_string(&mode).expect("serialize");
            // Display output and the serialized string name must stay in sync.
            assert_eq!(json, format!("\"{}\"", mode));
            let back: BatchStartupMode = serde_json::from_str(&json).expect("deserialize");
            assert_eq!(back, mode);
        }
    }

    #[test]
    fn default_rate_config_thresholds_ordered() {
        let r = RateConfig::default();
        assert!(r.slow_threshold < r.fast_threshold);
        assert!(r.bulk_threshold_multiplier > 1.0);
    }

    #[test]
    fn pipeline_config_roundtrip_json() {
        let cfg = PipelineConfig::default();
        let json = serde_json::to_string(&cfg).expect("serialize");
        let back: PipelineConfig = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(cfg.registry_path, back.registry_path);
        assert_eq!(cfg.em_max_iter_cold, back.em_max_iter_cold);
        assert_eq!(cfg.em_max_iter_warm, back.em_max_iter_warm);
        assert_eq!(cfg.gpu_min_batch, back.gpu_min_batch);
        assert_eq!(
            cfg.rate_config.slow_threshold,
            back.rate_config.slow_threshold
        );
        assert_eq!(
            cfg.rate_config.fast_threshold,
            back.rate_config.fast_threshold
        );
        assert_eq!(
            cfg.rate_config.bulk_threshold_multiplier,
            back.rate_config.bulk_threshold_multiplier
        );
        assert_eq!(cfg.upper_threshold, back.upper_threshold);
        assert_eq!(cfg.lower_threshold, back.lower_threshold);
        assert_eq!(cfg.link_mode, back.link_mode);
        assert_eq!(cfg.max_bucket_size, back.max_bucket_size);
        assert_eq!(cfg.field_mappings.len(), back.field_mappings.len());
    }

    #[test]
    fn cluster_config_default_reasonable() {
        let cfg = PipelineConfig::default();
        assert!(cfg.cluster_config.max_cluster_size > 0);
        assert!(cfg.cluster_config.within_cluster_min > 0.0);
        assert!(cfg.cluster_config.within_cluster_min < 1.0);
    }
}