term_guard/constraints/
uniqueness.rs

1//! Uniqueness constraint implementation.
2//!
3//! This module provides a comprehensive constraint that handles all uniqueness-related
4//! validations including full uniqueness, distinctness, unique value ratios, and primary keys.
5
6use crate::constraints::Assertion;
7use crate::core::{current_validation_context, Constraint, ConstraintMetadata, ConstraintResult};
8use crate::prelude::*;
9use crate::security::SqlSecurity;
10use arrow::array::Array;
11use async_trait::async_trait;
12use datafusion::prelude::*;
13use std::fmt;
14use tracing::instrument;
/// Strategy for treating NULL values while measuring uniqueness.
///
/// [`NullHandling::Exclude`] is the default strategy.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum NullHandling {
    /// Leave NULLs out of the distinct count while still counting them in the
    /// row total (default behavior).
    #[default]
    Exclude,

    /// Count NULL like any other value, so repeated NULLs are duplicates of
    /// one another.
    Include,

    /// Count every NULL as its own value, never equal to any other NULL.
    Distinct,
}
33
34impl fmt::Display for NullHandling {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        match self {
37            NullHandling::Exclude => write!(f, "exclude"),
38            NullHandling::Include => write!(f, "include"),
39            NullHandling::Distinct => write!(f, "distinct"),
40        }
41    }
42}
43
44/// Type of uniqueness validation to perform.
45///
46/// This enum encompasses all the different types of uniqueness checks that were
47/// previously handled by separate constraint types.
48#[derive(Debug, Clone, PartialEq)]
49pub enum UniquenessType {
50    /// Full uniqueness validation with configurable threshold.
51    ///
52    /// Replaces the functionality of `UniquenessConstraint`.
53    /// Validates that at least `threshold` ratio of values are unique.
54    FullUniqueness { threshold: f64 },
55
56    /// Distinctness validation using flexible assertions.
57    ///
58    /// Replaces the functionality of `DistinctnessConstraint`.
59    /// Validates the ratio of distinct values using assertion-based logic.
60    Distinctness(Assertion),
61
62    /// Unique value ratio validation using flexible assertions.
63    ///
64    /// Replaces the functionality of `UniqueValueRatioConstraint`.
65    /// Validates the ratio of values that appear exactly once.
66    UniqueValueRatio(Assertion),
67
68    /// Primary key validation (unique + non-null).
69    ///
70    /// Replaces the functionality of `PrimaryKeyConstraint`.
71    /// Enforces that values are both unique and contain no NULLs.
72    PrimaryKey,
73
74    /// Uniqueness validation that allows NULL values.
75    ///
76    /// Similar to FullUniqueness but with explicit NULL handling control.
77    UniqueWithNulls {
78        threshold: f64,
79        null_handling: NullHandling,
80    },
81
82    /// Composite uniqueness with advanced null handling.
83    ///
84    /// Optimized for multi-column uniqueness checks with configurable
85    /// null handling strategies.
86    UniqueComposite {
87        threshold: f64,
88        null_handling: NullHandling,
89        case_sensitive: bool,
90    },
91}
92
93impl UniquenessType {
94    /// Returns the name of this uniqueness type for tracing and metadata.
95    pub fn name(&self) -> &str {
96        match self {
97            UniquenessType::FullUniqueness { .. } => "full_uniqueness",
98            UniquenessType::Distinctness(_) => "distinctness",
99            UniquenessType::UniqueValueRatio(_) => "unique_value_ratio",
100            UniquenessType::PrimaryKey => "primary_key",
101            UniquenessType::UniqueWithNulls { .. } => "unique_with_nulls",
102            UniquenessType::UniqueComposite { .. } => "unique_composite",
103        }
104    }
105
106    /// Returns a human-readable description of this uniqueness type.
107    pub fn description(&self) -> String {
108        match self {
109            UniquenessType::FullUniqueness { threshold } => {
110                let threshold_pct = threshold * 100.0;
111                format!("validates that at least {threshold_pct:.1}% of values are unique")
112            }
113            UniquenessType::Distinctness(assertion) => {
114                format!(
115                    "validates that distinct value ratio {}",
116                    assertion.description()
117                )
118            }
119            UniquenessType::UniqueValueRatio(assertion) => {
120                format!(
121                    "validates that unique value ratio {}",
122                    assertion.description()
123                )
124            }
125            UniquenessType::PrimaryKey => {
126                "validates that values form a valid primary key (unique + non-null)".to_string()
127            }
128            UniquenessType::UniqueWithNulls {
129                threshold,
130                null_handling,
131            } => {
132                let threshold_pct = threshold * 100.0;
133                format!(
134                    "validates that at least {threshold_pct:.1}% of values are unique (nulls: {null_handling})"
135                )
136            }
137            UniquenessType::UniqueComposite {
138                threshold,
139                null_handling,
140                case_sensitive,
141            } => {
142                let threshold_pct = threshold * 100.0;
143                format!(
144                    "validates composite uniqueness at {threshold_pct:.1}% threshold (nulls: {null_handling}, case-sensitive: {case_sensitive})"
145                )
146            }
147        }
148    }
149}
150
151/// Options for configuring uniqueness constraint behavior.
152#[derive(Debug, Clone, PartialEq)]
153pub struct UniquenessOptions {
154    /// How to handle NULL values in uniqueness calculations.
155    pub null_handling: NullHandling,
156
157    /// Whether string comparisons should be case-sensitive.
158    pub case_sensitive: bool,
159
160    /// Whether to trim whitespace before comparison.
161    pub trim_whitespace: bool,
162}
163
164impl Default for UniquenessOptions {
165    fn default() -> Self {
166        Self {
167            null_handling: NullHandling::default(),
168            case_sensitive: true,
169            trim_whitespace: false,
170        }
171    }
172}
173
174impl UniquenessOptions {
175    /// Creates new options with default values.
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    /// Sets the null handling strategy.
181    pub fn with_null_handling(mut self, null_handling: NullHandling) -> Self {
182        self.null_handling = null_handling;
183        self
184    }
185
186    /// Sets whether string comparisons should be case-sensitive.
187    pub fn case_sensitive(mut self, case_sensitive: bool) -> Self {
188        self.case_sensitive = case_sensitive;
189        self
190    }
191
192    /// Sets whether to trim whitespace before comparison.
193    pub fn trim_whitespace(mut self, trim_whitespace: bool) -> Self {
194        self.trim_whitespace = trim_whitespace;
195        self
196    }
197}
198
199/// A unified constraint that handles all types of uniqueness validation.
200///
201/// This constraint consolidates the functionality of multiple uniqueness-related constraints:
202/// - `UniquenessConstraint` - Full uniqueness with threshold
203/// - `DistinctnessConstraint` - Distinctness ratio validation
204/// - `UniqueValueRatioConstraint` - Values appearing exactly once
205/// - `PrimaryKeyConstraint` - Unique + non-null validation
206///
207/// # Examples
208///
209/// ## Full Uniqueness (replacing UniquenessConstraint)
210///
211/// ```rust
212/// use term_guard::constraints::{UniquenessConstraint, UniquenessType};
213///
214/// // Single column uniqueness
215/// let constraint = UniquenessConstraint::full_uniqueness("user_id", 1.0)?;
216///
217/// // Multi-column uniqueness with threshold
218/// let constraint = UniquenessConstraint::full_uniqueness_multi(
219///     vec!["email", "domain"],
220///     0.95
221/// )?;
222/// # Ok::<(), Box<dyn std::error::Error>>(())
223/// ```
224///
225/// ## Distinctness (replacing DistinctnessConstraint)
226///
227/// ```rust
228/// use term_guard::constraints::{UniquenessConstraint, Assertion};
229///
230/// let constraint = UniquenessConstraint::distinctness(
231///     vec!["category"],
232///     Assertion::GreaterThan(0.8)
233/// )?;
234/// # Ok::<(), Box<dyn std::error::Error>>(())
235/// ```
236///
237/// ## Primary Key (replacing PrimaryKeyConstraint)
238///
239/// ```rust
240/// use term_guard::constraints::UniquenessConstraint;
241///
242/// let constraint = UniquenessConstraint::primary_key(
243///     vec!["order_id", "line_item_id"]
244/// )?;
245/// # Ok::<(), Box<dyn std::error::Error>>(())
246/// ```
247#[derive(Debug, Clone)]
248pub struct UniquenessConstraint {
249    columns: Vec<String>,
250    uniqueness_type: UniquenessType,
251    options: UniquenessOptions,
252}
253
254impl UniquenessConstraint {
255    /// Creates a new unified uniqueness constraint.
256    ///
257    /// # Arguments
258    ///
259    /// * `columns` - The columns to check for uniqueness
260    /// * `uniqueness_type` - The type of uniqueness validation to perform
261    /// * `options` - Configuration options for the constraint
262    ///
263    /// # Errors
264    ///
265    /// Returns error if column names are invalid or thresholds are out of range.
266    pub fn new<I, S>(
267        columns: I,
268        uniqueness_type: UniquenessType,
269        options: UniquenessOptions,
270    ) -> Result<Self>
271    where
272        I: IntoIterator<Item = S>,
273        S: Into<String>,
274    {
275        let column_vec: Vec<String> = columns.into_iter().map(Into::into).collect();
276
277        if column_vec.is_empty() {
278            return Err(TermError::validation_failed(
279                "unified_uniqueness",
280                "At least one column must be specified",
281            ));
282        }
283
284        // Validate column names
285        for column in &column_vec {
286            SqlSecurity::validate_identifier(column)?;
287        }
288
289        // Validate thresholds in uniqueness types
290        match &uniqueness_type {
291            UniquenessType::FullUniqueness { threshold }
292            | UniquenessType::UniqueWithNulls { threshold, .. }
293            | UniquenessType::UniqueComposite { threshold, .. } => {
294                if !((0.0..=1.0).contains(threshold)) {
295                    return Err(TermError::validation_failed(
296                        "unified_uniqueness",
297                        "Threshold must be between 0.0 and 1.0",
298                    ));
299                }
300            }
301            _ => {} // Other types don't have threshold validation
302        }
303
304        Ok(Self {
305            columns: column_vec,
306            uniqueness_type,
307            options,
308        })
309    }
310
311    /// Creates a full uniqueness constraint for a single column.
312    ///
313    /// This replaces `UniquenessConstraint::single()`.
314    pub fn full_uniqueness(column: impl Into<String>, threshold: f64) -> Result<Self> {
315        Self::new(
316            vec![column.into()],
317            UniquenessType::FullUniqueness { threshold },
318            UniquenessOptions::default(),
319        )
320    }
321
322    /// Creates a full uniqueness constraint for multiple columns.
323    ///
324    /// This replaces `UniquenessConstraint::multiple()` and `UniquenessConstraint::with_threshold()`.
325    pub fn full_uniqueness_multi<I, S>(columns: I, threshold: f64) -> Result<Self>
326    where
327        I: IntoIterator<Item = S>,
328        S: Into<String>,
329    {
330        Self::new(
331            columns,
332            UniquenessType::FullUniqueness { threshold },
333            UniquenessOptions::default(),
334        )
335    }
336
337    /// Creates a distinctness constraint.
338    ///
339    /// This replaces `DistinctnessConstraint::new()`.
340    pub fn distinctness<I, S>(columns: I, assertion: Assertion) -> Result<Self>
341    where
342        I: IntoIterator<Item = S>,
343        S: Into<String>,
344    {
345        Self::new(
346            columns,
347            UniquenessType::Distinctness(assertion),
348            UniquenessOptions::default(),
349        )
350    }
351
352    /// Creates a unique value ratio constraint.
353    ///
354    /// This replaces `UniqueValueRatioConstraint::new()`.
355    pub fn unique_value_ratio<I, S>(columns: I, assertion: Assertion) -> Result<Self>
356    where
357        I: IntoIterator<Item = S>,
358        S: Into<String>,
359    {
360        Self::new(
361            columns,
362            UniquenessType::UniqueValueRatio(assertion),
363            UniquenessOptions::default(),
364        )
365    }
366
367    /// Creates a primary key constraint.
368    ///
369    /// This replaces `PrimaryKeyConstraint::new()`.
370    pub fn primary_key<I, S>(columns: I) -> Result<Self>
371    where
372        I: IntoIterator<Item = S>,
373        S: Into<String>,
374    {
375        Self::new(
376            columns,
377            UniquenessType::PrimaryKey,
378            UniquenessOptions::default(),
379        )
380    }
381
382    /// Creates a uniqueness constraint that allows NULLs.
383    pub fn unique_with_nulls<I, S>(
384        columns: I,
385        threshold: f64,
386        null_handling: NullHandling,
387    ) -> Result<Self>
388    where
389        I: IntoIterator<Item = S>,
390        S: Into<String>,
391    {
392        Self::new(
393            columns,
394            UniquenessType::UniqueWithNulls {
395                threshold,
396                null_handling,
397            },
398            UniquenessOptions::default(),
399        )
400    }
401
402    /// Creates a composite uniqueness constraint with advanced options.
403    pub fn unique_composite<I, S>(
404        columns: I,
405        threshold: f64,
406        null_handling: NullHandling,
407        case_sensitive: bool,
408    ) -> Result<Self>
409    where
410        I: IntoIterator<Item = S>,
411        S: Into<String>,
412    {
413        Self::new(
414            columns,
415            UniquenessType::UniqueComposite {
416                threshold,
417                null_handling,
418                case_sensitive,
419            },
420            UniquenessOptions::new()
421                .with_null_handling(null_handling)
422                .case_sensitive(case_sensitive),
423        )
424    }
425
426    /// Returns the columns being validated.
427    pub fn columns(&self) -> &[String] {
428        &self.columns
429    }
430
431    /// Returns the uniqueness type.
432    pub fn uniqueness_type(&self) -> &UniquenessType {
433        &self.uniqueness_type
434    }
435
436    /// Returns the constraint options.
437    pub fn options(&self) -> &UniquenessOptions {
438        &self.options
439    }
440}
441
442#[async_trait]
443impl Constraint for UniquenessConstraint {
444    #[instrument(skip(self, ctx), fields(
445        columns = ?self.columns,
446        uniqueness_type = %self.uniqueness_type.name(),
447        null_handling = %self.options.null_handling
448    ))]
449    async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
450        // Get the table name from the validation context
451        let validation_ctx = current_validation_context();
452        let table_name = validation_ctx.table_name();
453
454        // Generate SQL based on uniqueness type
455        let sql = self.generate_sql(table_name)?;
456
457        let df = ctx.sql(&sql).await?;
458        let batches = df.collect().await?;
459
460        if batches.is_empty() {
461            return Ok(ConstraintResult::skipped("No data to validate"));
462        }
463
464        let batch = &batches[0];
465        if batch.num_rows() == 0 {
466            return Ok(ConstraintResult::skipped("No data to validate"));
467        }
468
469        // Process results based on uniqueness type
470        match &self.uniqueness_type {
471            UniquenessType::FullUniqueness { threshold }
472            | UniquenessType::UniqueWithNulls { threshold, .. }
473            | UniquenessType::UniqueComposite { threshold, .. } => {
474                self.evaluate_threshold_based(batch, *threshold).await
475            }
476            UniquenessType::Distinctness(assertion)
477            | UniquenessType::UniqueValueRatio(assertion) => {
478                self.evaluate_assertion_based(batch, assertion).await
479            }
480            UniquenessType::PrimaryKey => self.evaluate_primary_key(batch).await,
481        }
482    }
483
484    fn name(&self) -> &str {
485        self.uniqueness_type.name()
486    }
487
488    fn column(&self) -> Option<&str> {
489        if self.columns.len() == 1 {
490            Some(&self.columns[0])
491        } else {
492            None
493        }
494    }
495
496    fn metadata(&self) -> ConstraintMetadata {
497        let mut metadata = if self.columns.len() == 1 {
498            ConstraintMetadata::for_column(&self.columns[0])
499        } else {
500            ConstraintMetadata::for_columns(&self.columns)
501        };
502
503        metadata = metadata
504            .with_description(format!(
505                "Unified uniqueness constraint that {}",
506                self.uniqueness_type.description()
507            ))
508            .with_custom("uniqueness_type", self.uniqueness_type.name())
509            .with_custom("null_handling", self.options.null_handling.to_string())
510            .with_custom("case_sensitive", self.options.case_sensitive.to_string())
511            .with_custom("constraint_type", "uniqueness");
512
513        // Add type-specific metadata
514        match &self.uniqueness_type {
515            UniquenessType::FullUniqueness { threshold }
516            | UniquenessType::UniqueWithNulls { threshold, .. }
517            | UniquenessType::UniqueComposite { threshold, .. } => {
518                metadata = metadata.with_custom("threshold", threshold.to_string());
519            }
520            UniquenessType::Distinctness(assertion)
521            | UniquenessType::UniqueValueRatio(assertion) => {
522                metadata = metadata.with_custom("assertion", assertion.to_string());
523            }
524            UniquenessType::PrimaryKey => {
525                metadata = metadata.with_custom("strict", "true");
526            }
527        }
528
529        metadata
530    }
531}
532
533impl UniquenessConstraint {
534    /// Generates SQL query based on the uniqueness type and options.
535    fn generate_sql(&self, table_name: &str) -> Result<String> {
536        match &self.uniqueness_type {
537            UniquenessType::FullUniqueness { .. }
538            | UniquenessType::UniqueWithNulls { .. }
539            | UniquenessType::UniqueComposite { .. } => {
540                self.generate_full_uniqueness_sql(table_name)
541            }
542            UniquenessType::Distinctness(_) => self.generate_distinctness_sql(table_name),
543            UniquenessType::UniqueValueRatio(_) => self.generate_unique_value_ratio_sql(table_name),
544            UniquenessType::PrimaryKey => self.generate_primary_key_sql(table_name),
545        }
546    }
547
548    /// Generates SQL for full uniqueness validation.
549    fn generate_full_uniqueness_sql(&self, table_name: &str) -> Result<String> {
550        let escaped_columns: Result<Vec<String>> = self
551            .columns
552            .iter()
553            .map(|col| SqlSecurity::escape_identifier(col))
554            .collect();
555        let escaped_columns = escaped_columns?;
556
557        let columns_expr = if self.columns.len() == 1 {
558            escaped_columns[0].clone()
559        } else {
560            let cols = escaped_columns.join(", ");
561            format!("({cols})")
562        };
563
564        let sql = match &self.uniqueness_type {
565            UniquenessType::UniqueWithNulls {
566                null_handling: NullHandling::Include,
567                ..
568            } => {
569                // Include NULLs in distinct count by coalescing to a special value
570                if self.columns.len() == 1 {
571                    let col = &escaped_columns[0];
572                    format!(
573                        "SELECT 
574                            COUNT(*) as total_count,
575                            COUNT(DISTINCT COALESCE({col}, '<NULL>')) as unique_count
576                         FROM {table_name}"
577                    )
578                } else {
579                    format!(
580                        "SELECT 
581                            COUNT(*) as total_count,
582                            COUNT(DISTINCT {columns_expr}) as unique_count
583                         FROM {table_name}"
584                    )
585                }
586            }
587            UniquenessType::UniqueWithNulls {
588                null_handling: NullHandling::Distinct,
589                ..
590            } => {
591                // Each NULL is treated as distinct
592                if self.columns.len() == 1 {
593                    let col = &escaped_columns[0];
594                    format!(
595                        "SELECT 
596                            COUNT(*) as total_count,
597                            COUNT(DISTINCT {col}) + CASE WHEN COUNT(*) - COUNT({col}) > 0 THEN COUNT(*) - COUNT({col}) ELSE 0 END as unique_count
598                         FROM {table_name}"
599                    )
600                } else {
601                    // For multi-column, this is more complex - treat as regular for now
602                    format!(
603                        "SELECT 
604                            COUNT(*) as total_count,
605                            COUNT(DISTINCT {columns_expr}) as unique_count
606                         FROM {table_name}"
607                    )
608                }
609            }
610            _ => {
611                // Standard uniqueness (excludes NULLs from distinct count)
612                format!(
613                    "SELECT 
614                        COUNT(*) as total_count,
615                        COUNT(DISTINCT {columns_expr}) as unique_count
616                     FROM {table_name}"
617                )
618            }
619        };
620
621        Ok(sql)
622    }
623
624    /// Generates SQL for distinctness validation.
625    fn generate_distinctness_sql(&self, table_name: &str) -> Result<String> {
626        let escaped_columns: Result<Vec<String>> = self
627            .columns
628            .iter()
629            .map(|col| SqlSecurity::escape_identifier(col))
630            .collect();
631        let escaped_columns = escaped_columns?;
632
633        let sql = if self.columns.len() == 1 {
634            let col = &escaped_columns[0];
635            format!(
636                "SELECT 
637                    COUNT(DISTINCT {col}) as distinct_count,
638                    COUNT(*) as total_count
639                 FROM {table_name}"
640            )
641        } else {
642            // Multi-column distinctness using concatenation with NULL handling
643            let concat_expr = escaped_columns
644                .iter()
645                .map(|col| format!("COALESCE(CAST({col} AS VARCHAR), '<NULL>')"))
646                .collect::<Vec<_>>()
647                .join(" || '|' || ");
648
649            format!(
650                "SELECT 
651                    COUNT(DISTINCT ({concat_expr})) as distinct_count,
652                    COUNT(*) as total_count
653                 FROM {table_name}"
654            )
655        };
656
657        Ok(sql)
658    }
659
660    /// Generates SQL for unique value ratio validation.
661    fn generate_unique_value_ratio_sql(&self, table_name: &str) -> Result<String> {
662        let escaped_columns: Result<Vec<String>> = self
663            .columns
664            .iter()
665            .map(|col| SqlSecurity::escape_identifier(col))
666            .collect();
667        let escaped_columns = escaped_columns?;
668
669        let columns_list = escaped_columns.join(", ");
670
671        let sql = format!(
672            "WITH value_counts AS (
673                SELECT {columns_list}, COUNT(*) as cnt
674                FROM {table_name}
675                GROUP BY {columns_list}
676            )
677            SELECT 
678                COALESCE(SUM(CASE WHEN cnt = 1 THEN cnt ELSE 0 END), 0) as unique_count,
679                COALESCE(SUM(cnt), 0) as total_count
680            FROM value_counts"
681        );
682
683        Ok(sql)
684    }
685
686    /// Generates SQL for primary key validation.
687    fn generate_primary_key_sql(&self, table_name: &str) -> Result<String> {
688        let escaped_columns: Result<Vec<String>> = self
689            .columns
690            .iter()
691            .map(|col| SqlSecurity::escape_identifier(col))
692            .collect();
693        let escaped_columns = escaped_columns?;
694
695        let columns_expr = if self.columns.len() == 1 {
696            escaped_columns[0].clone()
697        } else {
698            let cols = escaped_columns.join(", ");
699            format!("({cols})")
700        };
701
702        // Check for NULLs in all columns
703        let null_check = escaped_columns
704            .iter()
705            .map(|col| format!("{col} IS NOT NULL"))
706            .collect::<Vec<_>>()
707            .join(" AND ");
708
709        let sql = format!(
710            "SELECT 
711                COUNT(*) as total_count,
712                COUNT(DISTINCT {columns_expr}) as unique_count,
713                COUNT(*) - COUNT(CASE WHEN {null_check} THEN 1 END) as null_count
714             FROM {table_name}"
715        );
716
717        Ok(sql)
718    }
719
720    /// Evaluates threshold-based uniqueness results.
721    async fn evaluate_threshold_based(
722        &self,
723        batch: &arrow::record_batch::RecordBatch,
724        threshold: f64,
725    ) -> Result<ConstraintResult> {
726        let total_count = batch
727            .column(0)
728            .as_any()
729            .downcast_ref::<arrow::array::Int64Array>()
730            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
731            .value(0) as f64;
732
733        let unique_count = batch
734            .column(1)
735            .as_any()
736            .downcast_ref::<arrow::array::Int64Array>()
737            .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
738            .value(0) as f64;
739
740        if total_count == 0.0 {
741            return Ok(ConstraintResult::skipped("No data to validate"));
742        }
743
744        let uniqueness_ratio = unique_count / total_count;
745
746        if uniqueness_ratio >= threshold {
747            Ok(ConstraintResult::success_with_metric(uniqueness_ratio))
748        } else {
749            Ok(ConstraintResult::failure_with_metric(
750                uniqueness_ratio,
751                format!(
752                    "Uniqueness ratio {uniqueness_ratio:.3} is below threshold {threshold:.3} for columns: {}",
753                    self.columns.join(", ")
754                ),
755            ))
756        }
757    }
758
759    /// Evaluates assertion-based results (distinctness and unique value ratio).
760    async fn evaluate_assertion_based(
761        &self,
762        batch: &arrow::record_batch::RecordBatch,
763        assertion: &Assertion,
764    ) -> Result<ConstraintResult> {
765        let count = batch
766            .column(0)
767            .as_any()
768            .downcast_ref::<arrow::array::Int64Array>()
769            .ok_or_else(|| TermError::Internal("Failed to extract count".to_string()))?
770            .value(0) as f64;
771
772        let total_count = batch
773            .column(1)
774            .as_any()
775            .downcast_ref::<arrow::array::Int64Array>()
776            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
777            .value(0) as f64;
778
779        if total_count == 0.0 {
780            return Ok(ConstraintResult::skipped("No data to validate"));
781        }
782
783        let ratio = count / total_count;
784
785        if assertion.evaluate(ratio) {
786            Ok(ConstraintResult::success_with_metric(ratio))
787        } else {
788            Ok(ConstraintResult::failure_with_metric(
789                ratio,
790                format!(
791                    "{} ratio {ratio:.3} does not satisfy {} for columns: {}",
792                    self.uniqueness_type.name(),
793                    assertion.description(),
794                    self.columns.join(", ")
795                ),
796            ))
797        }
798    }
799
800    /// Evaluates primary key results.
801    async fn evaluate_primary_key(
802        &self,
803        batch: &arrow::record_batch::RecordBatch,
804    ) -> Result<ConstraintResult> {
805        let total_count = batch
806            .column(0)
807            .as_any()
808            .downcast_ref::<arrow::array::Int64Array>()
809            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
810            .value(0) as f64;
811
812        let unique_count = batch
813            .column(1)
814            .as_any()
815            .downcast_ref::<arrow::array::Int64Array>()
816            .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
817            .value(0) as f64;
818
819        let null_count = batch
820            .column(2)
821            .as_any()
822            .downcast_ref::<arrow::array::Int64Array>()
823            .ok_or_else(|| TermError::Internal("Failed to extract null count".to_string()))?
824            .value(0) as f64;
825
826        if total_count == 0.0 {
827            return Ok(ConstraintResult::skipped("No data to validate"));
828        }
829
830        // Primary key validation: no NULLs and all values unique
831        if null_count > 0.0 {
832            Ok(ConstraintResult::failure_with_metric(
833                null_count / total_count,
834                format!(
835                    "Primary key columns contain {null_count} NULL values: {}",
836                    self.columns.join(", ")
837                ),
838            ))
839        } else if unique_count != total_count {
840            let duplicate_ratio = (total_count - unique_count) / total_count;
841            Ok(ConstraintResult::failure_with_metric(
842                duplicate_ratio,
843                format!(
844                    "Primary key columns contain {} duplicate values: {}",
845                    total_count - unique_count,
846                    self.columns.join(", ")
847                ),
848            ))
849        } else {
850            Ok(ConstraintResult::success_with_metric(1.0))
851        }
852    }
853}
854
#[cfg(test)]
mod tests {
    // Unit tests for `UniquenessConstraint`.
    //
    // Every fixture is an in-memory table of nullable UTF-8 columns registered
    // under `TABLE_NAME`; constraints are evaluated against it through
    // `evaluate_constraint_with_context`.

