1use crate::error::FaucetError;
6use crate::quality::config::{
7 BatchCheck, CompareOp, JsonType, OnFailure, QualitySpec, RecordCheck,
8};
9use crate::stage::CompiledPath;
10use serde_json::Value;
11
12#[derive(Debug)]
14pub struct CompiledQuality {
15 pub record: Vec<CompiledRecordCheck>,
16 pub batch: Vec<CompiledBatchCheck>,
17}
18
19#[derive(Debug)]
22pub struct CompiledRecordCheck {
23 pub kind: CompiledRecordKind,
24 pub on_failure: OnFailure,
25 pub name: &'static str,
27 pub field: Option<String>,
30}
31
32#[derive(Debug)]
34pub enum CompiledRecordKind {
35 NotNull {
36 path: CompiledPath,
37 treat_missing_as_null: bool,
38 },
39 NotEmpty {
40 path: CompiledPath,
41 },
42 RegexMatch {
43 path: CompiledPath,
44 re: regex::Regex,
45 },
46 ValueInSet {
47 path: CompiledPath,
48 values: Vec<Value>,
49 },
50 NotInSet {
51 path: CompiledPath,
52 values: Vec<Value>,
53 },
54 Compare {
55 path: CompiledPath,
56 op: CompareOp,
57 value: Value,
58 },
59 TypeIs {
60 path: CompiledPath,
61 expected: JsonType,
62 },
63 StringLength {
64 path: CompiledPath,
65 min: Option<usize>,
66 max: Option<usize>,
67 },
68 #[cfg(feature = "quality-jsonschema")]
69 JsonSchema {
70 validator: jsonschema::Validator,
71 },
72}
73
74#[derive(Debug)]
76pub struct CompiledBatchCheck {
77 pub kind: CompiledBatchKind,
78 pub on_failure: OnFailure,
79 pub name: &'static str,
80 pub field: Option<String>,
81}
82
83#[derive(Debug)]
85pub enum CompiledBatchKind {
86 RowCount {
87 min: Option<usize>,
88 max: Option<usize>,
89 },
90 NullRate {
91 path: CompiledPath,
92 max: f64,
93 },
94 Unique {
95 paths: Vec<CompiledPath>,
96 },
97 DistinctCount {
98 path: CompiledPath,
99 min: Option<usize>,
100 max: Option<usize>,
101 },
102}
103
104fn config_err(msg: impl Into<String>) -> FaucetError {
105 FaucetError::Config(format!("quality: {}", msg.into()))
106}
107
108fn compile_path(field: &str) -> Result<CompiledPath, FaucetError> {
109 CompiledPath::compile(field).map_err(|e| config_err(format!("invalid field '{field}': {e}")))
110}
111
112fn check_record_policy(name: &str, on_failure: OnFailure) -> Result<(), FaucetError> {
114 match on_failure {
115 OnFailure::Quarantine | OnFailure::Abort => Ok(()),
116 OnFailure::QuarantineBatch => Err(config_err(format!(
117 "check '{name}': on_failure 'quarantine_batch' is only valid on aggregate batch checks"
118 ))),
119 }
120}
121
122impl CompiledQuality {
123 pub fn compile(spec: &QualitySpec) -> Result<Self, FaucetError> {
126 let record = spec
127 .record
128 .iter()
129 .map(compile_record_check)
130 .collect::<Result<_, _>>()?;
131 let batch = spec
132 .batch
133 .iter()
134 .map(compile_batch_check)
135 .collect::<Result<_, _>>()?;
136 Ok(Self { record, batch })
137 }
138
139 pub fn requires_dlq(&self) -> bool {
142 self.record
143 .iter()
144 .any(|c| matches!(c.on_failure, OnFailure::Quarantine))
145 || self.batch.iter().any(|c| {
146 matches!(
147 c.on_failure,
148 OnFailure::Quarantine | OnFailure::QuarantineBatch
149 )
150 })
151 }
152}
153
154fn compile_record_check(c: &RecordCheck) -> Result<CompiledRecordCheck, FaucetError> {
155 Ok(match c {
156 RecordCheck::NotNull {
157 field,
158 treat_missing_as_null,
159 on_failure,
160 } => {
161 check_record_policy("not_null", *on_failure)?;
162 CompiledRecordCheck {
163 kind: CompiledRecordKind::NotNull {
164 path: compile_path(field)?,
165 treat_missing_as_null: *treat_missing_as_null,
166 },
167 on_failure: *on_failure,
168 name: "not_null",
169 field: Some(field.clone()),
170 }
171 }
172 RecordCheck::NotEmpty { field, on_failure } => {
173 check_record_policy("not_empty", *on_failure)?;
174 CompiledRecordCheck {
175 kind: CompiledRecordKind::NotEmpty {
176 path: compile_path(field)?,
177 },
178 on_failure: *on_failure,
179 name: "not_empty",
180 field: Some(field.clone()),
181 }
182 }
183 RecordCheck::RegexMatch {
184 field,
185 pattern,
186 on_failure,
187 } => {
188 check_record_policy("regex_match", *on_failure)?;
189 let re = regex::Regex::new(pattern).map_err(|e| {
190 config_err(format!("regex_match: invalid pattern '{pattern}': {e}"))
191 })?;
192 CompiledRecordCheck {
193 kind: CompiledRecordKind::RegexMatch {
194 path: compile_path(field)?,
195 re,
196 },
197 on_failure: *on_failure,
198 name: "regex_match",
199 field: Some(field.clone()),
200 }
201 }
202 RecordCheck::ValueInSet {
203 field,
204 values,
205 on_failure,
206 } => {
207 check_record_policy("value_in_set", *on_failure)?;
208 if values.is_empty() {
209 return Err(config_err("value_in_set: `values` must be non-empty"));
210 }
211 CompiledRecordCheck {
212 kind: CompiledRecordKind::ValueInSet {
213 path: compile_path(field)?,
214 values: values.clone(),
215 },
216 on_failure: *on_failure,
217 name: "value_in_set",
218 field: Some(field.clone()),
219 }
220 }
221 RecordCheck::NotInSet {
222 field,
223 values,
224 on_failure,
225 } => {
226 check_record_policy("not_in_set", *on_failure)?;
227 if values.is_empty() {
228 return Err(config_err("not_in_set: `values` must be non-empty"));
229 }
230 CompiledRecordCheck {
231 kind: CompiledRecordKind::NotInSet {
232 path: compile_path(field)?,
233 values: values.clone(),
234 },
235 on_failure: *on_failure,
236 name: "not_in_set",
237 field: Some(field.clone()),
238 }
239 }
240 RecordCheck::Compare {
241 field,
242 op,
243 value,
244 on_failure,
245 } => {
246 check_record_policy("compare", *on_failure)?;
247 if matches!(
249 op,
250 CompareOp::Gt | CompareOp::Gte | CompareOp::Lt | CompareOp::Lte
251 ) && !value.is_number()
252 {
253 return Err(config_err(format!(
254 "compare: op '{op}' requires a numeric `value`"
255 )));
256 }
257 CompiledRecordCheck {
258 kind: CompiledRecordKind::Compare {
259 path: compile_path(field)?,
260 op: *op,
261 value: value.clone(),
262 },
263 on_failure: *on_failure,
264 name: "compare",
265 field: Some(field.clone()),
266 }
267 }
268 RecordCheck::TypeIs {
269 field,
270 expected,
271 on_failure,
272 } => {
273 check_record_policy("type_is", *on_failure)?;
274 CompiledRecordCheck {
275 kind: CompiledRecordKind::TypeIs {
276 path: compile_path(field)?,
277 expected: *expected,
278 },
279 on_failure: *on_failure,
280 name: "type_is",
281 field: Some(field.clone()),
282 }
283 }
284 RecordCheck::StringLength {
285 field,
286 min,
287 max,
288 on_failure,
289 } => {
290 check_record_policy("string_length", *on_failure)?;
291 if min.is_none() && max.is_none() {
292 return Err(config_err(
293 "string_length: at least one of `min`/`max` is required",
294 ));
295 }
296 if let (Some(lo), Some(hi)) = (min, max)
297 && lo > hi
298 {
299 return Err(config_err("string_length: `min` must be <= `max`"));
300 }
301 CompiledRecordCheck {
302 kind: CompiledRecordKind::StringLength {
303 path: compile_path(field)?,
304 min: *min,
305 max: *max,
306 },
307 on_failure: *on_failure,
308 name: "string_length",
309 field: Some(field.clone()),
310 }
311 }
312 #[cfg(feature = "quality-jsonschema")]
313 RecordCheck::JsonSchema { schema, on_failure } => {
314 check_record_policy("json_schema", *on_failure)?;
315 let validator = jsonschema::validator_for(schema)
316 .map_err(|e| config_err(format!("json_schema: invalid schema: {e}")))?;
317 CompiledRecordCheck {
318 kind: CompiledRecordKind::JsonSchema { validator },
319 on_failure: *on_failure,
320 name: "json_schema",
321 field: None,
322 }
323 }
324 })
325}
326
327fn compile_batch_check(c: &BatchCheck) -> Result<CompiledBatchCheck, FaucetError> {
328 fn check_aggregate_policy(name: &str, on_failure: OnFailure) -> Result<(), FaucetError> {
330 match on_failure {
331 OnFailure::Abort | OnFailure::QuarantineBatch => Ok(()),
332 OnFailure::Quarantine => Err(config_err(format!(
333 "check '{name}': on_failure 'quarantine' is not row-attributable here; \
334 use 'quarantine_batch' or 'abort'"
335 ))),
336 }
337 }
338
339 Ok(match c {
340 BatchCheck::RowCount {
341 min,
342 max,
343 on_failure,
344 } => {
345 check_aggregate_policy("row_count", *on_failure)?;
346 if min.is_none() && max.is_none() {
347 return Err(config_err(
348 "row_count: at least one of `min`/`max` is required",
349 ));
350 }
351 if let (Some(lo), Some(hi)) = (min, max)
352 && lo > hi
353 {
354 return Err(config_err("row_count: `min` must be <= `max`"));
355 }
356 CompiledBatchCheck {
357 kind: CompiledBatchKind::RowCount {
358 min: *min,
359 max: *max,
360 },
361 on_failure: *on_failure,
362 name: "row_count",
363 field: None,
364 }
365 }
366 BatchCheck::NullRate {
367 field,
368 max,
369 on_failure,
370 } => {
371 check_aggregate_policy("null_rate", *on_failure)?;
372 if !(0.0..=1.0).contains(max) {
373 return Err(config_err("null_rate: `max` must be within [0.0, 1.0]"));
374 }
375 CompiledBatchCheck {
376 kind: CompiledBatchKind::NullRate {
377 path: compile_path(field)?,
378 max: *max,
379 },
380 on_failure: *on_failure,
381 name: "null_rate",
382 field: Some(field.clone()),
383 }
384 }
385 BatchCheck::Unique { fields, on_failure } => {
386 check_record_policy("unique", *on_failure)?;
388 if fields.is_empty() {
389 return Err(config_err("unique: `fields` must be non-empty"));
390 }
391 let paths = fields
392 .iter()
393 .map(|f| compile_path(f))
394 .collect::<Result<_, _>>()?;
395 CompiledBatchCheck {
396 kind: CompiledBatchKind::Unique { paths },
397 on_failure: *on_failure,
398 name: "unique",
399 field: Some(fields.join(",")),
400 }
401 }
402 BatchCheck::DistinctCount {
403 field,
404 min,
405 max,
406 on_failure,
407 } => {
408 check_aggregate_policy("distinct_count", *on_failure)?;
409 if min.is_none() && max.is_none() {
410 return Err(config_err(
411 "distinct_count: at least one of `min`/`max` is required",
412 ));
413 }
414 if let (Some(lo), Some(hi)) = (min, max)
415 && lo > hi
416 {
417 return Err(config_err("distinct_count: `min` must be <= `max`"));
418 }
419 CompiledBatchCheck {
420 kind: CompiledBatchKind::DistinctCount {
421 path: compile_path(field)?,
422 min: *min,
423 max: *max,
424 },
425 on_failure: *on_failure,
426 name: "distinct_count",
427 field: Some(field.clone()),
428 }
429 }
430 })
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use crate::quality::config::{BatchCheck, CompareOp, OnFailure, QualitySpec, RecordCheck};
    use serde_json::json;

    /// Compiles an owned spec; keeps the individual tests terse.
    fn compile(spec: QualitySpec) -> Result<CompiledQuality, crate::FaucetError> {
        CompiledQuality::compile(&spec)
    }

    /// Spec containing a single record check and no batch checks.
    fn record_only(check: RecordCheck) -> QualitySpec {
        QualitySpec {
            record: vec![check],
            batch: vec![],
        }
    }

    /// Spec containing a single batch check and no record checks.
    fn batch_only(check: BatchCheck) -> QualitySpec {
        QualitySpec {
            record: vec![],
            batch: vec![check],
        }
    }

    /// Asserts that compiling `spec` fails with a config error.
    #[track_caller]
    fn assert_config_error(spec: QualitySpec) {
        let outcome = compile(spec);
        assert!(matches!(outcome, Err(crate::FaucetError::Config(_))));
    }

    #[test]
    fn compiles_valid_spec() {
        let not_null = RecordCheck::NotNull {
            field: "user_id".into(),
            treat_missing_as_null: true,
            on_failure: OnFailure::Quarantine,
        };
        let row_count = BatchCheck::RowCount {
            min: Some(1),
            max: Some(10),
            on_failure: OnFailure::Abort,
        };
        let compiled = compile(QualitySpec {
            record: vec![not_null],
            batch: vec![row_count],
        })
        .unwrap();

        assert_eq!(compiled.record.len(), 1);
        assert_eq!(compiled.batch.len(), 1);
        // The record check quarantines, so the predicate must be true.
        assert!(compiled.requires_dlq());
    }

    #[test]
    fn rejects_bad_regex_at_compile() {
        assert_config_error(record_only(RecordCheck::RegexMatch {
            field: "email".into(),
            pattern: "[invalid".into(),
            on_failure: OnFailure::Quarantine,
        }));
    }

    #[test]
    fn rejects_disallowed_on_failure_for_record_check() {
        // `quarantine_batch` is not a valid policy for a record check.
        assert_config_error(record_only(RecordCheck::NotEmpty {
            field: "name".into(),
            on_failure: OnFailure::QuarantineBatch,
        }));
    }

    #[test]
    fn rejects_disallowed_on_failure_for_aggregate_batch_check() {
        // `quarantine` is not a valid policy for an aggregate batch check.
        assert_config_error(batch_only(BatchCheck::RowCount {
            min: Some(1),
            max: None,
            on_failure: OnFailure::Quarantine,
        }));
    }

    #[test]
    fn rejects_empty_bounds() {
        assert_config_error(record_only(RecordCheck::StringLength {
            field: "name".into(),
            min: None,
            max: None,
            on_failure: OnFailure::Quarantine,
        }));
    }

    #[test]
    fn rejects_non_numeric_compare_value_for_ordering_op() {
        assert_config_error(record_only(RecordCheck::Compare {
            field: "age".into(),
            op: CompareOp::Gte,
            value: json!("not a number"),
            on_failure: OnFailure::Abort,
        }));
    }

    #[test]
    fn rejects_null_rate_out_of_range() {
        assert_config_error(batch_only(BatchCheck::NullRate {
            field: "name".into(),
            max: 1.5,
            on_failure: OnFailure::Abort,
        }));
    }

    #[test]
    fn requires_dlq_false_when_only_abort() {
        let compiled = compile(record_only(RecordCheck::NotNull {
            field: "id".into(),
            treat_missing_as_null: true,
            on_failure: OnFailure::Abort,
        }))
        .unwrap();
        assert!(!compiled.requires_dlq());
    }

    #[test]
    fn rejects_row_count_min_gt_max() {
        assert_config_error(batch_only(BatchCheck::RowCount {
            min: Some(100),
            max: Some(1),
            on_failure: OnFailure::Abort,
        }));
    }

    #[test]
    fn rejects_distinct_count_min_gt_max() {
        assert_config_error(batch_only(BatchCheck::DistinctCount {
            field: "x".into(),
            min: Some(5),
            max: Some(2),
            on_failure: OnFailure::Abort,
        }));
    }
}