term_guard/constraints/
uniqueness.rs

1//! Uniqueness constraint implementation.
2//!
3//! This module provides a comprehensive constraint that handles all uniqueness-related
4//! validations including full uniqueness, distinctness, unique value ratios, and primary keys.
5
6use crate::constraints::Assertion;
7use crate::core::{Constraint, ConstraintMetadata, ConstraintResult};
8use crate::prelude::*;
9use crate::security::SqlSecurity;
10use arrow::array::Array;
11use async_trait::async_trait;
12use datafusion::prelude::*;
13use std::fmt;
14use tracing::instrument;
15
/// Strategy for treating NULL values while uniqueness is being evaluated.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum NullHandling {
    /// Leave NULLs out of the distinct count while still counting them in the
    /// total row count. This is the default strategy.
    #[default]
    Exclude,

    /// Count NULL like any ordinary value, so several NULLs are duplicates of
    /// one another.
    Include,

    /// Count every NULL as its own value, different from every other NULL.
    Distinct,
}

impl fmt::Display for NullHandling {
    /// Writes the lowercase label of the strategy: `exclude`, `include` or
    /// `distinct`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Exclude => "exclude",
            Self::Include => "include",
            Self::Distinct => "distinct",
        };
        f.write_str(label)
    }
}
44
/// Type of uniqueness validation to perform.
///
/// This enum encompasses all the different types of uniqueness checks that were
/// previously handled by separate constraint types.
///
/// The three threshold-carrying variants (`FullUniqueness`, `UniqueWithNulls`,
/// `UniqueComposite`) are evaluated as `unique_count / total_count >= threshold`;
/// `UniquenessConstraint::new` rejects thresholds outside `0.0..=1.0`.
#[derive(Debug, Clone, PartialEq)]
pub enum UniquenessType {
    /// Full uniqueness validation with configurable threshold.
    ///
    /// Replaces the functionality of `UniquenessConstraint`.
    /// Validates that at least `threshold` ratio of values are unique.
    /// `threshold` is a ratio (for example `0.95`), not a percentage.
    FullUniqueness { threshold: f64 },

    /// Distinctness validation using flexible assertions.
    ///
    /// Replaces the functionality of `DistinctnessConstraint`.
    /// Validates the ratio of distinct values using assertion-based logic.
    Distinctness(Assertion),

    /// Unique value ratio validation using flexible assertions.
    ///
    /// Replaces the functionality of `UniqueValueRatioConstraint`.
    /// Validates the ratio of values that appear exactly once.
    UniqueValueRatio(Assertion),

    /// Primary key validation (unique + non-null).
    ///
    /// Replaces the functionality of `PrimaryKeyConstraint`.
    /// Enforces that values are both unique and contain no NULLs.
    PrimaryKey,

    /// Uniqueness validation that allows NULL values.
    ///
    /// Similar to FullUniqueness but with explicit NULL handling control.
    UniqueWithNulls {
        /// Minimum acceptable ratio of unique rows to total rows.
        threshold: f64,
        /// How NULLs contribute to the unique count.
        null_handling: NullHandling,
    },