    use super::*;
    use crate::constraints::Assertion;
    use crate::core::ConstraintStatus;
    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    use crate::test_helpers::evaluate_constraint_with_context;

    /// Name under which fixture tables are registered and later evaluated.
    /// Keeping it in one place prevents the registration and the evaluation
    /// calls from drifting apart.
    const TABLE_NAME: &str = "data";

    /// Builds a fresh `SessionContext` containing a single in-memory table made
    /// of the given `(column name, values)` pairs.
    ///
    /// Every column is declared as nullable `Utf8`, and all values are placed in
    /// one record batch. The table is registered as [`TABLE_NAME`].
    ///
    /// # Panics
    ///
    /// Panics if the batch, the `MemTable`, or the registration is rejected
    /// (for example when the columns have different lengths).
    fn context_with_utf8_columns(columns: Vec<(&str, Vec<Option<&str>>)>) -> SessionContext {
        let (fields, arrays): (Vec<Field>, Vec<ArrayRef>) = columns
            .into_iter()
            .map(|(name, values)| {
                let array: ArrayRef = Arc::new(StringArray::from(values));
                (Field::new(name, DataType::Utf8, true), array)
            })
            .unzip();

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).unwrap();
        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();

        let ctx = SessionContext::new();
        ctx.register_table(TABLE_NAME, Arc::new(provider)).unwrap();
        ctx
    }

    /// Creates a context whose table has one nullable `Utf8` column named
    /// `test_col` holding `values`.
    ///
    /// Kept `async` so the signature (and every `.await` call site) is unchanged.
    async fn create_test_context(values: Vec<Option<&str>>) -> SessionContext {
        context_with_utf8_columns(vec![("test_col", values)])
    }

    /// Creates a context whose table has two nullable `Utf8` columns, `col1` and
    /// `col2`, holding `col1_values` and `col2_values` respectively.
    ///
    /// Kept `async` so the signature (and every `.await` call site) is unchanged.
    async fn create_multi_column_test_context(
        col1_values: Vec<Option<&str>>,
        col2_values: Vec<Option<&str>>,
    ) -> SessionContext {
        context_with_utf8_columns(vec![("col1", col1_values), ("col2", col2_values)])
    }

    /// Four rows with one repeated value are expected to yield a metric of 0.75
    /// and pass a 0.7 threshold.
    #[tokio::test]
    async fn test_full_uniqueness_single_column() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.7).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75)); // 3 unique out of 4 total
    }

    /// With the default uniqueness check a NULL row counts towards the total
    /// but not towards the distinct values.
    #[tokio::test]
    async fn test_full_uniqueness_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, Some("A")]).await;

        // Standard uniqueness (excludes NULLs from distinct count)
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.4).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.5)); // 2 unique non-null out of 4 total
    }

    /// Distinctness is expected to report distinct values over total rows and
    /// to satisfy an `Equals` assertion on that ratio.
    #[tokio::test]
    async fn test_distinctness_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let constraint =
            UniquenessConstraint::distinctness(vec!["test_col"], Assertion::Equals(0.75)).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75)); // 3 distinct out of 4 total
    }

    /// The unique-value ratio is expected to count only values that occur
    /// exactly once.
    #[tokio::test]
    async fn test_unique_value_ratio_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let constraint =
            UniquenessConstraint::unique_value_ratio(vec!["test_col"], Assertion::Equals(0.5))
                .unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.5)); // 2 values appear exactly once out of 4 total
    }

    /// A column with no NULLs and no duplicates is a valid primary key.
    #[tokio::test]
    async fn test_primary_key_success() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C")]).await;
        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    /// A NULL in a primary-key column must fail with a message naming NULLs.
    #[tokio::test]
    async fn test_primary_key_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None]).await;
        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        let message = result
            .message
            .expect("a primary-key failure should carry a message");
        assert!(message.contains("NULL values"));
    }

    /// A repeated value in a primary-key column must fail with a message naming
    /// duplicates.
    #[tokio::test]
    async fn test_primary_key_with_duplicates() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("A")]).await;
        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        let message = result
            .message
            .expect("a primary-key failure should carry a message");
        assert!(message.contains("duplicate values"));
    }

    /// Each column repeats a value on its own, but every (col1, col2) pair is
    /// different, so the combination is fully unique.
    #[tokio::test]
    async fn test_multi_column_uniqueness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("2")],
        )
        .await;
        let constraint =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0)); // All combinations are unique
    }

    /// Two distinct (col1, col2) pairs across three rows give a distinctness of
    /// roughly 0.67.
    #[tokio::test]
    async fn test_multi_column_distinctness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("1")],
        )
        .await;
        let constraint =
            UniquenessConstraint::distinctness(vec!["col1", "col2"], Assertion::GreaterThan(0.5))
                .unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        // Two distinct combinations: A|1 and B|2, plus A|1 repeated = 2/3 = 0.67
        // Compared with a tolerance because 2/3 has no exact f64 representation.
        let metric = result.metric.expect("distinctness should report a metric");
        assert!((metric - 2.0 / 3.0).abs() < 0.01);
    }

    /// With `NullHandling::Include` all NULLs together count as one value.
    #[tokio::test]
    async fn test_unique_with_nulls_include() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, None]).await;
        let constraint =
            UniquenessConstraint::unique_with_nulls(vec!["test_col"], 0.4, NullHandling::Include)
                .unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75)); // A, B, NULL (treated as one value) = 3/4
    }

    /// An empty table must be reported as skipped rather than passed or failed.
    #[tokio::test]
    async fn test_empty_data() {
        let ctx = create_test_context(Vec::new()).await;
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 1.0).unwrap();

        let result = evaluate_constraint_with_context(&constraint, &ctx, TABLE_NAME)
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    /// Thresholds outside `[0.0, 1.0]` are rejected at construction time.
    ///
    /// Synchronous: construction needs no async runtime.
    #[test]
    fn test_invalid_threshold() {
        let error = UniquenessConstraint::full_uniqueness("col", 1.5)
            .expect_err("a threshold above 1.0 must be rejected");

        assert!(error
            .to_string()
            .contains("Threshold must be between 0.0 and 1.0"));
    }

    /// A constraint cannot be built without at least one column.
    ///
    /// Synchronous: construction needs no async runtime.
    #[test]
    fn test_empty_columns() {
        let error = UniquenessConstraint::new(
            Vec::<String>::new(),
            UniquenessType::FullUniqueness { threshold: 1.0 },
            UniquenessOptions::default(),
        )
        .expect_err("an empty column list must be rejected");

        assert!(error
            .to_string()
            .contains("At least one column must be specified"));
    }

    /// A single-column constraint exposes its description, name and column.
    ///
    /// Synchronous: only accessors are exercised.
    #[test]
    fn test_constraint_metadata() {
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.95).unwrap();
        let metadata = constraint.metadata();

        assert!(metadata
            .description
            .unwrap_or_default()
            .contains("Unified uniqueness constraint"));
        assert_eq!(constraint.name(), "full_uniqueness");
        assert_eq!(constraint.column(), Some("test_col"));
    }

    /// A multi-column constraint reports no single column but lists all of its
    /// columns in order.
    ///
    /// Synchronous: only accessors are exercised.
    #[test]
    fn test_multi_column_metadata() {
        let constraint =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        assert_eq!(constraint.column(), None); // Multi-column has no single column
        assert_eq!(constraint.columns(), &["col1", "col2"]);
    }
}