Skip to main content

faucet_core/quality/
mod.rs

1//! Data-quality checks: declarative per-record and per-batch assertions that
2//! quarantine violating records to the DLQ or abort the run. Pure evaluation;
3//! the pipeline wires the DLQ routing in `run_stream`.
4//!
5//! See `docs/superpowers/specs/2026-05-29-quality-checks-design.md`.
6
7pub mod batch;
8pub mod compile;
9pub mod config;
10pub mod record;
11
12pub use compile::CompiledQuality;
13pub use config::{BatchCheck, CompareOp, JsonType, OnFailure, QualitySpec, RecordCheck};
14
15use crate::error::FaucetError;
16use crate::quality::batch::{evaluate_aggregate_check, evaluate_unique_check};
17use crate::quality::compile::CompiledBatchKind;
18use crate::quality::record::evaluate_record_check;
19use serde_json::Value;
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22
23/// A record removed from the page by a quality check, destined for the DLQ.
24#[derive(Debug, Clone)]
25pub struct QuarantinedRecord {
26    pub record: Value,
27    /// Check name (stable metric-label value), e.g. `"not_null"`.
28    pub check: &'static str,
29    /// Addressed field, if any (for the metric/envelope `field` label).
30    pub field: Option<String>,
31    /// Human-readable failure message (goes into the DLQ envelope error).
32    pub message: String,
33    /// 0-based position **within the original page** (not within the quarantine
34    /// list). This is the value the DLQ envelope's `record_index` must carry —
35    /// the frozen contract is "position within the page that failed".
36    pub page_index: usize,
37}
38
/// Running pass/fail counts plus accumulated evaluation time for one check.
/// Stored per check name in the outcome's tally and turned into metrics by
/// the observability wrapper.
///
/// Counting granularity depends on the check kind: per-record checks count
/// one pass or fail per record, while per-batch checks count one per
/// *evaluation* (at most one per page). How many records a batch check
/// quarantined is reported separately through the
/// `faucet_quality_records_quarantined_total` metric.
#[derive(Debug, Clone, Default)]
pub struct CheckTally {
    /// Number of evaluations that passed.
    pub pass: u64,
    /// Number of evaluations that failed.
    pub fail: u64,
    /// Total time spent evaluating this check.
    pub elapsed: Duration,
}
52
53/// Result of applying the quality pass to one page.
54#[derive(Debug, Clone, Default)]
55pub struct QualityOutcome {
56    /// Records that passed all checks and should be written to the sink.
57    pub survivors: Vec<Value>,
58    /// Records routed to the DLQ.
59    pub quarantined: Vec<QuarantinedRecord>,
60    /// Per-check pass/fail/elapsed tally.
61    pub tally: HashMap<&'static str, CheckTally>,
62}
63
64impl QualityOutcome {
65    fn bump(&mut self, name: &'static str, passed: bool, elapsed: Duration) {
66        let e = self.tally.entry(name).or_default();
67        if passed {
68            e.pass += 1;
69        } else {
70            e.fail += 1;
71        }
72        e.elapsed += elapsed;
73    }
74}
75
76/// Apply the full per-page quality pass. Pure: no metrics, no DLQ I/O.
77///
78/// Order: per-record checks (first-failure-wins) partition the page into
79/// survivors + quarantined; per-batch checks then run over the survivors.
80/// Returns `Err(FaucetError::QualityFailure)` on any `abort`.
81pub fn apply_quality(
82    records: Vec<Value>,
83    q: &CompiledQuality,
84) -> Result<QualityOutcome, FaucetError> {
85    let mut out = QualityOutcome::default();
86    // Original page index of each survivor, kept in lockstep with
87    // `out.survivors`, so a record quarantined later by a *batch* check still
88    // carries its true page position (not its index within the survivor slice).
89    let mut survivor_idx: Vec<usize> = Vec::new();
90
91    // ── Per-record pass ───────────────────────────────────────────────
92    'next_record: for (page_index, rec) in records.into_iter().enumerate() {
93        for check in &q.record {
94            // Per-check timing feeds the per-page duration histogram (catches slow
95            // checks like json_schema). The per-record Instant overhead is negligible
96            // relative to a real check; cheap checks dominate it but the loop is cheap.
97            let start = Instant::now();
98            let result = evaluate_record_check(check, &rec);
99            let elapsed = start.elapsed();
100            match result {
101                Ok(()) => out.bump(check.name, true, elapsed),
102                Err(message) => {
103                    out.bump(check.name, false, elapsed);
104                    match check.on_failure {
105                        OnFailure::Abort => {
106                            return Err(FaucetError::QualityFailure {
107                                check: check.name.to_string(),
108                                message: field_msg(&check.field, &message),
109                            });
110                        }
111                        OnFailure::Quarantine | OnFailure::QuarantineBatch => {
112                            // (compile guarantees record checks are quarantine/abort)
113                            out.quarantined.push(QuarantinedRecord {
114                                record: rec,
115                                check: check.name,
116                                field: check.field.clone(),
117                                message: field_msg(&check.field, &message),
118                                page_index,
119                            });
120                            continue 'next_record;
121                        }
122                    }
123                }
124            }
125        }
126        out.survivors.push(rec);
127        survivor_idx.push(page_index);
128    }
129
130    // ── Per-batch pass over survivors ─────────────────────────────────
131    for check in &q.batch {
132        let start = Instant::now();
133        if matches!(check.kind, CompiledBatchKind::Unique { .. }) {
134            let dups = evaluate_unique_check(check, &out.survivors);
135            let elapsed = start.elapsed();
136            let passed = dups.is_empty();
137            out.bump(check.name, passed, elapsed);
138            if passed {
139                continue;
140            }
141            match check.on_failure {
142                OnFailure::Abort => {
143                    return Err(FaucetError::QualityFailure {
144                        check: check.name.to_string(),
145                        message: format!("{} duplicate row(s)", dups.len()),
146                    });
147                }
148                _ => {
149                    // quarantine the duplicate occurrences (keep first).
150                    let dup_set: std::collections::HashSet<usize> = dups.into_iter().collect();
151                    let mut survivors = Vec::with_capacity(out.survivors.len());
152                    let mut new_idx = Vec::with_capacity(survivor_idx.len());
153                    let old_idx = std::mem::take(&mut survivor_idx);
154                    for (i, rec) in std::mem::take(&mut out.survivors).into_iter().enumerate() {
155                        if dup_set.contains(&i) {
156                            out.quarantined.push(QuarantinedRecord {
157                                record: rec,
158                                check: check.name,
159                                field: check.field.clone(),
160                                message: field_msg(&check.field, "duplicate key"),
161                                page_index: old_idx[i],
162                            });
163                        } else {
164                            survivors.push(rec);
165                            new_idx.push(old_idx[i]);
166                        }
167                    }
168                    out.survivors = survivors;
169                    survivor_idx = new_idx;
170                }
171            }
172        } else {
173            let result = evaluate_aggregate_check(check, &out.survivors);
174            let elapsed = start.elapsed();
175            match result {
176                Ok(()) => out.bump(check.name, true, elapsed),
177                Err(message) => {
178                    out.bump(check.name, false, elapsed);
179                    match check.on_failure {
180                        OnFailure::Abort => {
181                            return Err(FaucetError::QualityFailure {
182                                check: check.name.to_string(),
183                                message: field_msg(&check.field, &message),
184                            });
185                        }
186                        // quarantine_batch: route all survivors to the DLQ.
187                        _ => {
188                            let old_idx = std::mem::take(&mut survivor_idx);
189                            for (rec, page_index) in
190                                std::mem::take(&mut out.survivors).into_iter().zip(old_idx)
191                            {
192                                out.quarantined.push(QuarantinedRecord {
193                                    record: rec,
194                                    check: check.name,
195                                    field: check.field.clone(),
196                                    message: field_msg(&check.field, &message),
197                                    page_index,
198                                });
199                            }
200                        }
201                    }
202                }
203            }
204        }
205    }
206
207    Ok(out)
208}
209
/// Prefix `message` with the addressed field, if there is one, giving
/// `field '<name>': <message>`; with no field the message is returned as-is.
fn field_msg(field: &Option<String>, message: &str) -> String {
    field.as_deref().map_or_else(
        || message.to_string(),
        |name| format!("field '{name}': {message}"),
    )
}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::quality::config::{BatchCheck, CompareOp, OnFailure, RecordCheck};
    use serde_json::json;

    fn compiled(spec: QualitySpec) -> CompiledQuality {
        CompiledQuality::compile(&spec).unwrap()
    }

    /// `page_index` of every quarantined record, in quarantine order.
    fn page_indices(out: &QualityOutcome) -> Vec<usize> {
        out.quarantined.iter().map(|qr| qr.page_index).collect()
    }

    /// Check name of every quarantined record, in quarantine order.
    fn quarantine_checks(out: &QualityOutcome) -> Vec<&'static str> {
        out.quarantined.iter().map(|qr| qr.check).collect()
    }

    #[test]
    fn per_record_quarantine_partitions_page() {
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![],
        });
        let page = vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})];
        let out = apply_quality(page, &q).unwrap();
        assert_eq!(out.survivors, vec![json!({"id": 1}), json!({"id": 3})]);
        assert_eq!(out.quarantined.len(), 1);
        assert_eq!(out.quarantined[0].check, "not_null");
        assert_eq!(out.quarantined[0].field.as_deref(), Some("id"));
    }

    #[test]
    fn quarantined_records_carry_page_index_not_list_index() {
        // Records at page positions 1 and 3 fail; their `page_index` must be 1
        // and 3 (the frozen DLQ `record_index` contract), NOT the quarantine-list
        // indices 0 and 1 (#146 R).
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![],
        });
        let page = vec![
            json!({"id": 0}),
            json!({"id": null}),
            json!({"id": 2}),
            json!({"id": null}),
        ];
        let out = apply_quality(page, &q).unwrap();
        let idxs: Vec<usize> = out.quarantined.iter().map(|qr| qr.page_index).collect();
        assert_eq!(
            idxs,
            vec![1, 3],
            "quarantined records must carry their original page position"
        );
    }

    #[test]
    fn batch_quarantined_records_carry_page_index() {
        // A unique check: positions 1 and 3 duplicate position 0's key. They
        // survive the (empty) per-record pass and are quarantined by the BATCH
        // pass, yet must still carry page positions 1 and 3 — proving the page
        // index survives the survivor-slice re-indexing.
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["k".into()],
                on_failure: OnFailure::Quarantine,
            }],
        });
        let page = vec![
            json!({"k": "a"}),
            json!({"k": "a"}),
            json!({"k": "b"}),
            json!({"k": "a"}),
        ];
        let out = apply_quality(page, &q).unwrap();
        let mut idxs: Vec<usize> = out.quarantined.iter().map(|qr| qr.page_index).collect();
        idxs.sort_unstable();
        assert_eq!(
            idxs,
            vec![1, 3],
            "batch-quarantined duplicates must carry their original page positions"
        );
    }

    #[test]
    fn record_then_batch_quarantine_keeps_page_index() {
        // Position 1 fails the per-record `not_null`, so the batch pass sees a
        // survivor slice without it. Positions 2 and 4 duplicate position 0's
        // key; in the survivor slice they sit at indices 1 and 3, but the
        // quarantined records must carry their PAGE positions 2 and 4.
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![BatchCheck::Unique {
                fields: vec!["k".into()],
                on_failure: OnFailure::Quarantine,
            }],
        });
        let page = vec![
            json!({"id": 0, "k": "a"}),
            json!({"id": null, "k": "b"}),
            json!({"id": 2, "k": "a"}),
            json!({"id": 3, "k": "c"}),
            json!({"id": 4, "k": "a"}),
        ];
        let out = apply_quality(page, &q).unwrap();
        assert_eq!(
            out.survivors,
            vec![json!({"id": 0, "k": "a"}), json!({"id": 3, "k": "c"})]
        );
        assert_eq!(quarantine_checks(&out), vec!["not_null", "unique", "unique"]);
        assert_eq!(page_indices(&out), vec![1, 2, 4]);
    }

    #[test]
    fn quarantine_batch_after_record_quarantine_keeps_page_index() {
        // Position 0 is removed by the per-record pass; the failing row_count
        // then routes the remaining survivors (positions 1 and 2) to the DLQ
        // with their page positions intact.
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![BatchCheck::RowCount {
                min: Some(10),
                max: None,
                on_failure: OnFailure::QuarantineBatch,
            }],
        });
        let page = vec![json!({"id": null}), json!({"id": 1}), json!({"id": 2})];
        let out = apply_quality(page, &q).unwrap();
        assert!(out.survivors.is_empty());
        assert_eq!(
            quarantine_checks(&out),
            vec!["not_null", "row_count", "row_count"]
        );
        assert_eq!(page_indices(&out), vec![0, 1, 2]);
    }

    #[test]
    fn per_record_abort_returns_err() {
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::Compare {
                field: "age".into(),
                op: CompareOp::Gte,
                value: json!(0),
                on_failure: OnFailure::Abort,
            }],
            batch: vec![],
        });
        let page = vec![json!({"age": 5}), json!({"age": -1})];
        let err = apply_quality(page, &q).unwrap_err();
        assert!(matches!(err, crate::FaucetError::QualityFailure { .. }));
    }

    #[test]
    fn unique_abort_reports_duplicate_count() {
        // Keep-first semantics: of three equal keys, two are duplicates.
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["k".into()],
                on_failure: OnFailure::Abort,
            }],
        });
        let page = vec![json!({"k": "a"}), json!({"k": "a"}), json!({"k": "a"})];
        let err = apply_quality(page, &q).unwrap_err();
        assert!(matches!(
            err,
            crate::FaucetError::QualityFailure { check, message, .. }
                if check == "unique" && message == "2 duplicate row(s)"
        ));
    }

    #[test]
    fn batch_eval_runs_over_survivors() {
        // not_null quarantines the null id; row_count(min:2) then sees only 1
        // survivor and aborts.
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![BatchCheck::RowCount {
                min: Some(2),
                max: None,
                on_failure: OnFailure::Abort,
            }],
        });
        let page = vec![json!({"id": 1}), json!({"id": null})];
        let err = apply_quality(page, &q).unwrap_err();
        assert!(
            matches!(err, crate::FaucetError::QualityFailure { check, .. } if check == "row_count")
        );
    }

    #[test]
    fn unique_quarantine_routes_dupes_keeps_first() {
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["id".into()],
                on_failure: OnFailure::Quarantine,
            }],
        });
        let page = vec![json!({"id": 1}), json!({"id": 1}), json!({"id": 2})];
        let out = apply_quality(page, &q).unwrap();
        assert_eq!(out.survivors, vec![json!({"id": 1}), json!({"id": 2})]);
        assert_eq!(out.quarantined.len(), 1);
        assert_eq!(out.quarantined[0].check, "unique");
    }

    #[test]
    fn quarantine_batch_routes_all_survivors() {
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::RowCount {
                min: Some(10),
                max: None,
                on_failure: OnFailure::QuarantineBatch,
            }],
        });
        let page = vec![json!({"id": 1}), json!({"id": 2})];
        let out = apply_quality(page, &q).unwrap();
        assert!(out.survivors.is_empty());
        assert_eq!(out.quarantined.len(), 2);
    }

    #[test]
    fn tally_counts_pass_and_fail() {
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![],
        });
        let page = vec![json!({"id": 1}), json!({"id": null})];
        let out = apply_quality(page, &q).unwrap();
        let t = out.tally.get("not_null").unwrap();
        assert_eq!(t.pass, 1);
        assert_eq!(t.fail, 1);
    }
}