    /// Composite uniqueness with advanced null handling.
    ///
    /// Optimized for multi-column uniqueness checks with configurable
    /// null handling strategies.
    ///
    /// NOTE(review): the SQL generation visible in this file issues the
    /// standard `COUNT(DISTINCT ...)` query for this variant, so
    /// `null_handling` and `case_sensitive` currently only surface in the
    /// description text; confirm whether that is intended.
    UniqueComposite {
        /// Minimum acceptable ratio of unique rows to total rows.
        threshold: f64,
        /// Requested NULL strategy for the composite key.
        null_handling: NullHandling,
        /// Requested case sensitivity for string comparison.
        case_sensitive: bool,
    },
}
93
94impl UniquenessType {
95    /// Returns the name of this uniqueness type for tracing and metadata.
96    pub fn name(&self) -> &str {
97        match self {
98            UniquenessType::FullUniqueness { .. } => "full_uniqueness",
99            UniquenessType::Distinctness(_) => "distinctness",
100            UniquenessType::UniqueValueRatio(_) => "unique_value_ratio",
101            UniquenessType::PrimaryKey => "primary_key",
102            UniquenessType::UniqueWithNulls { .. } => "unique_with_nulls",
103            UniquenessType::UniqueComposite { .. } => "unique_composite",
104        }
105    }
106
107    /// Returns a human-readable description of this uniqueness type.
108    pub fn description(&self) -> String {
109        match self {
110            UniquenessType::FullUniqueness { threshold } => {
111                let threshold_pct = threshold * 100.0;
112                format!("validates that at least {threshold_pct:.1}% of values are unique")
113            }
114            UniquenessType::Distinctness(assertion) => {
115                format!(
116                    "validates that distinct value ratio {}",
117                    assertion.description()
118                )
119            }
120            UniquenessType::UniqueValueRatio(assertion) => {
121                format!(
122                    "validates that unique value ratio {}",
123                    assertion.description()
124                )
125            }
126            UniquenessType::PrimaryKey => {
127                "validates that values form a valid primary key (unique + non-null)".to_string()
128            }
129            UniquenessType::UniqueWithNulls {
130                threshold,
131                null_handling,
132            } => {
133                let threshold_pct = threshold * 100.0;
134                format!(
135                    "validates that at least {threshold_pct:.1}% of values are unique (nulls: {null_handling})"
136                )
137            }
138            UniquenessType::UniqueComposite {
139                threshold,
140                null_handling,
141                case_sensitive,
142            } => {
143                let threshold_pct = threshold * 100.0;
144                format!(
145                    "validates composite uniqueness at {threshold_pct:.1}% threshold (nulls: {null_handling}, case-sensitive: {case_sensitive})"
146                )
147            }
148        }
149    }
150}
151
/// Options for configuring uniqueness constraint behavior.
///
/// Built with `UniquenessOptions::new()` plus the chained setters, or via
/// `Default`. The values are reported in the constraint's tracing span and
/// metadata.
///
/// NOTE(review): the SQL generators visible in this file do not read
/// `case_sensitive` or `trim_whitespace`; confirm whether they are consumed
/// elsewhere before relying on them.
#[derive(Debug, Clone, PartialEq)]
pub struct UniquenessOptions {
    /// How to handle NULL values in uniqueness calculations.
    pub null_handling: NullHandling,

    /// Whether string comparisons should be case-sensitive.
    /// Defaults to `true`.
    pub case_sensitive: bool,

