1use crate::core::{
9 current_validation_context, ColumnSpec, Constraint, ConstraintMetadata, ConstraintOptions,
10 ConstraintResult, LogicalOperator, UnifiedConstraint,
11};
12use crate::prelude::*;
13use crate::security::SqlSecurity;
14use async_trait::async_trait;
15use datafusion::prelude::*;
16use tracing::{debug, instrument};
17
18#[derive(Debug, Clone)]
60pub struct CompletenessConstraint {
61 columns: ColumnSpec,
63 threshold: f64,
65 operator: LogicalOperator,
67}
68
69impl CompletenessConstraint {
70 pub fn new(columns: impl Into<ColumnSpec>, options: ConstraintOptions) -> Self {
81 let threshold = options.threshold_or(1.0);
82 assert!(
83 (0.0..=1.0).contains(&threshold),
84 "Threshold must be between 0.0 and 1.0"
85 );
86
87 Self {
88 columns: columns.into(),
89 threshold,
90 operator: options.operator_or(LogicalOperator::All),
91 }
92 }
93
94 pub fn with_threshold(columns: impl Into<ColumnSpec>, threshold: f64) -> Self {
97 Self::new(columns, ConstraintOptions::new().with_threshold(threshold))
98 }
99
100 pub fn complete(columns: impl Into<ColumnSpec>) -> Self {
103 Self::new(columns, ConstraintOptions::new().with_threshold(1.0))
104 }
105
106 pub fn with_operator(
109 columns: impl Into<ColumnSpec>,
110 operator: LogicalOperator,
111 threshold: f64,
112 ) -> Self {
113 Self::new(
114 columns,
115 ConstraintOptions::new()
116 .with_operator(operator)
117 .with_threshold(threshold),
118 )
119 }
120}
121
122#[async_trait]
123impl UnifiedConstraint for CompletenessConstraint {
124 fn column_spec(&self) -> &ColumnSpec {
125 &self.columns
126 }
127
128 fn logical_operator(&self) -> Option<LogicalOperator> {
129 Some(self.operator)
130 }
131
132 #[instrument(skip(self, ctx), fields(
133 constraint.name = %self.name(),
134 constraint.threshold = %self.threshold,
135 constraint.operator = %self.operator
136 ))]
137 async fn evaluate_column(
138 &self,
139 ctx: &SessionContext,
140 column: &str,
141 ) -> Result<ConstraintResult> {
142 debug!(
143 constraint.name = %self.name(),
144 constraint.column = %column,
145 constraint.threshold = %self.threshold,
146 "Evaluating completeness for single column"
147 );
148
149 SqlSecurity::validate_identifier(column)?;
151 let column_identifier = SqlSecurity::escape_identifier(column)?;
152
153 let validation_ctx = current_validation_context();
155 let table_name = validation_ctx.table_name();
156
157 let sql = format!(
159 "SELECT
160 COUNT(*) as total_count,
161 COUNT({column_identifier}) as non_null_count
162 FROM {table_name}"
163 );
164
165 let df = ctx.sql(&sql).await?;
167 let batches = df.collect().await?;
168
169 if batches.is_empty() {
171 debug!(
172 constraint.name = %self.name(),
173 constraint.column = %column,
174 skip.reason = "No data to validate",
175 "Skipping constraint due to empty result set"
176 );
177 return Ok(ConstraintResult::skipped("No data to validate"));
178 }
179
180 let batch = &batches[0];
181 if batch.num_rows() == 0 {
182 return Ok(ConstraintResult::skipped("No data to validate"));
183 }
184
185 let total_count = batch
186 .column(0)
187 .as_any()
188 .downcast_ref::<arrow::array::Int64Array>()
189 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
190 .value(0) as f64;
191
192 if total_count == 0.0 {
193 debug!(
194 constraint.name = %self.name(),
195 constraint.column = %column,
196 skip.reason = "No data to validate",
197 data.rows = 0,
198 "Skipping constraint due to zero rows"
199 );
200 return Ok(ConstraintResult::skipped("No data to validate"));
201 }
202
203 let non_null_count = batch
204 .column(1)
205 .as_any()
206 .downcast_ref::<arrow::array::Int64Array>()
207 .ok_or_else(|| TermError::Internal("Failed to extract non-null count".to_string()))?
208 .value(0) as f64;
209
210 let completeness = non_null_count / total_count;
212
213 if completeness >= self.threshold {
215 debug!(
216 constraint.name = %self.name(),
217 constraint.column = %column,
218 constraint.threshold = %self.threshold,
219 result.completeness = %format!("{completeness:.4}"),
220 result.non_null_count = non_null_count as i64,
221 result.total_count = total_count as i64,
222 result.status = "success",
223 "Completeness constraint passed for column"
224 );
225 Ok(ConstraintResult::success_with_metric(completeness))
226 } else {
227 debug!(
228 constraint.name = %self.name(),
229 constraint.column = %column,
230 constraint.threshold = %self.threshold,
231 result.completeness = %format!("{completeness:.4}"),
232 result.non_null_count = non_null_count as i64,
233 result.total_count = total_count as i64,
234 result.status = "failure",
235 "Completeness constraint failed for column"
236 );
237 Ok(ConstraintResult::failure_with_metric(
238 completeness,
239 format!(
240 "Column '{column}' completeness {:.2}% is below threshold {:.2}%",
241 completeness * 100.0,
242 self.threshold * 100.0
243 ),
244 ))
245 }
246 }
247}
248
249#[async_trait]
250impl Constraint for CompletenessConstraint {
251 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
252 self.evaluate_unified(ctx).await
253 }
254
255 fn name(&self) -> &str {
256 "completeness"
257 }
258
259 fn column(&self) -> Option<&str> {
260 match &self.columns {
261 ColumnSpec::Single(col) => Some(col),
262 ColumnSpec::Multiple(_) => None,
263 }
264 }
265
266 fn metadata(&self) -> ConstraintMetadata {
267 let mut metadata = match &self.columns {
268 ColumnSpec::Single(col) => ConstraintMetadata::for_column(col),
269 ColumnSpec::Multiple(cols) => ConstraintMetadata::for_columns(cols),
270 };
271
272 let operator_desc = match &self.columns {
273 ColumnSpec::Single(_) => String::new(),
274 ColumnSpec::Multiple(_) => format!(" ({})", self.operator.description()),
275 };
276
277 metadata = metadata
278 .with_description(format!(
279 "Checks that {}{operator_desc} have at least {:.1}% completeness",
280 match &self.columns {
281 ColumnSpec::Single(_) => "column",
282 ColumnSpec::Multiple(_) => "columns",
283 },
284 self.threshold * 100.0
285 ))
286 .with_custom("threshold", self.threshold.to_string())
287 .with_custom("constraint_type", "data_quality");
288
289 if let ColumnSpec::Multiple(_) = &self.columns {
290 metadata = metadata.with_custom("operator", self.operator.description());
291 }
292
293 metadata
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ConstraintStatus;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    use crate::test_helpers::evaluate_constraint_with_context;

    /// Builds a `SessionContext` with one in-memory table named `data`.
    ///
    /// Every column is a nullable `Int64`. `data` is row-major: `data[row][col]` is the value of
    /// `columns[col]` in that row, so each row must have `columns.len()` entries.
    async fn create_test_context(
        columns: Vec<&str>,
        data: Vec<Vec<Option<i64>>>,
    ) -> SessionContext {
        let ctx = SessionContext::new();

        let fields: Vec<Field> = columns
            .iter()
            .map(|&name| Field::new(name, DataType::Int64, true))
            .collect();
        let schema = Arc::new(Schema::new(fields));

        // Transpose the row-major input into one Arrow array per column.
        let arrays: Vec<Arc<dyn arrow::array::Array>> = (0..columns.len())
            .map(|col_idx| {
                let values: Vec<Option<i64>> = data.iter().map(|row| row[col_idx]).collect();
                Arc::new(Int64Array::from(values)) as Arc<dyn arrow::array::Array>
            })
            .collect();

        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();

        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
        ctx.register_table("data", Arc::new(provider)).unwrap();

        ctx
    }

    #[tokio::test]
    async fn test_single_column_complete() {
        let ctx = create_test_context(
            vec!["id"],
            vec![vec![Some(1)], vec![Some(2)], vec![Some(3)], vec![Some(4)]],
        )
        .await;

        let constraint = CompletenessConstraint::complete("id");

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_single_column_with_threshold() {
        let ctx = create_test_context(
            vec!["email"],
            vec![
                vec![Some(1)],
                vec![Some(2)],
                vec![None],
                vec![Some(4)],
                vec![Some(5)],
            ],
        )
        .await;

        let constraint = CompletenessConstraint::with_threshold("email", 0.8);

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        // 4 of 5 values are present: exactly at the threshold, which passes (>=).
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.8));
    }

    #[tokio::test]
    async fn test_single_column_below_threshold() {
        let ctx = create_test_context(
            vec!["phone"],
            vec![vec![Some(1)], vec![None], vec![None], vec![Some(4)]],
        )
        .await;

        let constraint = CompletenessConstraint::with_threshold("phone", 0.8);

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.5));
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("50.00%"));
    }

    #[tokio::test]
    async fn test_single_column_all_null() {
        let ctx = create_test_context(vec!["id"], vec![vec![None], vec![None]]).await;

        let constraint = CompletenessConstraint::complete("id");

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.0));
    }

    #[tokio::test]
    async fn test_multiple_columns_all_operator() {
        let ctx = create_test_context(
            vec!["first_name", "last_name"],
            vec![
                vec![Some(1), Some(10)],
                vec![Some(2), Some(20)],
                vec![Some(3), Some(30)],
            ],
        )
        .await;

        let constraint = CompletenessConstraint::new(
            vec!["first_name", "last_name"],
            ConstraintOptions::new()
                .with_operator(LogicalOperator::All)
                .with_threshold(1.0),
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_multiple_columns_all_operator_failure() {
        let ctx = create_test_context(
            vec!["col1", "col2", "col3"],
            vec![
                vec![Some(1), None, Some(100)],
                vec![Some(2), Some(20), Some(200)],
                vec![Some(3), Some(30), Some(300)],
            ],
        )
        .await;

        let constraint = CompletenessConstraint::new(
            vec!["col1", "col2", "col3"],
            ConstraintOptions::new()
                .with_operator(LogicalOperator::All)
                .with_threshold(1.0),
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("col2"));
    }

    #[tokio::test]
    async fn test_multiple_columns_any_operator() {
        let ctx = create_test_context(
            vec!["phone", "email", "address"],
            vec![
                vec![Some(1), None, None],
                vec![None, Some(2), None],
                vec![None, None, None],
            ],
        )
        .await;

        // phone and email are each 1/3 complete, which clears the 0.3 threshold.
        let constraint = CompletenessConstraint::with_operator(
            vec!["phone", "email", "address"],
            LogicalOperator::Any,
            0.3,
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_multiple_columns_at_least_operator() {
        let ctx = create_test_context(
            vec!["col1", "col2", "col3", "col4"],
            vec![
                vec![Some(1), Some(10), None, Some(100)],
                vec![Some(2), Some(20), Some(200), None],
                vec![Some(3), Some(30), Some(300), Some(3000)],
                vec![Some(4), Some(40), Some(400), Some(4000)],
            ],
        )
        .await;

        // col1 and col2 are fully complete; col3 and col4 are 75% complete (below 0.8).
        let constraint = CompletenessConstraint::with_operator(
            vec!["col1", "col2", "col3", "col4"],
            LogicalOperator::AtLeast(2),
            0.8,
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_multiple_columns_exactly_operator() {
        let ctx = create_test_context(
            vec!["a", "b", "c"],
            vec![
                vec![Some(1), Some(10), None],
                vec![Some(2), None, None],
                vec![Some(3), Some(30), None],
            ],
        )
        .await;

        // Only column `a` is fully complete.
        let constraint = CompletenessConstraint::with_operator(
            vec!["a", "b", "c"],
            LogicalOperator::Exactly(1),
            1.0,
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_empty_data() {
        let ctx = create_test_context(vec!["id"], vec![]).await;
        let constraint = CompletenessConstraint::complete("id");

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    #[test]
    #[should_panic(expected = "Threshold must be between 0.0 and 1.0")]
    fn test_invalid_threshold() {
        CompletenessConstraint::with_threshold("col", 1.5);
    }

    #[test]
    #[should_panic(expected = "Threshold must be between 0.0 and 1.0")]
    fn test_negative_threshold() {
        CompletenessConstraint::with_threshold("col", -0.1);
    }

    #[test]
    #[should_panic(expected = "Threshold must be between 0.0 and 1.0")]
    fn test_nan_threshold() {
        CompletenessConstraint::with_threshold("col", f64::NAN);
    }

    #[test]
    fn test_threshold_bounds_are_inclusive() {
        let lower = CompletenessConstraint::with_threshold("col", 0.0).metadata();
        assert_eq!(lower.custom.get("threshold"), Some(&"0".to_string()));

        let upper = CompletenessConstraint::with_threshold("col", 1.0).metadata();
        assert_eq!(upper.custom.get("threshold"), Some(&"1".to_string()));
    }

    #[test]
    fn test_metadata() {
        let single = CompletenessConstraint::with_threshold("email", 0.95);
        let metadata1 = single.metadata();
        assert_eq!(metadata1.columns, vec!["email"]);
        assert_eq!(metadata1.custom.get("threshold"), Some(&"0.95".to_string()));
        assert_eq!(
            metadata1.custom.get("constraint_type"),
            Some(&"data_quality".to_string())
        );
        // The operator is only recorded for multi-column specs.
        assert!(metadata1.custom.get("operator").is_none());
        assert!(metadata1.description.is_some());
        assert!(metadata1.description.unwrap().contains("95.0%"));

        let multiple =
            CompletenessConstraint::with_operator(vec!["a", "b"], LogicalOperator::Any, 0.8);
        let metadata2 = multiple.metadata();
        assert_eq!(metadata2.columns, vec!["a", "b"]);
        assert_eq!(metadata2.custom.get("operator"), Some(&"any".to_string()));
        assert!(metadata2.description.is_some());
        let description2 = metadata2.description.unwrap();
        assert!(description2.contains("any"));
        assert!(description2.contains("80.0%"));
    }
}