pub mod batch;
pub mod compile;
pub mod config;
pub mod record;
pub use compile::CompiledQuality;
pub use config::{BatchCheck, CompareOp, JsonType, OnFailure, QualitySpec, RecordCheck};
use crate::error::FaucetError;
use crate::quality::batch::{evaluate_aggregate_check, evaluate_unique_check};
use crate::quality::compile::CompiledBatchKind;
use crate::quality::record::evaluate_record_check;
use serde_json::Value;
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// A record that failed a quality check and was routed away from the survivors.
#[derive(Debug, Clone, PartialEq)]
pub struct QuarantinedRecord {
    /// The record exactly as it appeared in the input page.
    pub record: Value,
    /// Name of the check that rejected the record.
    pub check: &'static str,
    /// Field the failing check is configured on, if it targets one.
    pub field: Option<String>,
    /// Failure reason; prefixed with `field '<name>': ` when `field` is set.
    pub message: String,
    /// Zero-based position of the record in the page passed to [`apply_quality`].
    pub page_index: usize,
}
/// Pass/fail statistics accumulated for a single named check.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CheckTally {
    /// Number of evaluations that passed.
    pub pass: u64,
    /// Number of evaluations that failed.
    pub fail: u64,
    /// Total time spent evaluating the check, summed over all evaluations.
    pub elapsed: Duration,
}
/// Result of [`apply_quality`] for one page: the records kept, the records
/// quarantined, and per-check statistics.
#[derive(Debug, Clone, Default)]
pub struct QualityOutcome {
/// Records that were not quarantined, in their original page order.
pub survivors: Vec<Value>,
/// Records removed by a failing check whose policy is not `Abort`, in the
/// order they were quarantined.
pub quarantined: Vec<QuarantinedRecord>,
/// Per-check pass/fail counts and accumulated evaluation time, keyed by check name.
pub tally: HashMap<&'static str, CheckTally>,
}
impl QualityOutcome {
    /// Records one evaluation of check `name`.
    ///
    /// The tally entry is created on first use. Either `pass` or `fail` is
    /// incremented, depending on `passed`, and `elapsed` is added to the
    /// running total.
    fn bump(&mut self, name: &'static str, passed: bool, elapsed: Duration) {
        let tally = self.tally.entry(name).or_default();
        let counter = if passed {
            &mut tally.pass
        } else {
            &mut tally.fail
        };
        *counter += 1;
        tally.elapsed += elapsed;
    }
}
pub fn apply_quality(
records: Vec<Value>,
q: &CompiledQuality,
) -> Result<QualityOutcome, FaucetError> {
let mut out = QualityOutcome::default();
let mut survivor_idx: Vec<usize> = Vec::new();
'next_record: for (page_index, rec) in records.into_iter().enumerate() {
for check in &q.record {
let start = Instant::now();
let result = evaluate_record_check(check, &rec);
let elapsed = start.elapsed();
match result {
Ok(()) => out.bump(check.name, true, elapsed),
Err(message) => {
out.bump(check.name, false, elapsed);
match check.on_failure {
OnFailure::Abort => {
return Err(FaucetError::QualityFailure {
check: check.name.to_string(),
message: field_msg(&check.field, &message),
});
}
OnFailure::Quarantine | OnFailure::QuarantineBatch => {
out.quarantined.push(QuarantinedRecord {
record: rec,
check: check.name,
field: check.field.clone(),
message: field_msg(&check.field, &message),
page_index,
});
continue 'next_record;
}
}
}
}
}
out.survivors.push(rec);
survivor_idx.push(page_index);
}
for check in &q.batch {
let start = Instant::now();
if matches!(check.kind, CompiledBatchKind::Unique { .. }) {
let dups = evaluate_unique_check(check, &out.survivors);
let elapsed = start.elapsed();
let passed = dups.is_empty();
out.bump(check.name, passed, elapsed);
if passed {
continue;
}
match check.on_failure {
OnFailure::Abort => {
return Err(FaucetError::QualityFailure {
check: check.name.to_string(),
message: format!("{} duplicate row(s)", dups.len()),
});
}
_ => {
let dup_set: std::collections::HashSet<usize> = dups.into_iter().collect();
let mut survivors = Vec::with_capacity(out.survivors.len());
let mut new_idx = Vec::with_capacity(survivor_idx.len());
let old_idx = std::mem::take(&mut survivor_idx);
for (i, rec) in std::mem::take(&mut out.survivors).into_iter().enumerate() {
if dup_set.contains(&i) {
out.quarantined.push(QuarantinedRecord {
record: rec,
check: check.name,
field: check.field.clone(),
message: field_msg(&check.field, "duplicate key"),
page_index: old_idx[i],
});
} else {
survivors.push(rec);
new_idx.push(old_idx[i]);
}
}
out.survivors = survivors;
survivor_idx = new_idx;
}
}
} else {
let result = evaluate_aggregate_check(check, &out.survivors);
let elapsed = start.elapsed();
match result {
Ok(()) => out.bump(check.name, true, elapsed),
Err(message) => {
out.bump(check.name, false, elapsed);
match check.on_failure {
OnFailure::Abort => {
return Err(FaucetError::QualityFailure {
check: check.name.to_string(),
message: field_msg(&check.field, &message),
});
}
_ => {
let old_idx = std::mem::take(&mut survivor_idx);
for (rec, page_index) in
std::mem::take(&mut out.survivors).into_iter().zip(old_idx)
{
out.quarantined.push(QuarantinedRecord {
record: rec,
check: check.name,
field: check.field.clone(),
message: field_msg(&check.field, &message),
page_index,
});
}
}
}
}
}
}
}
Ok(out)
}
/// Builds a failure message.
///
/// When the check targets a field, the message is prefixed with
/// `field '<name>': `. Otherwise `message` is returned unchanged as an owned
/// `String`.
fn field_msg(field: &Option<String>, message: &str) -> String {
    field.as_deref().map_or_else(
        || message.to_owned(),
        |f| format!("field '{f}': {message}"),
    )
}
// Unit tests for `apply_quality`: record-phase quarantine/abort, batch-phase
// unique/aggregate handling, page-index bookkeeping, and per-check tallies.
#[cfg(test)]
mod tests {
use super::*;
use crate::quality::config::{BatchCheck, CompareOp, OnFailure, RecordCheck};
use serde_json::json;
// Compiles `spec`, panicking if compilation fails.
fn compiled(spec: QualitySpec) -> CompiledQuality {
CompiledQuality::compile(&spec).unwrap()
}
// A quarantining not-null check removes only the null-id record. The other
// records stay in order, and the quarantined entry names the check and field.
#[test]
fn per_record_quarantine_partitions_page() {
let q = compiled(QualitySpec {
record: vec![RecordCheck::NotNull {
field: "id".into(),
treat_missing_as_null: true,
on_failure: OnFailure::Quarantine,
}],
batch: vec![],
});
let page = vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})];
let out = apply_quality(page, &q).unwrap();
assert_eq!(out.survivors, vec![json!({"id": 1}), json!({"id": 3})]);
assert_eq!(out.quarantined.len(), 1);
assert_eq!(out.quarantined[0].check, "not_null");
assert_eq!(out.quarantined[0].field.as_deref(), Some("id"));
}
// `page_index` must be the position in the input page (1 and 3), not the
// position within the quarantine list (0 and 1).
#[test]
fn quarantined_records_carry_page_index_not_list_index() {
let q = compiled(QualitySpec {
record: vec![RecordCheck::NotNull {
field: "id".into(),
treat_missing_as_null: true,
on_failure: OnFailure::Quarantine,
}],
batch: vec![],
});
let page = vec![
json!({"id": 0}),
json!({"id": null}),
json!({"id": 2}),
json!({"id": null}),
];
let out = apply_quality(page, &q).unwrap();
let idxs: Vec<usize> = out.quarantined.iter().map(|qr| qr.page_index).collect();
assert_eq!(
idxs,
vec![1, 3],
"quarantined records must carry their original page position"
);
}
// Duplicates removed by a batch `unique` check also report page positions.
// The first "a" (index 0) is kept; the later copies at 1 and 3 are quarantined.
#[test]
fn batch_quarantined_records_carry_page_index() {
let q = compiled(QualitySpec {
record: vec![],
batch: vec![BatchCheck::Unique {
fields: vec!["k".into()],
on_failure: OnFailure::Quarantine,
}],
});
let page = vec![
json!({"k": "a"}),
json!({"k": "a"}),
json!({"k": "b"}),
json!({"k": "a"}),
];
let out = apply_quality(page, &q).unwrap();
let mut idxs: Vec<usize> = out.quarantined.iter().map(|qr| qr.page_index).collect();
idxs.sort_unstable();
assert_eq!(
idxs,
vec![1, 3],
"batch-quarantined duplicates must carry their original page positions"
);
}
// A record check with the Abort policy turns a single failure into an error.
#[test]
fn per_record_abort_returns_err() {
let q = compiled(QualitySpec {
record: vec![RecordCheck::Compare {
field: "age".into(),
op: CompareOp::Gte,
value: json!(0),
on_failure: OnFailure::Abort,
}],
batch: vec![],
});
let page = vec![json!({"age": 5}), json!({"age": -1})];
let err = apply_quality(page, &q).unwrap_err();
assert!(matches!(err, crate::FaucetError::QualityFailure { .. }));
}
// Batch checks see only record-phase survivors. After the null-id record is
// quarantined, one row remains, so `row_count >= 2` aborts.
#[test]
fn batch_eval_runs_over_survivors() {
let q = compiled(QualitySpec {
record: vec![RecordCheck::NotNull {
field: "id".into(),
treat_missing_as_null: true,
on_failure: OnFailure::Quarantine,
}],
batch: vec![BatchCheck::RowCount {
min: Some(2),
max: None,
on_failure: OnFailure::Abort,
}],
});
let page = vec![json!({"id": 1}), json!({"id": null})];
let err = apply_quality(page, &q).unwrap_err();
assert!(
matches!(err, crate::FaucetError::QualityFailure { check, .. } if check == "row_count")
);
}
// A quarantining `unique` check keeps the first occurrence of a key and
// routes later duplicates.
#[test]
fn unique_quarantine_routes_dupes_keeps_first() {
let q = compiled(QualitySpec {
record: vec![],
batch: vec![BatchCheck::Unique {
fields: vec!["id".into()],
on_failure: OnFailure::Quarantine,
}],
});
let page = vec![json!({"id": 1}), json!({"id": 1}), json!({"id": 2})];
let out = apply_quality(page, &q).unwrap();
assert_eq!(out.survivors, vec![json!({"id": 1}), json!({"id": 2})]);
assert_eq!(out.quarantined.len(), 1);
assert_eq!(out.quarantined[0].check, "unique");
}
// A failing aggregate check with QuarantineBatch quarantines every survivor.
#[test]
fn quarantine_batch_routes_all_survivors() {
let q = compiled(QualitySpec {
record: vec![],
batch: vec![BatchCheck::RowCount {
min: Some(10),
max: None,
on_failure: OnFailure::QuarantineBatch,
}],
});
let page = vec![json!({"id": 1}), json!({"id": 2})];
let out = apply_quality(page, &q).unwrap();
assert!(out.survivors.is_empty());
assert_eq!(out.quarantined.len(), 2);
}
// Each record-check evaluation is tallied under the check's name.
#[test]
fn tally_counts_pass_and_fail() {
let q = compiled(QualitySpec {
record: vec![RecordCheck::NotNull {
field: "id".into(),
treat_missing_as_null: true,
on_failure: OnFailure::Quarantine,
}],
batch: vec![],
});
let page = vec![json!({"id": 1}), json!({"id": null})];
let out = apply_quality(page, &q).unwrap();
let t = out.tally.get("not_null").unwrap();
assert_eq!(t.pass, 1);
assert_eq!(t.fail, 1);
}
}