    /// Whether to trim whitespace before comparison.
    /// Defaults to `false`.
    pub trim_whitespace: bool,
}
164
165impl Default for UniquenessOptions {
166    fn default() -> Self {
167        Self {
168            null_handling: NullHandling::default(),
169            case_sensitive: true,
170            trim_whitespace: false,
171        }
172    }
173}
174
175impl UniquenessOptions {
176    /// Creates new options with default values.
177    pub fn new() -> Self {
178        Self::default()
179    }
180
181    /// Sets the null handling strategy.
182    pub fn with_null_handling(mut self, null_handling: NullHandling) -> Self {
183        self.null_handling = null_handling;
184        self
185    }
186
187    /// Sets whether string comparisons should be case-sensitive.
188    pub fn case_sensitive(mut self, case_sensitive: bool) -> Self {
189        self.case_sensitive = case_sensitive;
190        self
191    }
192
193    /// Sets whether to trim whitespace before comparison.
194    pub fn trim_whitespace(mut self, trim_whitespace: bool) -> Self {
195        self.trim_whitespace = trim_whitespace;
196        self
197    }
198}
199
200/// A unified constraint that handles all types of uniqueness validation.
201///
202/// This constraint consolidates the functionality of multiple uniqueness-related constraints:
203/// - `UniquenessConstraint` - Full uniqueness with threshold
204/// - `DistinctnessConstraint` - Distinctness ratio validation
205/// - `UniqueValueRatioConstraint` - Values appearing exactly once
206/// - `PrimaryKeyConstraint` - Unique + non-null validation
207///
208/// # Examples
209///
210/// ## Full Uniqueness (replacing UniquenessConstraint)
211///
212/// ```rust
213/// use term_guard::constraints::{UniquenessConstraint, UniquenessType};
214///
215/// // Single column uniqueness
216/// let constraint = UniquenessConstraint::full_uniqueness("user_id", 1.0)?;
217///
218/// // Multi-column uniqueness with threshold
219/// let constraint = UniquenessConstraint::full_uniqueness_multi(
220///     vec!["email", "domain"],
221///     0.95
222/// )?;
223/// # Ok::<(), Box<dyn std::error::Error>>(())
224/// ```
225///
226/// ## Distinctness (replacing DistinctnessConstraint)
227///
228/// ```rust
229/// use term_guard::constraints::{UniquenessConstraint, Assertion};
230///
231/// let constraint = UniquenessConstraint::distinctness(
232///     vec!["category"],
233///     Assertion::GreaterThan(0.8)
234/// )?;
235/// # Ok::<(), Box<dyn std::error::Error>>(())
236/// ```
237///
238/// ## Primary Key (replacing PrimaryKeyConstraint)
239///
240/// ```rust
241/// use term_guard::constraints::UniquenessConstraint;
242///
243/// let constraint = UniquenessConstraint::primary_key(
244///     vec!["order_id", "line_item_id"]
245/// )?;
246/// # Ok::<(), Box<dyn std::error::Error>>(())
247/// ```
#[derive(Debug, Clone)]
pub struct UniquenessConstraint {
    /// Column names under validation; `new` guarantees the list is non-empty
    /// and that every name passed identifier validation.
    columns: Vec<String>,
    /// Which uniqueness check to run; selects both the SQL that is generated
    /// and how the query result is interpreted.
    uniqueness_type: UniquenessType,
    /// Caller-supplied options, reported in tracing fields and metadata.
    options: UniquenessOptions,
}
254
255impl UniquenessConstraint {
256    /// Creates a new unified uniqueness constraint.
257    ///
258    /// # Arguments
259    ///
260    /// * `columns` - The columns to check for uniqueness
261    /// * `uniqueness_type` - The type of uniqueness validation to perform
262    /// * `options` - Configuration options for the constraint
263    ///
264    /// # Errors
265    ///
266    /// Returns error if column names are invalid or thresholds are out of range.
267    pub fn new<I, S>(
268        columns: I,
269        uniqueness_type: UniquenessType,
270        options: UniquenessOptions,
271    ) -> Result<Self>
272    where
273        I: IntoIterator<Item = S>,
274        S: Into<String>,
275    {
276        let column_vec: Vec<String> = columns.into_iter().map(Into::into).collect();
277
278        if column_vec.is_empty() {
279            return Err(TermError::validation_failed(
280                "unified_uniqueness",
281                "At least one column must be specified",
282            ));
283        }
284
285        // Validate column names
286        for column in &column_vec {
287            SqlSecurity::validate_identifier(column)?;
288        }
289
290        // Validate thresholds in uniqueness types
291        match &uniqueness_type {
292            UniquenessType::FullUniqueness { threshold }
293            | UniquenessType::UniqueWithNulls { threshold, .. }
294            | UniquenessType::UniqueComposite { threshold, .. } => {
295                if !((0.0..=1.0).contains(threshold)) {
296                    return Err(TermError::validation_failed(
297                        "unified_uniqueness",
298                        "Threshold must be between 0.0 and 1.0",
299                    ));
300                }
301            }
302            _ => {} // Other types don't have threshold validation
303        }
304
305        Ok(Self {
306            columns: column_vec,
307            uniqueness_type,
308            options,
309        })
310    }
311
312    /// Creates a full uniqueness constraint for a single column.
313    ///
314    /// This replaces `UniquenessConstraint::single()`.
315    pub fn full_uniqueness(column: impl Into<String>, threshold: f64) -> Result<Self> {
316        Self::new(
317            vec![column.into()],
318            UniquenessType::FullUniqueness { threshold },
319            UniquenessOptions::default(),
320        )
321    }
322
323    /// Creates a full uniqueness constraint for multiple columns.
324    ///
325    /// This replaces `UniquenessConstraint::multiple()` and `UniquenessConstraint::with_threshold()`.
326    pub fn full_uniqueness_multi<I, S>(columns: I, threshold: f64) -> Result<Self>
327    where
328        I: IntoIterator<Item = S>,
329        S: Into<String>,
330    {
331        Self::new(
332            columns,
333            UniquenessType::FullUniqueness { threshold },
334            UniquenessOptions::default(),
335        )
336    }
337
338    /// Creates a distinctness constraint.
339    ///
340    /// This replaces `DistinctnessConstraint::new()`.
341    pub fn distinctness<I, S>(columns: I, assertion: Assertion) -> Result<Self>
342    where
343        I: IntoIterator<Item = S>,
344        S: Into<String>,
345    {
346        Self::new(
347            columns,
348            UniquenessType::Distinctness(assertion),
349            UniquenessOptions::default(),
350        )
351    }
352
353    /// Creates a unique value ratio constraint.
354    ///
355    /// This replaces `UniqueValueRatioConstraint::new()`.
356    pub fn unique_value_ratio<I, S>(columns: I, assertion: Assertion) -> Result<Self>
357    where
358        I: IntoIterator<Item = S>,
359        S: Into<String>,
360    {
361        Self::new(
362            columns,
363            UniquenessType::UniqueValueRatio(assertion),
364            UniquenessOptions::default(),
365        )
366    }
367
368    /// Creates a primary key constraint.
369    ///
370    /// This replaces `PrimaryKeyConstraint::new()`.
371    pub fn primary_key<I, S>(columns: I) -> Result<Self>
372    where
373        I: IntoIterator<Item = S>,
374        S: Into<String>,
375    {
376        Self::new(
377            columns,
378            UniquenessType::PrimaryKey,
379            UniquenessOptions::default(),
380        )
381    }
382
383    /// Creates a uniqueness constraint that allows NULLs.
384    pub fn unique_with_nulls<I, S>(
385        columns: I,
386        threshold: f64,
387        null_handling: NullHandling,
388    ) -> Result<Self>
389    where
390        I: IntoIterator<Item = S>,
391        S: Into<String>,
392    {
393        Self::new(
394            columns,
395            UniquenessType::UniqueWithNulls {
396                threshold,
397                null_handling,
398            },
399            UniquenessOptions::default(),
400        )
401    }
402
403    /// Creates a composite uniqueness constraint with advanced options.
404    pub fn unique_composite<I, S>(
405        columns: I,
406        threshold: f64,
407        null_handling: NullHandling,
408        case_sensitive: bool,
409    ) -> Result<Self>
410    where
411        I: IntoIterator<Item = S>,
412        S: Into<String>,
413    {
414        Self::new(
415            columns,
416            UniquenessType::UniqueComposite {
417                threshold,
418                null_handling,
419                case_sensitive,
420            },
421            UniquenessOptions::new()
422                .with_null_handling(null_handling)
423                .case_sensitive(case_sensitive),
424        )
425    }
426
427    /// Returns the columns being validated.
428    pub fn columns(&self) -> &[String] {
429        &self.columns
430    }
431
432    /// Returns the uniqueness type.
433    pub fn uniqueness_type(&self) -> &UniquenessType {
434        &self.uniqueness_type
435    }
436
437    /// Returns the constraint options.
438    pub fn options(&self) -> &UniquenessOptions {
439        &self.options
440    }
441}
442
443#[async_trait]
444impl Constraint for UniquenessConstraint {
445    #[instrument(skip(self, ctx), fields(
446        columns = ?self.columns,
447        uniqueness_type = %self.uniqueness_type.name(),
448        null_handling = %self.options.null_handling
449    ))]
450    async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
451        // Generate SQL based on uniqueness type
452        let sql = self.generate_sql()?;
453
454        let df = ctx.sql(&sql).await?;
455        let batches = df.collect().await?;
456
457        if batches.is_empty() {
458            return Ok(ConstraintResult::skipped("No data to validate"));
459        }
460
461        let batch = &batches[0];
462        if batch.num_rows() == 0 {
463            return Ok(ConstraintResult::skipped("No data to validate"));
464        }
465
466        // Process results based on uniqueness type
467        match &self.uniqueness_type {
468            UniquenessType::FullUniqueness { threshold }
469            | UniquenessType::UniqueWithNulls { threshold, .. }
470            | UniquenessType::UniqueComposite { threshold, .. } => {
471                self.evaluate_threshold_based(batch, *threshold).await
472            }
473            UniquenessType::Distinctness(assertion)
474            | UniquenessType::UniqueValueRatio(assertion) => {
475                self.evaluate_assertion_based(batch, assertion).await
476            }
477            UniquenessType::PrimaryKey => self.evaluate_primary_key(batch).await,
478        }
479    }
480
481    fn name(&self) -> &str {
482        self.uniqueness_type.name()
483    }
484
485    fn column(&self) -> Option<&str> {
486        if self.columns.len() == 1 {
487            Some(&self.columns[0])
488        } else {
489            None
490        }
491    }
492
493    fn metadata(&self) -> ConstraintMetadata {
494        let mut metadata = if self.columns.len() == 1 {
495            ConstraintMetadata::for_column(&self.columns[0])
496        } else {
497            ConstraintMetadata::for_columns(&self.columns)
498        };
499
500        metadata = metadata
501            .with_description(format!(
502                "Unified uniqueness constraint that {}",
503                self.uniqueness_type.description()
504            ))
505            .with_custom("uniqueness_type", self.uniqueness_type.name())
506            .with_custom("null_handling", self.options.null_handling.to_string())
507            .with_custom("case_sensitive", self.options.case_sensitive.to_string())
508            .with_custom("constraint_type", "uniqueness");
509
510        // Add type-specific metadata
511        match &self.uniqueness_type {
512            UniquenessType::FullUniqueness { threshold }
513            | UniquenessType::UniqueWithNulls { threshold, .. }
514            | UniquenessType::UniqueComposite { threshold, .. } => {
515                metadata = metadata.with_custom("threshold", threshold.to_string());
516            }
517            UniquenessType::Distinctness(assertion)
518            | UniquenessType::UniqueValueRatio(assertion) => {
519                metadata = metadata.with_custom("assertion", assertion.to_string());
520            }
521            UniquenessType::PrimaryKey => {
522                metadata = metadata.with_custom("strict", "true");
523            }
524        }
525
526        metadata
527    }
528}
529
530impl UniquenessConstraint {
531    /// Generates SQL query based on the uniqueness type and options.
532    fn generate_sql(&self) -> Result<String> {
533        match &self.uniqueness_type {
534            UniquenessType::FullUniqueness { .. }
535            | UniquenessType::UniqueWithNulls { .. }
536            | UniquenessType::UniqueComposite { .. } => self.generate_full_uniqueness_sql(),
537            UniquenessType::Distinctness(_) => self.generate_distinctness_sql(),
538            UniquenessType::UniqueValueRatio(_) => self.generate_unique_value_ratio_sql(),
539            UniquenessType::PrimaryKey => self.generate_primary_key_sql(),
540        }
541    }
542
543    /// Generates SQL for full uniqueness validation.
544    fn generate_full_uniqueness_sql(&self) -> Result<String> {
545        let escaped_columns: Result<Vec<String>> = self
546            .columns
547            .iter()
548            .map(|col| SqlSecurity::escape_identifier(col))
549            .collect();
550        let escaped_columns = escaped_columns?;
551
552        let columns_expr = if self.columns.len() == 1 {
553            escaped_columns[0].clone()
554        } else {
555            let cols = escaped_columns.join(", ");
556            format!("({cols})")
557        };
558
559        let sql = match &self.uniqueness_type {
560            UniquenessType::UniqueWithNulls {
561                null_handling: NullHandling::Include,
562                ..
563            } => {
564                // Include NULLs in distinct count by coalescing to a special value
565                if self.columns.len() == 1 {
566                    let col = &escaped_columns[0];
567                    format!(
568                        "SELECT 
569                            COUNT(*) as total_count,
570                            COUNT(DISTINCT COALESCE({col}, '<NULL>')) as unique_count
571                         FROM data"
572                    )
573                } else {
574                    format!(
575                        "SELECT 
576                            COUNT(*) as total_count,
577                            COUNT(DISTINCT {columns_expr}) as unique_count
578                         FROM data"
579                    )
580                }
581            }
582            UniquenessType::UniqueWithNulls {
583                null_handling: NullHandling::Distinct,
584                ..
585            } => {
586                // Each NULL is treated as distinct
587                if self.columns.len() == 1 {
588                    let col = &escaped_columns[0];
589                    format!(
590                        "SELECT 
591                            COUNT(*) as total_count,
592                            COUNT(DISTINCT {col}) + CASE WHEN COUNT(*) - COUNT({col}) > 0 THEN COUNT(*) - COUNT({col}) ELSE 0 END as unique_count
593                         FROM data"
594                    )
595                } else {
596                    // For multi-column, this is more complex - treat as regular for now
597                    format!(
598                        "SELECT 
599                            COUNT(*) as total_count,
600                            COUNT(DISTINCT {columns_expr}) as unique_count
601                         FROM data"
602                    )
603                }
604            }
605            _ => {
606                // Standard uniqueness (excludes NULLs from distinct count)
607                format!(
608                    "SELECT 
609                        COUNT(*) as total_count,
610                        COUNT(DISTINCT {columns_expr}) as unique_count
611                     FROM data"
612                )
613            }
614        };
615
616        Ok(sql)
617    }
618
619    /// Generates SQL for distinctness validation.
620    fn generate_distinctness_sql(&self) -> Result<String> {
621        let escaped_columns: Result<Vec<String>> = self
622            .columns
623            .iter()
624            .map(|col| SqlSecurity::escape_identifier(col))
625            .collect();
626        let escaped_columns = escaped_columns?;
627
628        let sql = if self.columns.len() == 1 {
629            let col = &escaped_columns[0];
630            format!(
631                "SELECT 
632                    COUNT(DISTINCT {col}) as distinct_count,
633                    COUNT(*) as total_count
634                 FROM data"
635            )
636        } else {
637            // Multi-column distinctness using concatenation with NULL handling
638            let concat_expr = escaped_columns
639                .iter()
640                .map(|col| format!("COALESCE(CAST({col} AS VARCHAR), '<NULL>')"))
641                .collect::<Vec<_>>()
642                .join(" || '|' || ");
643
644            format!(
645                "SELECT 
646                    COUNT(DISTINCT ({concat_expr})) as distinct_count,
647                    COUNT(*) as total_count
648                 FROM data"
649            )
650        };
651
652        Ok(sql)
653    }
654
655    /// Generates SQL for unique value ratio validation.
656    fn generate_unique_value_ratio_sql(&self) -> Result<String> {
657        let escaped_columns: Result<Vec<String>> = self
658            .columns
659            .iter()
660            .map(|col| SqlSecurity::escape_identifier(col))
661            .collect();
662        let escaped_columns = escaped_columns?;
663
664        let columns_list = escaped_columns.join(", ");
665
666        let sql = format!(
667            "WITH value_counts AS (
668                SELECT {columns_list}, COUNT(*) as cnt
669                FROM data
670                GROUP BY {columns_list}
671            )
672            SELECT 
673                COALESCE(SUM(CASE WHEN cnt = 1 THEN cnt ELSE 0 END), 0) as unique_count,
674                COALESCE(SUM(cnt), 0) as total_count
675            FROM value_counts"
676        );
677
678        Ok(sql)
679    }
680
681    /// Generates SQL for primary key validation.
682    fn generate_primary_key_sql(&self) -> Result<String> {
683        let escaped_columns: Result<Vec<String>> = self
684            .columns
685            .iter()
686            .map(|col| SqlSecurity::escape_identifier(col))
687            .collect();
688        let escaped_columns = escaped_columns?;
689
690        let columns_expr = if self.columns.len() == 1 {
691            escaped_columns[0].clone()
692        } else {
693            let cols = escaped_columns.join(", ");
694            format!("({cols})")
695        };
696
697        // Check for NULLs in all columns
698        let null_check = escaped_columns
699            .iter()
700            .map(|col| format!("{col} IS NOT NULL"))
701            .collect::<Vec<_>>()
702            .join(" AND ");
703
704        let sql = format!(
705            "SELECT 
706                COUNT(*) as total_count,
707                COUNT(DISTINCT {columns_expr}) as unique_count,
708                COUNT(*) - COUNT(CASE WHEN {null_check} THEN 1 END) as null_count
709             FROM data"
710        );
711
712        Ok(sql)
713    }
714
715    /// Evaluates threshold-based uniqueness results.
716    async fn evaluate_threshold_based(
717        &self,
718        batch: &arrow::record_batch::RecordBatch,
719        threshold: f64,
720    ) -> Result<ConstraintResult> {
721        let total_count = batch
722            .column(0)
723            .as_any()
724            .downcast_ref::<arrow::array::Int64Array>()
725            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
726            .value(0) as f64;
727
728        let unique_count = batch
729            .column(1)
730            .as_any()
731            .downcast_ref::<arrow::array::Int64Array>()
732            .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
733            .value(0) as f64;
734
735        if total_count == 0.0 {
736            return Ok(ConstraintResult::skipped("No data to validate"));
737        }
738
739        let uniqueness_ratio = unique_count / total_count;
740
741        if uniqueness_ratio >= threshold {
742            Ok(ConstraintResult::success_with_metric(uniqueness_ratio))
743        } else {
744            Ok(ConstraintResult::failure_with_metric(
745                uniqueness_ratio,
746                format!(
747                    "Uniqueness ratio {uniqueness_ratio:.3} is below threshold {threshold:.3} for columns: {}",
748                    self.columns.join(", ")
749                ),
750            ))
751        }
752    }
753
754    /// Evaluates assertion-based results (distinctness and unique value ratio).
755    async fn evaluate_assertion_based(
756        &self,
757        batch: &arrow::record_batch::RecordBatch,
758        assertion: &Assertion,
759    ) -> Result<ConstraintResult> {
760        let count = batch
761            .column(0)
762            .as_any()
763            .downcast_ref::<arrow::array::Int64Array>()
764            .ok_or_else(|| TermError::Internal("Failed to extract count".to_string()))?
765            .value(0) as f64;
766
767        let total_count = batch
768            .column(1)
769            .as_any()
770            .downcast_ref::<arrow::array::Int64Array>()
771            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
772            .value(0) as f64;
773
774        if total_count == 0.0 {
775            return Ok(ConstraintResult::skipped("No data to validate"));
776        }
777
778        let ratio = count / total_count;
779
780        if assertion.evaluate(ratio) {
781            Ok(ConstraintResult::success_with_metric(ratio))
782        } else {
783            Ok(ConstraintResult::failure_with_metric(
784                ratio,
785                format!(
786                    "{} ratio {ratio:.3} does not satisfy {} for columns: {}",
787                    self.uniqueness_type.name(),
788                    assertion.description(),
789                    self.columns.join(", ")
790                ),
791            ))
792        }
793    }
794
795    /// Evaluates primary key results.
796    async fn evaluate_primary_key(
797        &self,
798        batch: &arrow::record_batch::RecordBatch,
799    ) -> Result<ConstraintResult> {
800        let total_count = batch
801            .column(0)
802            .as_any()
803            .downcast_ref::<arrow::array::Int64Array>()
804            .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
805            .value(0) as f64;
806
807        let unique_count = batch
808            .column(1)
809            .as_any()
810            .downcast_ref::<arrow::array::Int64Array>()
811            .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
812            .value(0) as f64;
813
814        let null_count = batch
815            .column(2)
816            .as_any()
817            .downcast_ref::<arrow::array::Int64Array>()
818            .ok_or_else(|| TermError::Internal("Failed to extract null count".to_string()))?
819            .value(0) as f64;
820
821        if total_count == 0.0 {
822            return Ok(ConstraintResult::skipped("No data to validate"));
823        }
824
825        // Primary key validation: no NULLs and all values unique
826        if null_count > 0.0 {
827            Ok(ConstraintResult::failure_with_metric(
828                null_count / total_count,
829                format!(
830                    "Primary key columns contain {null_count} NULL values: {}",
831                    self.columns.join(", ")
832                ),
833            ))
834        } else if unique_count != total_count {
835            let duplicate_ratio = (total_count - unique_count) / total_count;
836            Ok(ConstraintResult::failure_with_metric(
837                duplicate_ratio,
838                format!(
839                    "Primary key columns contain {} duplicate values: {}",
840                    total_count - unique_count,
841                    self.columns.join(", ")
842                ),
843            ))
844        } else {
845            Ok(ConstraintResult::success_with_metric(1.0))
846        }
847    }
848}
849
#[cfg(test)]
mod tests {
    // Unit tests for `UniquenessConstraint`.
    //
    // Data-driven tests register a tiny in-memory table under the name `data`
    // and evaluate a constraint against it; construction/metadata tests are
    // purely synchronous and therefore run as plain `#[test]`s without a
    // Tokio runtime.

