term_guard/constraints/
completeness.rs

1//! Completeness constraint for validating non-null ratios in data.
2//!
3//! This module provides a flexible completeness constraint that supports:
4//! - Single column completeness checks
5//! - Multiple column completeness with logical operators
6//! - Configurable thresholds for partial completeness
7
8use crate::core::{
9    current_validation_context, ColumnSpec, Constraint, ConstraintMetadata, ConstraintOptions,
10    ConstraintResult, LogicalOperator, UnifiedConstraint,
11};
12use crate::prelude::*;
13use crate::security::SqlSecurity;
14use async_trait::async_trait;
15use datafusion::prelude::*;
16use tracing::{debug, instrument};
17
18/// A constraint that checks the completeness (non-null ratio) of one or more columns.
19///
20/// This constraint supports:
21/// - Single column completeness checks
22/// - Multiple column completeness with logical operators (All, Any, AtLeast, etc.)
23/// - Configurable thresholds
24/// - Flexible configuration through ConstraintOptions
25///
26/// # Examples
27///
28/// ```rust
29/// use term_guard::constraints::CompletenessConstraint;
30/// use term_guard::core::LogicalOperator;
31///
32/// // Single column - 100% complete (equivalent to is_complete)
33/// let constraint = CompletenessConstraint::complete("user_id");
34///
35/// // Single column with threshold (equivalent to has_completeness)
36/// let constraint = CompletenessConstraint::with_threshold("email", 0.95);
37///
38/// // Multiple columns - all must be complete (equivalent to are_complete)
39/// let constraint = CompletenessConstraint::with_operator(
40///     vec!["first_name", "last_name"],
41///     LogicalOperator::All,
42///     1.0,
43/// );
44///
45/// // Multiple columns - any must be complete (equivalent to are_any_complete)
46/// let constraint = CompletenessConstraint::with_operator(
47///     vec!["phone", "email", "address"],
48///     LogicalOperator::Any,
49///     1.0,
50/// );
51///
52/// // New capability: At least 2 columns must be 90% complete
53/// let constraint = CompletenessConstraint::with_operator(
54///     vec!["email", "phone", "address", "postal_code"],
55///     LogicalOperator::AtLeast(2),
56///     0.9,
57/// );
58/// ```
59#[derive(Debug, Clone)]
60pub struct CompletenessConstraint {
61    /// The columns to check for completeness
62    columns: ColumnSpec,
63    /// The minimum acceptable completeness ratio (0.0 to 1.0)
64    threshold: f64,
65    /// The logical operator for combining results
66    operator: LogicalOperator,
67}
68
69impl CompletenessConstraint {
70    /// Creates a new unified completeness constraint.
71    ///
72    /// # Arguments
73    ///
74    /// * `columns` - The column(s) to check (accepts single string or vector)
75    /// * `options` - Configuration options including threshold and operator
76    ///
77    /// # Panics
78    ///
79    /// Panics if threshold is not between 0.0 and 1.0
80    pub fn new(columns: impl Into<ColumnSpec>, options: ConstraintOptions) -> Self {
81        let threshold = options.threshold_or(1.0);
82        assert!(
83            (0.0..=1.0).contains(&threshold),
84            "Threshold must be between 0.0 and 1.0"
85        );
86
87        Self {
88            columns: columns.into(),
89            threshold,
90            operator: options.operator_or(LogicalOperator::All),
91        }
92    }
93
94    /// Creates a constraint with a specific threshold.
95    /// Convenience method for the common case.
96    pub fn with_threshold(columns: impl Into<ColumnSpec>, threshold: f64) -> Self {
97        Self::new(columns, ConstraintOptions::new().with_threshold(threshold))
98    }
99
100    /// Creates a constraint requiring 100% completeness.
101    /// Convenience method equivalent to `is_complete`.
102    pub fn complete(columns: impl Into<ColumnSpec>) -> Self {
103        Self::new(columns, ConstraintOptions::new().with_threshold(1.0))
104    }
105
106    /// Creates a constraint for multiple columns with a specific operator.
107    /// Convenience method for multi-column checks.
108    pub fn with_operator(
109        columns: impl Into<ColumnSpec>,
110        operator: LogicalOperator,
111        threshold: f64,
112    ) -> Self {
113        Self::new(
114            columns,
115            ConstraintOptions::new()
116                .with_operator(operator)
117                .with_threshold(threshold),
118        )
119    }
120}
121
122#[async_trait]
123impl UnifiedConstraint for CompletenessConstraint {
124    fn column_spec(&self) -> &ColumnSpec {
125        &self.columns
126    }
127
128    fn logical_operator(&self) -> Option<LogicalOperator> {
129        Some(self.operator)
130    }
131
132    #[instrument(skip(self, ctx), fields(
133        constraint.name = %self.name(),
134        constraint.threshold = %self.threshold,
135        constraint.operator = %self.operator
136    ))]
137    async fn evaluate_column(
138        &self,
139        ctx: &SessionContext,
140        column: &str,
141    ) -> Result<ConstraintResult> {
142        debug!(
143            constraint.name = %self.name(),
144            constraint.column = %column,
145            constraint.threshold = %self.threshold,
146            "Evaluating completeness for single column"
147        );
148
149        // Validate and escape column identifier
150        SqlSecurity::validate_identifier(column)?;
151        let column_identifier = SqlSecurity::escape_identifier(column)?;
152
153        // Get the table name from the validation context
154        let validation_ctx = current_validation_context();
155        let table_name = validation_ctx.table_name();
156
157        // Build SQL query to calculate completeness
158        let sql = format!(
159            "SELECT 
160                COUNT(*) as total_count,
161                COUNT({column_identifier}) as non_null_count
162             FROM {table_name}"
163        );
164
165        // Execute query
166        let df = ctx.sql(&sql).await?;
167        let batches = df.collect().await?;
168
169        // Extract results
170        if batches.is_empty() {
171            debug!(
172                constraint.name = %self.name(),
173                constraint.column = %column,
174                skip.reason = "No data to validate",
175                "Skipping constraint due to empty result set"
176            );
177            return Ok(ConstraintResult::skipped("No data to validate"));
178        }
179
180        let batch = &batches[0];
181        if batch.num_rows() == 0 {
182            return Ok(ConstraintResult::skipped("No data to validate"));
183        }
184
185        let total_count = batch
186            .column(0)
187            .as_any()
188            .downcast_ref::<arrow::array::Int64Array>()
189            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
190            .value(0) as f64;
191
192        if total_count == 0.0 {
193            debug!(
194                constraint.name = %self.name(),
195                constraint.column = %column,
196                skip.reason = "No data to validate",
197                data.rows = 0,
198                "Skipping constraint due to zero rows"
199            );
200            return Ok(ConstraintResult::skipped("No data to validate"));
201        }
202
203        let non_null_count = batch
204            .column(1)
205            .as_any()
206            .downcast_ref::<arrow::array::Int64Array>()
207            .ok_or_else(|| TermError::Internal("Failed to extract non-null count".to_string()))?
208            .value(0) as f64;
209
210        // Calculate completeness ratio
211        let completeness = non_null_count / total_count;
212
213        // Determine result based on threshold
214        if completeness >= self.threshold {
215            debug!(
216                constraint.name = %self.name(),
217                constraint.column = %column,
218                constraint.threshold = %self.threshold,
219                result.completeness = %format!("{completeness:.4}"),
220                result.non_null_count = non_null_count as i64,
221                result.total_count = total_count as i64,
222                result.status = "success",
223                "Completeness constraint passed for column"
224            );
225            Ok(ConstraintResult::success_with_metric(completeness))
226        } else {
227            debug!(
228                constraint.name = %self.name(),
229                constraint.column = %column,
230                constraint.threshold = %self.threshold,
231                result.completeness = %format!("{completeness:.4}"),
232                result.non_null_count = non_null_count as i64,
233                result.total_count = total_count as i64,
234                result.status = "failure",
235                "Completeness constraint failed for column"
236            );
237            Ok(ConstraintResult::failure_with_metric(
238                completeness,
239                format!(
240                    "Column '{column}' completeness {:.2}% is below threshold {:.2}%",
241                    completeness * 100.0,
242                    self.threshold * 100.0
243                ),
244            ))
245        }
246    }
247}
248
249#[async_trait]
250impl Constraint for CompletenessConstraint {
251    async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
252        self.evaluate_unified(ctx).await
253    }
254
255    fn name(&self) -> &str {
256        "completeness"
257    }
258
259    fn column(&self) -> Option<&str> {
260        match &self.columns {
261            ColumnSpec::Single(col) => Some(col),
262            ColumnSpec::Multiple(_) => None,
263        }
264    }
265
266    fn metadata(&self) -> ConstraintMetadata {
267        let mut metadata = match &self.columns {
268            ColumnSpec::Single(col) => ConstraintMetadata::for_column(col),
269            ColumnSpec::Multiple(cols) => ConstraintMetadata::for_columns(cols),
270        };
271
272        let operator_desc = match &self.columns {
273            ColumnSpec::Single(_) => String::new(),
274            ColumnSpec::Multiple(_) => format!(" ({})", self.operator.description()),
275        };
276
277        metadata = metadata
278            .with_description(format!(
279                "Checks that {}{operator_desc} have at least {:.1}% completeness",
280                match &self.columns {
281                    ColumnSpec::Single(_) => "column",
282                    ColumnSpec::Multiple(_) => "columns",
283                },
284                self.threshold * 100.0
285            ))
286            .with_custom("threshold", self.threshold.to_string())
287            .with_custom("constraint_type", "data_quality");
288
289        if let ColumnSpec::Multiple(_) = &self.columns {
290            metadata = metadata.with_custom("operator", self.operator.description());
291        }
292
293        metadata
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ConstraintStatus;
    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    use crate::test_helpers::evaluate_constraint_with_context;

    /// Builds a `SessionContext` holding one in-memory table registered as `data`.
    ///
    /// Every column is a nullable `Int64`. `data` is row-major: each inner vector is one
    /// row and must contain one entry per name in `columns`.
    ///
    /// Nothing here awaits, so the helper is a plain synchronous function.
    fn create_test_context(columns: Vec<&str>, data: Vec<Vec<Option<i64>>>) -> SessionContext {
        let ctx = SessionContext::new();

        let fields: Vec<Field> = columns
            .iter()
            .map(|&name| Field::new(name, DataType::Int64, true))
            .collect();
        let schema = Arc::new(Schema::new(fields));

        // Transpose the row-major input into one Arrow array per column.
        let arrays: Vec<ArrayRef> = (0..columns.len())
            .map(|col_idx| {
                let values: Vec<Option<i64>> = data.iter().map(|row| row[col_idx]).collect();
                Arc::new(Int64Array::from(values)) as ArrayRef
            })
            .collect();

        let batch = RecordBatch::try_new(schema.clone(), arrays)
            .expect("test columns must match the generated schema");

        let provider = MemTable::try_new(schema, vec![vec![batch]])
            .expect("in-memory table must accept the test batch");
        ctx.register_table("data", Arc::new(provider))
            .expect("table name `data` must be free in a fresh context");

        ctx
    }

    #[tokio::test]
    async fn test_single_column_complete() {
        let ctx = create_test_context(
            vec!["id"],
            vec![vec![Some(1)], vec![Some(2)], vec![Some(3)], vec![Some(4)]],
        );

        let constraint = CompletenessConstraint::complete("id");

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_single_column_with_threshold() {
        let ctx = create_test_context(
            vec!["email"],
            vec![
                vec![Some(1)],
                vec![Some(2)],
                vec![None],
                vec![Some(4)],
                vec![Some(5)],
            ],
        );

        let constraint = CompletenessConstraint::with_threshold("email", 0.8);

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.8)); // 4 out of 5
    }

    #[tokio::test]
    async fn test_single_column_below_threshold() {
        let ctx = create_test_context(
            vec!["phone"],
            vec![vec![Some(1)], vec![None], vec![None], vec![Some(4)]],
        );

        let constraint = CompletenessConstraint::with_threshold("phone", 0.8);

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.5)); // 2 out of 4
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("50.00%"));
    }

    #[tokio::test]
    async fn test_multiple_columns_all_operator() {
        let ctx = create_test_context(
            vec!["first_name", "last_name"],
            vec![
                vec![Some(1), Some(10)],
                vec![Some(2), Some(20)],
                vec![Some(3), Some(30)],
            ],
        );

        let constraint = CompletenessConstraint::new(
            vec!["first_name", "last_name"],
            ConstraintOptions::new()
                .with_operator(LogicalOperator::All)
                .with_threshold(1.0),
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_multiple_columns_all_operator_failure() {
        let ctx = create_test_context(
            vec!["col1", "col2", "col3"],
            vec![
                vec![Some(1), None, Some(100)],
                vec![Some(2), Some(20), Some(200)],
                vec![Some(3), Some(30), Some(300)],
            ],
        );

        let constraint = CompletenessConstraint::new(
            vec!["col1", "col2", "col3"],
            ConstraintOptions::new()
                .with_operator(LogicalOperator::All)
                .with_threshold(1.0),
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("col2"));
    }

    #[tokio::test]
    async fn test_multiple_columns_any_operator() {
        let ctx = create_test_context(
            vec!["phone", "email", "address"],
            vec![
                vec![Some(1), None, None],
                vec![None, Some(2), None],
                vec![None, None, None],
            ],
        );

        let constraint = CompletenessConstraint::with_operator(
            vec!["phone", "email", "address"],
            LogicalOperator::Any,
            0.3, // 30% complete
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        // phone: 1/3 = 0.33, email: 1/3 = 0.33, address: 0/3 = 0
        // At least one column (phone and email) meets the 30% threshold
        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_multiple_columns_at_least_operator() {
        let ctx = create_test_context(
            vec!["col1", "col2", "col3", "col4"],
            vec![
                vec![Some(1), Some(10), None, Some(100)],
                vec![Some(2), Some(20), Some(200), None],
                vec![Some(3), Some(30), Some(300), Some(3000)],
                vec![Some(4), Some(40), Some(400), Some(4000)],
            ],
        );

        // col1: 100%, col2: 100%, col3: 75%, col4: 75%
        // At least 2 columns should be 80% complete
        let constraint = CompletenessConstraint::with_operator(
            vec!["col1", "col2", "col3", "col4"],
            LogicalOperator::AtLeast(2),
            0.8,
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        // 2 columns (col1 and col2) are >= 80% complete
        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_multiple_columns_exactly_operator() {
        let ctx = create_test_context(
            vec!["a", "b", "c"],
            vec![
                vec![Some(1), Some(10), None],
                vec![Some(2), None, None],
                vec![Some(3), Some(30), None],
            ],
        );

        // a: 100%, b: 66.7%, c: 0%
        // Exactly 1 column should be 100% complete
        let constraint = CompletenessConstraint::with_operator(
            vec!["a", "b", "c"],
            LogicalOperator::Exactly(1),
            1.0,
        );

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_empty_data() {
        let ctx = create_test_context(vec!["id"], vec![]);
        let constraint = CompletenessConstraint::complete("id");

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    #[test]
    #[should_panic(expected = "Threshold must be between 0.0 and 1.0")]
    fn test_invalid_threshold() {
        let _ = CompletenessConstraint::with_threshold("col", 1.5);
    }

    // The lower bound of the accepted range is enforced by the same assertion.
    #[test]
    #[should_panic(expected = "Threshold must be between 0.0 and 1.0")]
    fn test_negative_threshold() {
        let _ = CompletenessConstraint::with_threshold("col", -0.1);
    }

    // `metadata()` is synchronous, so no async runtime is needed here.
    #[test]
    fn test_metadata() {
        let single = CompletenessConstraint::with_threshold("email", 0.95);
        let metadata1 = single.metadata();
        assert_eq!(metadata1.columns, vec!["email"]);
        assert!(metadata1.description.is_some());
        assert!(metadata1.description.unwrap().contains("95.0%"));

        let multiple =
            CompletenessConstraint::with_operator(vec!["a", "b"], LogicalOperator::Any, 0.8);
        let metadata2 = multiple.metadata();
        assert_eq!(metadata2.columns, vec!["a", "b"]);
        assert!(metadata2.description.is_some());
        assert!(metadata2.description.unwrap().contains("any"));
        assert_eq!(metadata2.custom.get("operator"), Some(&"any".to_string()));
    }
}