Skip to main content

faucet_core/quality/
config.rs

//! Config-shaped types for the data-quality layer. Pure declarations — no
//! evaluation logic (that lives in `record.rs` / `batch.rs`) and no
//! compilation (that lives in `compile.rs`).
4
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9/// What to do when a check fails. The allowed subset is validated per check
10/// at compile time (see `compile.rs`).
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
12#[serde(rename_all = "snake_case")]
13pub enum OnFailure {
14    /// Route the specific offending row(s) to the DLQ; keep the rest.
15    Quarantine,
16    /// Route all survivors of the page to the DLQ; write nothing this page.
17    QuarantineBatch,
18    /// Surface `FaucetError::QualityFailure` and fail the run.
19    Abort,
20}
21
22/// Ordering / equality operator for the `compare` check.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
24#[serde(rename_all = "snake_case")]
25pub enum CompareOp {
26    /// Greater than: `field > value`. Both must be JSON numbers.
27    Gt,
28    /// Greater than or equal: `field >= value`. Both must be JSON numbers.
29    Gte,
30    /// Less than: `field < value`. Both must be JSON numbers.
31    Lt,
32    /// Less than or equal: `field <= value`. Both must be JSON numbers.
33    Lte,
34    /// Exact JSON equality (no type coercion: string `"5"` != number `5`).
35    Eq,
36    /// Exact JSON inequality (no type coercion).
37    Ne,
38}
39
40impl std::fmt::Display for CompareOp {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.write_str(match self {
43            CompareOp::Gt => "gt",
44            CompareOp::Gte => "gte",
45            CompareOp::Lt => "lt",
46            CompareOp::Lte => "lte",
47            CompareOp::Eq => "eq",
48            CompareOp::Ne => "ne",
49        })
50    }
51}
52
53/// Expected JSON type for the `type_is` check.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
55#[serde(rename_all = "snake_case")]
56pub enum JsonType {
57    /// JSON boolean (`true` / `false`).
58    Boolean,
59    /// JSON number (integer or float).
60    Number,
61    /// JSON string.
62    String,
63    /// JSON array.
64    Array,
65    /// JSON object.
66    Object,
67    /// JSON null. Note: a *missing* field is distinct from an explicit `null`.
68    Null,
69}
70
71impl std::fmt::Display for JsonType {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.write_str(match self {
74            JsonType::Boolean => "boolean",
75            JsonType::Number => "number",
76            JsonType::String => "string",
77            JsonType::Array => "array",
78            JsonType::Object => "object",
79            JsonType::Null => "null",
80        })
81    }
82}
83
/// Serde default provider for boolean fields that must be `true` when omitted
/// from the config (a bare `#[serde(default)]` on a `bool` would yield
/// `false`). Referenced by path from `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}
87
88/// The `quality:` config block. Per-record checks run first (partitioning the
89/// page into survivors + quarantined); per-batch checks then run over the
90/// survivors.
91#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
92pub struct QualitySpec {
93    /// Per-record checks, evaluated in declared order (first failure wins).
94    #[serde(default)]
95    pub record: Vec<RecordCheck>,
96    /// Per-batch checks, evaluated per page over the survivors.
97    #[serde(default)]
98    pub batch: Vec<BatchCheck>,
99}
100
101/// A per-record check. Addressed field accepts the filter/explode path subset
102/// (bare key, `dot.path`, `$['bracketed']`).
103#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
104#[serde(tag = "type", rename_all = "snake_case")]
105pub enum RecordCheck {
106    /// Field present and non-null.
107    NotNull {
108        field: String,
109        /// When `true` (default) a missing field fails; when `false` only an
110        /// explicit JSON `null` fails.
111        #[serde(default = "default_true")]
112        treat_missing_as_null: bool,
113        on_failure: OnFailure,
114    },
115    /// Field is a string, non-empty after `trim()`.
116    NotEmpty {
117        field: String,
118        on_failure: OnFailure,
119    },
120    /// Field is a string matching `pattern`.
121    RegexMatch {
122        field: String,
123        pattern: String,
124        on_failure: OnFailure,
125    },
126    /// Field value is a member of `values` (exact JSON equality).
127    ValueInSet {
128        field: String,
129        values: Vec<Value>,
130        on_failure: OnFailure,
131    },
132    /// Field value is NOT a member of `values` (exact JSON equality).
133    NotInSet {
134        field: String,
135        values: Vec<Value>,
136        on_failure: OnFailure,
137    },
138    /// Field value compares against `value` under `op`.
139    Compare {
140        field: String,
141        op: CompareOp,
142        value: Value,
143        on_failure: OnFailure,
144    },
145    /// Field's JSON type equals `expected`.
146    TypeIs {
147        field: String,
148        expected: JsonType,
149        on_failure: OnFailure,
150    },
151    /// Field is a string whose char count is within `[min, max]`.
152    StringLength {
153        field: String,
154        #[serde(default)]
155        min: Option<usize>,
156        #[serde(default)]
157        max: Option<usize>,
158        on_failure: OnFailure,
159    },
160    /// The whole record validates against a JSON Schema document.
161    #[cfg(feature = "quality-jsonschema")]
162    JsonSchema {
163        schema: Value,
164        on_failure: OnFailure,
165    },
166}
167
168/// A per-batch check, evaluated per page over the survivors of the per-record
169/// pass.
170#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
171#[serde(tag = "type", rename_all = "snake_case")]
172pub enum BatchCheck {
173    /// Survivor count is within `[min, max]` (at least one bound required).
174    RowCount {
175        #[serde(default)]
176        min: Option<usize>,
177        #[serde(default)]
178        max: Option<usize>,
179        on_failure: OnFailure,
180    },
181    /// Null-or-missing rate of `field` across survivors is `<= max`.
182    NullRate {
183        field: String,
184        /// Maximum allowed null-or-missing proportion, in `[0.0, 1.0]`. Out-of-range values are rejected at compile time.
185        max: f64,
186        on_failure: OnFailure,
187    },
188    /// The composite `fields` tuple is unique across survivors.
189    Unique {
190        fields: Vec<String>,
191        on_failure: OnFailure,
192    },
193    /// Distinct values of `field` across survivors is within `[min, max]`.
194    DistinctCount {
195        field: String,
196        #[serde(default)]
197        min: Option<usize>,
198        #[serde(default)]
199        max: Option<usize>,
200        on_failure: OnFailure,
201    },
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// `rename_all = "snake_case"` must turn multi-word variants into
    /// underscore-separated tokens.
    #[test]
    fn on_failure_serializes_snake_case() {
        assert_eq!(
            serde_json::to_string(&OnFailure::QuarantineBatch).unwrap(),
            "\"quarantine_batch\""
        );
    }

    /// Deserialize a token, then serialize it back: both directions must
    /// agree on the spelling.
    #[test]
    fn compare_op_round_trips() {
        let op: CompareOp = serde_json::from_str("\"gte\"").unwrap();
        assert_eq!(op, CompareOp::Gte);
        assert_eq!(serde_json::to_string(&op).unwrap(), "\"gte\"");
    }

    /// Deserialize a token, then serialize it back: both directions must
    /// agree on the spelling.
    #[test]
    fn json_type_round_trips() {
        let t: JsonType = serde_json::from_str("\"boolean\"").unwrap();
        assert_eq!(t, JsonType::Boolean);
        assert_eq!(serde_json::to_string(&t).unwrap(), "\"boolean\"");
    }

    /// The hand-written `Display` tables must stay in sync with the tokens
    /// serde derives from the variant names.
    #[test]
    fn display_matches_serde_token() {
        let ops = [
            CompareOp::Gt,
            CompareOp::Gte,
            CompareOp::Lt,
            CompareOp::Lte,
            CompareOp::Eq,
            CompareOp::Ne,
        ];
        for op in ops.iter() {
            assert_eq!(
                serde_json::to_value(op).unwrap(),
                serde_json::json!(op.to_string())
            );
        }

        let types = [
            JsonType::Boolean,
            JsonType::Number,
            JsonType::String,
            JsonType::Array,
            JsonType::Object,
            JsonType::Null,
        ];
        for t in types.iter() {
            assert_eq!(
                serde_json::to_value(t).unwrap(),
                serde_json::json!(t.to_string())
            );
        }
    }

    /// A representative config with both record and batch checks parses into
    /// the expected variants, and omitted optional fields take their defaults.
    #[test]
    fn parses_full_quality_block() {
        let spec: QualitySpec = serde_json::from_value(serde_json::json!({
            "record": [
                { "type": "not_null", "field": "user_id", "on_failure": "quarantine" },
                { "type": "compare", "field": "age", "op": "gte", "value": 0, "on_failure": "abort" },
                { "type": "string_length", "field": "name", "min": 1, "max": 256, "on_failure": "quarantine" }
            ],
            "batch": [
                { "type": "row_count", "min": 1, "max": 100000, "on_failure": "abort" },
                { "type": "unique", "fields": ["id"], "on_failure": "quarantine" }
            ]
        }))
        .unwrap();
        assert_eq!(spec.record.len(), 3);
        assert_eq!(spec.batch.len(), 2);
        assert!(matches!(spec.record[0], RecordCheck::NotNull { .. }));
        assert!(matches!(spec.batch[1], BatchCheck::Unique { .. }));
        if let RecordCheck::NotNull {
            treat_missing_as_null,
            ..
        } = &spec.record[0]
        {
            assert!(
                *treat_missing_as_null,
                "treat_missing_as_null defaults to true"
            );
        } else {
            panic!("expected first record check to be NotNull");
        }
    }

    /// Both lists carry `#[serde(default)]`, so an empty object is valid.
    #[test]
    fn empty_quality_block_defaults_to_no_checks() {
        let spec: QualitySpec = serde_json::from_str("{}").unwrap();
        assert!(spec.record.is_empty());
        assert!(spec.batch.is_empty());
    }
}
266}