    use super::*;
    use crate::constraints::Assertion;
    use crate::core::ConstraintStatus;
    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    /// Builds a [`SessionContext`] containing one in-memory table named `data`.
    ///
    /// Each `(name, values)` pair becomes a nullable `Utf8` column, with `None`
    /// entries stored as NULL. Shared by both public fixtures below so the
    /// schema/batch/registration steps live in exactly one place.
    ///
    /// # Panics
    ///
    /// Panics if the batch or table cannot be built (e.g. columns of unequal
    /// length) or if registration fails; in a test that is the desired outcome.
    fn context_with_utf8_columns(columns: Vec<(&str, Vec<Option<&str>>)>) -> SessionContext {
        let (fields, arrays): (Vec<Field>, Vec<ArrayRef>) = columns
            .into_iter()
            .map(|(name, values)| {
                let field = Field::new(name, DataType::Utf8, true);
                let array: ArrayRef = Arc::new(StringArray::from(values));
                (field, array)
            })
            .unzip();

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(schema.clone(), arrays)
            .expect("fixture columns must form a valid record batch");
        let provider = MemTable::try_new(schema, vec![vec![batch]])
            .expect("fixture batch must match the table schema");

        let ctx = SessionContext::new();
        ctx.register_table("data", Arc::new(provider))
            .expect("registering the `data` fixture table must succeed");
        ctx
    }

