1pub mod batch;
8pub mod compile;
9pub mod config;
10pub mod record;
11
12pub use compile::CompiledQuality;
13pub use config::{BatchCheck, CompareOp, JsonType, OnFailure, QualitySpec, RecordCheck};
14
15use crate::error::FaucetError;
16use crate::quality::batch::{evaluate_aggregate_check, evaluate_unique_check};
17use crate::quality::compile::CompiledBatchKind;
18use crate::quality::record::evaluate_record_check;
19use serde_json::Value;
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22
23#[derive(Debug, Clone)]
25pub struct QuarantinedRecord {
26 pub record: Value,
27 pub check: &'static str,
29 pub field: Option<String>,
31 pub message: String,
33 pub page_index: usize,
37}
38
/// Running totals for one named check.
#[derive(Debug, Clone, Default)]
pub struct CheckTally {
    /// Number of evaluations of the check that passed.
    pub pass: u64,
    /// Number of evaluations of the check that failed.
    pub fail: u64,
    /// Total time spent evaluating the check, summed over all evaluations.
    pub elapsed: Duration,
}
52
53#[derive(Debug, Clone, Default)]
55pub struct QualityOutcome {
56 pub survivors: Vec<Value>,
58 pub quarantined: Vec<QuarantinedRecord>,
60 pub tally: HashMap<&'static str, CheckTally>,
62}
63
64impl QualityOutcome {
65 fn bump(&mut self, name: &'static str, passed: bool, elapsed: Duration) {
66 let e = self.tally.entry(name).or_default();
67 if passed {
68 e.pass += 1;
69 } else {
70 e.fail += 1;
71 }
72 e.elapsed += elapsed;
73 }
74}
75
76pub fn apply_quality(
82 records: Vec<Value>,
83 q: &CompiledQuality,
84) -> Result<QualityOutcome, FaucetError> {
85 let mut out = QualityOutcome::default();
86 let mut survivor_idx: Vec<usize> = Vec::new();
90
91 'next_record: for (page_index, rec) in records.into_iter().enumerate() {
93 for check in &q.record {
94 let start = Instant::now();
98 let result = evaluate_record_check(check, &rec);
99 let elapsed = start.elapsed();
100 match result {
101 Ok(()) => out.bump(check.name, true, elapsed),
102 Err(message) => {
103 out.bump(check.name, false, elapsed);
104 match check.on_failure {
105 OnFailure::Abort => {
106 return Err(FaucetError::QualityFailure {
107 check: check.name.to_string(),
108 message: field_msg(&check.field, &message),
109 });
110 }
111 OnFailure::Quarantine | OnFailure::QuarantineBatch => {
112 out.quarantined.push(QuarantinedRecord {
114 record: rec,
115 check: check.name,
116 field: check.field.clone(),
117 message: field_msg(&check.field, &message),
118 page_index,
119 });
120 continue 'next_record;
121 }
122 }
123 }
124 }
125 }
126 out.survivors.push(rec);
127 survivor_idx.push(page_index);
128 }
129
130 for check in &q.batch {
132 let start = Instant::now();
133 if matches!(check.kind, CompiledBatchKind::Unique { .. }) {
134 let dups = evaluate_unique_check(check, &out.survivors);
135 let elapsed = start.elapsed();
136 let passed = dups.is_empty();
137 out.bump(check.name, passed, elapsed);
138 if passed {
139 continue;
140 }
141 match check.on_failure {
142 OnFailure::Abort => {
143 return Err(FaucetError::QualityFailure {
144 check: check.name.to_string(),
145 message: format!("{} duplicate row(s)", dups.len()),
146 });
147 }
148 _ => {
149 let dup_set: std::collections::HashSet<usize> = dups.into_iter().collect();
151 let mut survivors = Vec::with_capacity(out.survivors.len());
152 let mut new_idx = Vec::with_capacity(survivor_idx.len());
153 let old_idx = std::mem::take(&mut survivor_idx);
154 for (i, rec) in std::mem::take(&mut out.survivors).into_iter().enumerate() {
155 if dup_set.contains(&i) {
156 out.quarantined.push(QuarantinedRecord {
157 record: rec,
158 check: check.name,
159 field: check.field.clone(),
160 message: field_msg(&check.field, "duplicate key"),
161 page_index: old_idx[i],
162 });
163 } else {
164 survivors.push(rec);
165 new_idx.push(old_idx[i]);
166 }
167 }
168 out.survivors = survivors;
169 survivor_idx = new_idx;
170 }
171 }
172 } else {
173 let result = evaluate_aggregate_check(check, &out.survivors);
174 let elapsed = start.elapsed();
175 match result {
176 Ok(()) => out.bump(check.name, true, elapsed),
177 Err(message) => {
178 out.bump(check.name, false, elapsed);
179 match check.on_failure {
180 OnFailure::Abort => {
181 return Err(FaucetError::QualityFailure {
182 check: check.name.to_string(),
183 message: field_msg(&check.field, &message),
184 });
185 }
186 _ => {
188 let old_idx = std::mem::take(&mut survivor_idx);
189 for (rec, page_index) in
190 std::mem::take(&mut out.survivors).into_iter().zip(old_idx)
191 {
192 out.quarantined.push(QuarantinedRecord {
193 record: rec,
194 check: check.name,
195 field: check.field.clone(),
196 message: field_msg(&check.field, &message),
197 page_index,
198 });
199 }
200 }
201 }
202 }
203 }
204 }
205 }
206
207 Ok(out)
208}
209
/// Builds the text stored in quarantine entries and abort errors.
///
/// When `field` is set the result is `field '<name>': <message>`; otherwise it
/// is `message` unchanged.
fn field_msg(field: &Option<String>, message: &str) -> String {
    field.as_ref().map_or_else(
        || message.to_string(),
        |f| format!("field '{f}': {message}"),
    )
}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::quality::config::{BatchCheck, CompareOp, OnFailure, RecordCheck};
    use serde_json::json;

    /// Compiles `spec`, panicking if compilation fails.
    fn compiled(spec: QualitySpec) -> CompiledQuality {
        CompiledQuality::compile(&spec).unwrap()
    }

    /// Per-record check that quarantines records whose `id` is null or missing.
    fn not_null_id() -> RecordCheck {
        RecordCheck::NotNull {
            field: "id".into(),
            treat_missing_as_null: true,
            on_failure: OnFailure::Quarantine,
        }
    }

    /// Compiled spec containing only the [`not_null_id`] check.
    fn not_null_id_only() -> CompiledQuality {
        compiled(QualitySpec {
            record: vec![not_null_id()],
            batch: vec![],
        })
    }

    /// Page positions of the quarantined records, in quarantine order.
    fn quarantined_positions(out: &QualityOutcome) -> Vec<usize> {
        out.quarantined.iter().map(|qr| qr.page_index).collect()
    }

    #[test]
    fn per_record_quarantine_partitions_page() {
        let q = not_null_id_only();
        let page = vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})];
        let out = apply_quality(page, &q).unwrap();
        assert_eq!(out.survivors, vec![json!({"id": 1}), json!({"id": 3})]);
        assert_eq!(out.quarantined.len(), 1);
        let rejected = &out.quarantined[0];
        assert_eq!(rejected.check, "not_null");
        assert_eq!(rejected.field.as_deref(), Some("id"));
    }

    #[test]
    fn quarantined_records_carry_page_index_not_list_index() {
        let q = not_null_id_only();
        let page = vec![
            json!({"id": 0}),
            json!({"id": null}),
            json!({"id": 2}),
            json!({"id": null}),
        ];
        let out = apply_quality(page, &q).unwrap();
        let idxs = quarantined_positions(&out);
        assert_eq!(
            idxs,
            vec![1, 3],
            "quarantined records must carry their original page position"
        );
    }

    #[test]
    fn batch_quarantined_records_carry_page_index() {
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["k".into()],
                on_failure: OnFailure::Quarantine,
            }],
        });
        let page = vec![
            json!({"k": "a"}),
            json!({"k": "a"}),
            json!({"k": "b"}),
            json!({"k": "a"}),
        ];
        let out = apply_quality(page, &q).unwrap();
        // Quarantine order is not part of the contract being tested here.
        let mut idxs = quarantined_positions(&out);
        idxs.sort_unstable();
        assert_eq!(
            idxs,
            vec![1, 3],
            "batch-quarantined duplicates must carry their original page positions"
        );
    }

    #[test]
    fn per_record_abort_returns_err() {
        let q = compiled(QualitySpec {
            record: vec![RecordCheck::Compare {
                field: "age".into(),
                op: CompareOp::Gte,
                value: json!(0),
                on_failure: OnFailure::Abort,
            }],
            batch: vec![],
        });
        let page = vec![json!({"age": 5}), json!({"age": -1})];
        let err = apply_quality(page, &q).unwrap_err();
        assert!(matches!(err, crate::FaucetError::QualityFailure { .. }));
    }

    #[test]
    fn batch_eval_runs_over_survivors() {
        // One of the two records is quarantined first, so the row-count minimum
        // of 2 is evaluated against a single survivor and must abort.
        let q = compiled(QualitySpec {
            record: vec![not_null_id()],
            batch: vec![BatchCheck::RowCount {
                min: Some(2),
                max: None,
                on_failure: OnFailure::Abort,
            }],
        });
        let page = vec![json!({"id": 1}), json!({"id": null})];
        let err = apply_quality(page, &q).unwrap_err();
        assert!(
            matches!(err, crate::FaucetError::QualityFailure { check, .. } if check == "row_count")
        );
    }

    #[test]
    fn unique_quarantine_routes_dupes_keeps_first() {
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::Unique {
                fields: vec!["id".into()],
                on_failure: OnFailure::Quarantine,
            }],
        });
        let page = vec![json!({"id": 1}), json!({"id": 1}), json!({"id": 2})];
        let out = apply_quality(page, &q).unwrap();
        assert_eq!(out.survivors, vec![json!({"id": 1}), json!({"id": 2})]);
        assert_eq!(out.quarantined.len(), 1);
        assert_eq!(out.quarantined[0].check, "unique");
    }

    #[test]
    fn quarantine_batch_routes_all_survivors() {
        let q = compiled(QualitySpec {
            record: vec![],
            batch: vec![BatchCheck::RowCount {
                min: Some(10),
                max: None,
                on_failure: OnFailure::QuarantineBatch,
            }],
        });
        let page = vec![json!({"id": 1}), json!({"id": 2})];
        let out = apply_quality(page, &q).unwrap();
        assert!(out.survivors.is_empty());
        assert_eq!(out.quarantined.len(), 2);
    }

    #[test]
    fn tally_counts_pass_and_fail() {
        let q = not_null_id_only();
        let page = vec![json!({"id": 1}), json!({"id": null})];
        let out = apply_quality(page, &q).unwrap();
        let t = out.tally.get("not_null").unwrap();
        assert_eq!(t.pass, 1);
        assert_eq!(t.fail, 1);
    }
}