Skip to main content

faucet_core/quality/
compile.rs

1//! Compile a `QualitySpec` into a `CompiledQuality`: parse paths/regexes/
2//! schemas once, validate the `on_failure` subset and numeric bounds. Failures
3//! surface as `FaucetError::Config`.
4
5use crate::error::FaucetError;
6use crate::quality::config::{
7    BatchCheck, CompareOp, JsonType, OnFailure, QualitySpec, RecordCheck,
8};
9use crate::stage::CompiledPath;
10use serde_json::Value;
11
12/// A fully compiled quality spec. Built once via [`CompiledQuality::compile`].
13#[derive(Debug)]
14pub struct CompiledQuality {
15    pub record: Vec<CompiledRecordCheck>,
16    pub batch: Vec<CompiledBatchCheck>,
17}
18
19/// One compiled per-record check: the check kind plus its failure policy and
20/// labelling metadata.
21#[derive(Debug)]
22pub struct CompiledRecordCheck {
23    pub kind: CompiledRecordKind,
24    pub on_failure: OnFailure,
25    /// Stable metric-label name, e.g. `"not_null"`.
26    pub name: &'static str,
27    /// Addressed field (for envelope/metric `field` label); `None` for whole-
28    /// record checks like `json_schema`.
29    pub field: Option<String>,
30}
31
32/// Compiled form of a per-record check, keyed by check kind.
33#[derive(Debug)]
34pub enum CompiledRecordKind {
35    NotNull {
36        path: CompiledPath,
37        treat_missing_as_null: bool,
38    },
39    NotEmpty {
40        path: CompiledPath,
41    },
42    RegexMatch {
43        path: CompiledPath,
44        re: regex::Regex,
45    },
46    ValueInSet {
47        path: CompiledPath,
48        values: Vec<Value>,
49    },
50    NotInSet {
51        path: CompiledPath,
52        values: Vec<Value>,
53    },
54    Compare {
55        path: CompiledPath,
56        op: CompareOp,
57        value: Value,
58    },
59    TypeIs {
60        path: CompiledPath,
61        expected: JsonType,
62    },
63    StringLength {
64        path: CompiledPath,
65        min: Option<usize>,
66        max: Option<usize>,
67    },
68    #[cfg(feature = "quality-jsonschema")]
69    JsonSchema {
70        validator: jsonschema::Validator,
71    },
72}
73
74/// One compiled per-batch check.
75#[derive(Debug)]
76pub struct CompiledBatchCheck {
77    pub kind: CompiledBatchKind,
78    pub on_failure: OnFailure,
79    pub name: &'static str,
80    pub field: Option<String>,
81}
82
83/// Compiled form of a per-batch check, keyed by check kind.
84#[derive(Debug)]
85pub enum CompiledBatchKind {
86    RowCount {
87        min: Option<usize>,
88        max: Option<usize>,
89    },
90    NullRate {
91        path: CompiledPath,
92        max: f64,
93    },
94    Unique {
95        paths: Vec<CompiledPath>,
96    },
97    DistinctCount {
98        path: CompiledPath,
99        min: Option<usize>,
100        max: Option<usize>,
101    },
102}
103
104fn config_err(msg: impl Into<String>) -> FaucetError {
105    FaucetError::Config(format!("quality: {}", msg.into()))
106}
107
108fn compile_path(field: &str) -> Result<CompiledPath, FaucetError> {
109    CompiledPath::compile(field).map_err(|e| config_err(format!("invalid field '{field}': {e}")))
110}
111
112/// Per-record checks accept only `quarantine` / `abort`.
113fn check_record_policy(name: &str, on_failure: OnFailure) -> Result<(), FaucetError> {
114    match on_failure {
115        OnFailure::Quarantine | OnFailure::Abort => Ok(()),
116        OnFailure::QuarantineBatch => Err(config_err(format!(
117            "check '{name}': on_failure 'quarantine_batch' is only valid on aggregate batch checks"
118        ))),
119    }
120}
121
122impl CompiledQuality {
123    /// Compile and validate a [`QualitySpec`]. Returns [`FaucetError::Config`]
124    /// on any invalid path, regex, schema, bound, or `on_failure` choice.
125    pub fn compile(spec: &QualitySpec) -> Result<Self, FaucetError> {
126        let record = spec
127            .record
128            .iter()
129            .map(compile_record_check)
130            .collect::<Result<_, _>>()?;
131        let batch = spec
132            .batch
133            .iter()
134            .map(compile_batch_check)
135            .collect::<Result<_, _>>()?;
136        Ok(Self { record, batch })
137    }
138
139    /// True if any check would route records to the DLQ (so a DLQ sink is
140    /// required). Checked by the pipeline at run start.
141    pub fn requires_dlq(&self) -> bool {
142        self.record
143            .iter()
144            .any(|c| matches!(c.on_failure, OnFailure::Quarantine))
145            || self.batch.iter().any(|c| {
146                matches!(
147                    c.on_failure,
148                    OnFailure::Quarantine | OnFailure::QuarantineBatch
149                )
150            })
151    }
152}
153
154fn compile_record_check(c: &RecordCheck) -> Result<CompiledRecordCheck, FaucetError> {
155    Ok(match c {
156        RecordCheck::NotNull {
157            field,
158            treat_missing_as_null,
159            on_failure,
160        } => {
161            check_record_policy("not_null", *on_failure)?;
162            CompiledRecordCheck {
163                kind: CompiledRecordKind::NotNull {
164                    path: compile_path(field)?,
165                    treat_missing_as_null: *treat_missing_as_null,
166                },
167                on_failure: *on_failure,
168                name: "not_null",
169                field: Some(field.clone()),
170            }
171        }
172        RecordCheck::NotEmpty { field, on_failure } => {
173            check_record_policy("not_empty", *on_failure)?;
174            CompiledRecordCheck {
175                kind: CompiledRecordKind::NotEmpty {
176                    path: compile_path(field)?,
177                },
178                on_failure: *on_failure,
179                name: "not_empty",
180                field: Some(field.clone()),
181            }
182        }
183        RecordCheck::RegexMatch {
184            field,
185            pattern,
186            on_failure,
187        } => {
188            check_record_policy("regex_match", *on_failure)?;
189            let re = regex::Regex::new(pattern).map_err(|e| {
190                config_err(format!("regex_match: invalid pattern '{pattern}': {e}"))
191            })?;
192            CompiledRecordCheck {
193                kind: CompiledRecordKind::RegexMatch {
194                    path: compile_path(field)?,
195                    re,
196                },
197                on_failure: *on_failure,
198                name: "regex_match",
199                field: Some(field.clone()),
200            }
201        }
202        RecordCheck::ValueInSet {
203            field,
204            values,
205            on_failure,
206        } => {
207            check_record_policy("value_in_set", *on_failure)?;
208            if values.is_empty() {
209                return Err(config_err("value_in_set: `values` must be non-empty"));
210            }
211            CompiledRecordCheck {
212                kind: CompiledRecordKind::ValueInSet {
213                    path: compile_path(field)?,
214                    values: values.clone(),
215                },
216                on_failure: *on_failure,
217                name: "value_in_set",
218                field: Some(field.clone()),
219            }
220        }
221        RecordCheck::NotInSet {
222            field,
223            values,
224            on_failure,
225        } => {
226            check_record_policy("not_in_set", *on_failure)?;
227            if values.is_empty() {
228                return Err(config_err("not_in_set: `values` must be non-empty"));
229            }
230            CompiledRecordCheck {
231                kind: CompiledRecordKind::NotInSet {
232                    path: compile_path(field)?,
233                    values: values.clone(),
234                },
235                on_failure: *on_failure,
236                name: "not_in_set",
237                field: Some(field.clone()),
238            }
239        }
240        RecordCheck::Compare {
241            field,
242            op,
243            value,
244            on_failure,
245        } => {
246            check_record_policy("compare", *on_failure)?;
247            // Ordering ops require a numeric configured value.
248            if matches!(
249                op,
250                CompareOp::Gt | CompareOp::Gte | CompareOp::Lt | CompareOp::Lte
251            ) && !value.is_number()
252            {
253                return Err(config_err(format!(
254                    "compare: op '{op}' requires a numeric `value`"
255                )));
256            }
257            CompiledRecordCheck {
258                kind: CompiledRecordKind::Compare {
259                    path: compile_path(field)?,
260                    op: *op,
261                    value: value.clone(),
262                },
263                on_failure: *on_failure,
264                name: "compare",
265                field: Some(field.clone()),
266            }
267        }
268        RecordCheck::TypeIs {
269            field,
270            expected,
271            on_failure,
272        } => {
273            check_record_policy("type_is", *on_failure)?;
274            CompiledRecordCheck {
275                kind: CompiledRecordKind::TypeIs {
276                    path: compile_path(field)?,
277                    expected: *expected,
278                },
279                on_failure: *on_failure,
280                name: "type_is",
281                field: Some(field.clone()),
282            }
283        }
284        RecordCheck::StringLength {
285            field,
286            min,
287            max,
288            on_failure,
289        } => {
290            check_record_policy("string_length", *on_failure)?;
291            if min.is_none() && max.is_none() {
292                return Err(config_err(
293                    "string_length: at least one of `min`/`max` is required",
294                ));
295            }
296            if let (Some(lo), Some(hi)) = (min, max)
297                && lo > hi
298            {
299                return Err(config_err("string_length: `min` must be <= `max`"));
300            }
301            CompiledRecordCheck {
302                kind: CompiledRecordKind::StringLength {
303                    path: compile_path(field)?,
304                    min: *min,
305                    max: *max,
306                },
307                on_failure: *on_failure,
308                name: "string_length",
309                field: Some(field.clone()),
310            }
311        }
312        #[cfg(feature = "quality-jsonschema")]
313        RecordCheck::JsonSchema { schema, on_failure } => {
314            check_record_policy("json_schema", *on_failure)?;
315            let validator = jsonschema::validator_for(schema)
316                .map_err(|e| config_err(format!("json_schema: invalid schema: {e}")))?;
317            CompiledRecordCheck {
318                kind: CompiledRecordKind::JsonSchema { validator },
319                on_failure: *on_failure,
320                name: "json_schema",
321                field: None,
322            }
323        }
324    })
325}
326
327fn compile_batch_check(c: &BatchCheck) -> Result<CompiledBatchCheck, FaucetError> {
328    // Aggregate batch checks accept only `abort` / `quarantine_batch`.
329    fn check_aggregate_policy(name: &str, on_failure: OnFailure) -> Result<(), FaucetError> {
330        match on_failure {
331            OnFailure::Abort | OnFailure::QuarantineBatch => Ok(()),
332            OnFailure::Quarantine => Err(config_err(format!(
333                "check '{name}': on_failure 'quarantine' is not row-attributable here; \
334                 use 'quarantine_batch' or 'abort'"
335            ))),
336        }
337    }
338
339    Ok(match c {
340        BatchCheck::RowCount {
341            min,
342            max,
343            on_failure,
344        } => {
345            check_aggregate_policy("row_count", *on_failure)?;
346            if min.is_none() && max.is_none() {
347                return Err(config_err(
348                    "row_count: at least one of `min`/`max` is required",
349                ));
350            }
351            if let (Some(lo), Some(hi)) = (min, max)
352                && lo > hi
353            {
354                return Err(config_err("row_count: `min` must be <= `max`"));
355            }
356            CompiledBatchCheck {
357                kind: CompiledBatchKind::RowCount {
358                    min: *min,
359                    max: *max,
360                },
361                on_failure: *on_failure,
362                name: "row_count",
363                field: None,
364            }
365        }
366        BatchCheck::NullRate {
367            field,
368            max,
369            on_failure,
370        } => {
371            check_aggregate_policy("null_rate", *on_failure)?;
372            if !(0.0..=1.0).contains(max) {
373                return Err(config_err("null_rate: `max` must be within [0.0, 1.0]"));
374            }
375            CompiledBatchCheck {
376                kind: CompiledBatchKind::NullRate {
377                    path: compile_path(field)?,
378                    max: *max,
379                },
380                on_failure: *on_failure,
381                name: "null_rate",
382                field: Some(field.clone()),
383            }
384        }
385        BatchCheck::Unique { fields, on_failure } => {
386            // Unique IS row-attributable: accept quarantine / abort.
387            check_record_policy("unique", *on_failure)?;
388            if fields.is_empty() {
389                return Err(config_err("unique: `fields` must be non-empty"));
390            }
391            let paths = fields
392                .iter()
393                .map(|f| compile_path(f))
394                .collect::<Result<_, _>>()?;
395            CompiledBatchCheck {
396                kind: CompiledBatchKind::Unique { paths },
397                on_failure: *on_failure,
398                name: "unique",
399                field: Some(fields.join(",")),
400            }
401        }
402        BatchCheck::DistinctCount {
403            field,
404            min,
405            max,
406            on_failure,
407        } => {
408            check_aggregate_policy("distinct_count", *on_failure)?;
409            if min.is_none() && max.is_none() {
410                return Err(config_err(
411                    "distinct_count: at least one of `min`/`max` is required",
412                ));
413            }
414            if let (Some(lo), Some(hi)) = (min, max)
415                && lo > hi
416            {
417                return Err(config_err("distinct_count: `min` must be <= `max`"));
418            }
419            CompiledBatchCheck {
420                kind: CompiledBatchKind::DistinctCount {
421                    path: compile_path(field)?,
422                    min: *min,
423                    max: *max,
424                },
425                on_failure: *on_failure,
426                name: "distinct_count",
427                field: Some(field.clone()),
428            }
429        }
430    })
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use crate::quality::config::{BatchCheck, CompareOp, OnFailure, QualitySpec, RecordCheck};
    use serde_json::json;

    fn compile(spec: QualitySpec) -> Result<CompiledQuality, crate::FaucetError> {
        CompiledQuality::compile(&spec)
    }

    #[test]
    fn compiles_valid_spec() {
        let spec = QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "user_id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![BatchCheck::RowCount {
                min: Some(1),
                max: Some(10),
                on_failure: OnFailure::Abort,
            }],
        };
        let c = compile(spec).unwrap();
        assert_eq!(c.record.len(), 1);
        assert_eq!(c.batch.len(), 1);
        assert!(c.requires_dlq());
    }

    #[test]
    fn rejects_bad_regex_at_compile() {
        let spec = QualitySpec {
            record: vec![RecordCheck::RegexMatch {
                field: "email".into(),
                pattern: "[invalid".into(),
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_disallowed_on_failure_for_record_check() {
        // quarantine_batch is not valid on a per-record check.
        let spec = QualitySpec {
            record: vec![RecordCheck::NotEmpty {
                field: "name".into(),
                on_failure: OnFailure::QuarantineBatch,
            }],
            batch: vec![],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_disallowed_on_failure_for_aggregate_batch_check() {
        // quarantine (row-attributable) is not valid on row_count.
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::RowCount {
                min: Some(1),
                max: None,
                on_failure: OnFailure::Quarantine,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_empty_bounds() {
        let spec = QualitySpec {
            record: vec![RecordCheck::StringLength {
                field: "name".into(),
                min: None,
                max: None,
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_non_numeric_compare_value_for_ordering_op() {
        let spec = QualitySpec {
            record: vec![RecordCheck::Compare {
                field: "age".into(),
                op: CompareOp::Gte,
                value: json!("not a number"),
                on_failure: OnFailure::Abort,
            }],
            batch: vec![],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_null_rate_out_of_range() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::NullRate {
                field: "name".into(),
                max: 1.5,
                on_failure: OnFailure::Abort,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn requires_dlq_false_when_only_abort() {
        let spec = QualitySpec {
            record: vec![RecordCheck::NotNull {
                field: "id".into(),
                treat_missing_as_null: true,
                on_failure: OnFailure::Abort,
            }],
            batch: vec![],
        };
        assert!(!compile(spec).unwrap().requires_dlq());
    }

    #[test]
    fn rejects_row_count_min_gt_max() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::RowCount {
                min: Some(100),
                max: Some(1),
                on_failure: OnFailure::Abort,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_distinct_count_min_gt_max() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::DistinctCount {
                field: "x".into(),
                min: Some(5),
                max: Some(2),
                on_failure: OnFailure::Abort,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_quarantine_batch_on_unique() {
        // unique is row-attributable: quarantine_batch is not accepted.
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["id".into()],
                on_failure: OnFailure::QuarantineBatch,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn unique_with_quarantine_requires_dlq_and_joins_fields() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["id".into(), "ts".into()],
                on_failure: OnFailure::Quarantine,
            }],
        };
        let c = compile(spec).unwrap();
        assert_eq!(c.batch.len(), 1);
        assert_eq!(c.batch[0].field.as_deref(), Some("id,ts"));
        assert!(c.requires_dlq());
    }

    #[test]
    fn rejects_unique_without_fields() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec![],
                on_failure: OnFailure::Abort,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn aggregate_quarantine_batch_requires_dlq() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::RowCount {
                min: Some(1),
                max: None,
                on_failure: OnFailure::QuarantineBatch,
            }],
        };
        assert!(compile(spec).unwrap().requires_dlq());
    }

    #[test]
    fn rejects_empty_value_sets() {
        let checks = [
            RecordCheck::ValueInSet {
                field: "status".into(),
                values: vec![],
                on_failure: OnFailure::Quarantine,
            },
            RecordCheck::NotInSet {
                field: "status".into(),
                values: vec![],
                on_failure: OnFailure::Quarantine,
            },
        ];
        for check in checks {
            let spec = QualitySpec {
                record: vec![check],
                batch: vec![],
            };
            assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
        }
    }

    #[test]
    fn rejects_null_rate_nan() {
        let spec = QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::NullRate {
                field: "name".into(),
                max: f64::NAN,
                on_failure: OnFailure::Abort,
            }],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn rejects_string_length_min_gt_max() {
        let spec = QualitySpec {
            record: vec![RecordCheck::StringLength {
                field: "name".into(),
                min: Some(5),
                max: Some(1),
                on_failure: OnFailure::Quarantine,
            }],
            batch: vec![],
        };
        assert!(matches!(compile(spec), Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn accepts_numeric_compare_value_for_ordering_op() {
        let spec = QualitySpec {
            record: vec![RecordCheck::Compare {
                field: "age".into(),
                op: CompareOp::Gte,
                value: json!(18),
                on_failure: OnFailure::Abort,
            }],
            batch: vec![],
        };
        assert!(compile(spec).is_ok());
    }
}