    /// Single-column fixture: table `data` with one nullable column `test_col`.
    ///
    /// Nothing is awaited here; the `async` signature is kept so call sites
    /// continue to read `create_test_context(..).await`.
    async fn create_test_context(values: Vec<Option<&str>>) -> SessionContext {
        context_with_utf8_columns(vec![("test_col", values)])
    }

    /// Two-column fixture: table `data` with nullable columns `col1` and `col2`.
    ///
    /// Both vectors are expected to have the same length.
    async fn create_multi_column_test_context(
        col1_values: Vec<Option<&str>>,
        col2_values: Vec<Option<&str>>,
    ) -> SessionContext {
        context_with_utf8_columns(vec![("col1", col1_values), ("col2", col2_values)])
    }

    /// Full uniqueness on one column passes when the ratio meets the threshold.
    #[tokio::test]
    async fn test_full_uniqueness_single_column() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.7).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75)); // 3 unique out of 4 total
    }

    /// With default settings NULLs do not add to the distinct count but still
    /// count towards the total.
    #[tokio::test]
    async fn test_full_uniqueness_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, Some("A")]).await;
        // Standard uniqueness (excludes NULLs from distinct count)
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.4).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.5)); // 2 unique non-null out of 4 total
    }

    /// Distinctness reports distinct values over total rows.
    #[tokio::test]
    async fn test_distinctness_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let constraint =
            UniquenessConstraint::distinctness(vec!["test_col"], Assertion::Equals(0.75)).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75)); // 3 distinct out of 4 total
    }

    /// Unique-value ratio counts only values that occur exactly once.
    #[tokio::test]
    async fn test_unique_value_ratio_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let constraint =
            UniquenessConstraint::unique_value_ratio(vec!["test_col"], Assertion::Equals(0.5))
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.5)); // 2 values appear exactly once out of 4 total
    }

    /// A column with no NULLs and no duplicates is a valid primary key.
    #[tokio::test]
    async fn test_primary_key_success() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C")]).await;
        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    /// A NULL in a primary-key column fails the check and is named in the message.
    #[tokio::test]
    async fn test_primary_key_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None]).await;
        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        let message = result
            .message
            .expect("a failed primary key check should carry a message");
        assert!(
            message.contains("NULL values"),
            "unexpected failure message: {message}"
        );
    }

    /// A repeated value in a primary-key column fails the check and is named
    /// in the message.
    #[tokio::test]
    async fn test_primary_key_with_duplicates() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("A")]).await;
        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        let message = result
            .message
            .expect("a failed primary key check should carry a message");
        assert!(
            message.contains("duplicate values"),
            "unexpected failure message: {message}"
        );
    }

    /// Uniqueness over several columns is judged on the combination of values:
    /// `A|1`, `B|2`, `A|2` are all different even though each column repeats.
    #[tokio::test]
    async fn test_multi_column_uniqueness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("2")],
        )
        .await;
        let constraint =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0)); // All combinations are unique
    }

    /// Distinctness over several columns counts distinct value combinations.
    #[tokio::test]
    async fn test_multi_column_distinctness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("1")],
        )
        .await;
        let constraint =
            UniquenessConstraint::distinctness(vec!["col1", "col2"], Assertion::GreaterThan(0.5))
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        // Rows are A|1, B|2, A|1: two distinct combinations over three rows = 2/3.
        // 2/3 is not exactly representable, hence the tolerance.
        let metric = result
            .metric
            .expect("a successful distinctness check should report a metric");
        assert!(
            (metric - 2.0 / 3.0).abs() < 0.01,
            "unexpected distinctness metric: {metric}"
        );
    }

    /// With `NullHandling::Include`, all NULLs collapse into one regular value.
    #[tokio::test]
    async fn test_unique_with_nulls_include() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, None]).await;
        let constraint =
            UniquenessConstraint::unique_with_nulls(vec!["test_col"], 0.4, NullHandling::Include)
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75)); // A, B, NULL (treated as one value) = 3/4
    }

    /// Evaluating against a table with zero rows is reported as skipped.
    #[tokio::test]
    async fn test_empty_data() {
        let ctx = create_test_context(Vec::new()).await;
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 1.0).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    /// Thresholds outside `[0.0, 1.0]` are rejected at construction time.
    ///
    /// Purely synchronous, so no async runtime is needed.
    #[test]
    fn test_invalid_threshold() {
        let err = UniquenessConstraint::full_uniqueness("col", 1.5)
            .expect_err("a threshold above 1.0 must be rejected");

        assert!(err
            .to_string()
            .contains("Threshold must be between 0.0 and 1.0"));
    }

    /// A constraint cannot be built without at least one column.
    ///
    /// Purely synchronous, so no async runtime is needed.
    #[test]
    fn test_empty_columns() {
        let columns: Vec<String> = Vec::new();

        let err = UniquenessConstraint::new(
            columns,
            UniquenessType::FullUniqueness { threshold: 1.0 },
            UniquenessOptions::default(),
        )
        .expect_err("an empty column list must be rejected");

        assert!(err
            .to_string()
            .contains("At least one column must be specified"));
    }

    /// Name, column and description are exposed for a single-column constraint.
    ///
    /// Purely synchronous, so no async runtime is needed.
    #[test]
    fn test_constraint_metadata() {
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.95).unwrap();

        let description = constraint.metadata().description.unwrap_or_default();

        assert!(description.contains("Unified uniqueness constraint"));
        assert_eq!(constraint.name(), "full_uniqueness");
        assert_eq!(constraint.column(), Some("test_col"));
    }

    /// A multi-column constraint reports no single column but lists all columns.
    ///
    /// Purely synchronous, so no async runtime is needed.
    #[test]
    fn test_multi_column_metadata() {
        let constraint =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        assert_eq!(constraint.column(), None); // Multi-column has no single column
        assert_eq!(constraint.columns(), &["col1", "col2"]);
    }
}