1use crate::constraints::Assertion;
7use crate::core::{current_validation_context, Constraint, ConstraintMetadata, ConstraintResult};
8use crate::prelude::*;
9use crate::security::SqlSecurity;
10use arrow::array::Array;
11use async_trait::async_trait;
12use datafusion::prelude::*;
13use std::fmt;
14use tracing::instrument;
/// Strategy for treating NULL values when uniqueness is measured.
///
/// [`NullHandling::Exclude`] is the default variant.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum NullHandling {
    /// NULLs are left out of the distinct count (displayed as `exclude`).
    #[default]
    Exclude,

    /// NULLs take part in the distinct count (displayed as `include`).
    Include,

    /// Every NULL is counted as its own value (displayed as `distinct`).
    Distinct,
}
33
34impl fmt::Display for NullHandling {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 match self {
37 NullHandling::Exclude => write!(f, "exclude"),
38 NullHandling::Include => write!(f, "include"),
39 NullHandling::Distinct => write!(f, "distinct"),
40 }
41 }
42}
43
44#[derive(Debug, Clone, PartialEq)]
49pub enum UniquenessType {
50 FullUniqueness { threshold: f64 },
55
56 Distinctness(Assertion),
61
62 UniqueValueRatio(Assertion),
67
68 PrimaryKey,
73
74 UniqueWithNulls {
78 threshold: f64,
79 null_handling: NullHandling,
80 },
81
82 UniqueComposite {
87 threshold: f64,
88 null_handling: NullHandling,
89 case_sensitive: bool,
90 },
91}
92
93impl UniquenessType {
94 pub fn name(&self) -> &str {
96 match self {
97 UniquenessType::FullUniqueness { .. } => "full_uniqueness",
98 UniquenessType::Distinctness(_) => "distinctness",
99 UniquenessType::UniqueValueRatio(_) => "unique_value_ratio",
100 UniquenessType::PrimaryKey => "primary_key",
101 UniquenessType::UniqueWithNulls { .. } => "unique_with_nulls",
102 UniquenessType::UniqueComposite { .. } => "unique_composite",
103 }
104 }
105
106 pub fn description(&self) -> String {
108 match self {
109 UniquenessType::FullUniqueness { threshold } => {
110 let threshold_pct = threshold * 100.0;
111 format!("validates that at least {threshold_pct:.1}% of values are unique")
112 }
113 UniquenessType::Distinctness(assertion) => {
114 format!(
115 "validates that distinct value ratio {}",
116 assertion.description()
117 )
118 }
119 UniquenessType::UniqueValueRatio(assertion) => {
120 format!(
121 "validates that unique value ratio {}",
122 assertion.description()
123 )
124 }
125 UniquenessType::PrimaryKey => {
126 "validates that values form a valid primary key (unique + non-null)".to_string()
127 }
128 UniquenessType::UniqueWithNulls {
129 threshold,
130 null_handling,
131 } => {
132 let threshold_pct = threshold * 100.0;
133 format!(
134 "validates that at least {threshold_pct:.1}% of values are unique (nulls: {null_handling})"
135 )
136 }
137 UniquenessType::UniqueComposite {
138 threshold,
139 null_handling,
140 case_sensitive,
141 } => {
142 let threshold_pct = threshold * 100.0;
143 format!(
144 "validates composite uniqueness at {threshold_pct:.1}% threshold (nulls: {null_handling}, case-sensitive: {case_sensitive})"
145 )
146 }
147 }
148 }
149}
150
151#[derive(Debug, Clone, PartialEq)]
153pub struct UniquenessOptions {
154 pub null_handling: NullHandling,
156
157 pub case_sensitive: bool,
159
160 pub trim_whitespace: bool,
162}
163
164impl Default for UniquenessOptions {
165 fn default() -> Self {
166 Self {
167 null_handling: NullHandling::default(),
168 case_sensitive: true,
169 trim_whitespace: false,
170 }
171 }
172}
173
174impl UniquenessOptions {
175 pub fn new() -> Self {
177 Self::default()
178 }
179
180 pub fn with_null_handling(mut self, null_handling: NullHandling) -> Self {
182 self.null_handling = null_handling;
183 self
184 }
185
186 pub fn case_sensitive(mut self, case_sensitive: bool) -> Self {
188 self.case_sensitive = case_sensitive;
189 self
190 }
191
192 pub fn trim_whitespace(mut self, trim_whitespace: bool) -> Self {
194 self.trim_whitespace = trim_whitespace;
195 self
196 }
197}
198
199#[derive(Debug, Clone)]
248pub struct UniquenessConstraint {
249 columns: Vec<String>,
250 uniqueness_type: UniquenessType,
251 options: UniquenessOptions,
252}
253
254impl UniquenessConstraint {
255 pub fn new<I, S>(
267 columns: I,
268 uniqueness_type: UniquenessType,
269 options: UniquenessOptions,
270 ) -> Result<Self>
271 where
272 I: IntoIterator<Item = S>,
273 S: Into<String>,
274 {
275 let column_vec: Vec<String> = columns.into_iter().map(Into::into).collect();
276
277 if column_vec.is_empty() {
278 return Err(TermError::validation_failed(
279 "unified_uniqueness",
280 "At least one column must be specified",
281 ));
282 }
283
284 for column in &column_vec {
286 SqlSecurity::validate_identifier(column)?;
287 }
288
289 match &uniqueness_type {
291 UniquenessType::FullUniqueness { threshold }
292 | UniquenessType::UniqueWithNulls { threshold, .. }
293 | UniquenessType::UniqueComposite { threshold, .. } => {
294 if !((0.0..=1.0).contains(threshold)) {
295 return Err(TermError::validation_failed(
296 "unified_uniqueness",
297 "Threshold must be between 0.0 and 1.0",
298 ));
299 }
300 }
301 _ => {} }
303
304 Ok(Self {
305 columns: column_vec,
306 uniqueness_type,
307 options,
308 })
309 }
310
311 pub fn full_uniqueness(column: impl Into<String>, threshold: f64) -> Result<Self> {
315 Self::new(
316 vec![column.into()],
317 UniquenessType::FullUniqueness { threshold },
318 UniquenessOptions::default(),
319 )
320 }
321
322 pub fn full_uniqueness_multi<I, S>(columns: I, threshold: f64) -> Result<Self>
326 where
327 I: IntoIterator<Item = S>,
328 S: Into<String>,
329 {
330 Self::new(
331 columns,
332 UniquenessType::FullUniqueness { threshold },
333 UniquenessOptions::default(),
334 )
335 }
336
337 pub fn distinctness<I, S>(columns: I, assertion: Assertion) -> Result<Self>
341 where
342 I: IntoIterator<Item = S>,
343 S: Into<String>,
344 {
345 Self::new(
346 columns,
347 UniquenessType::Distinctness(assertion),
348 UniquenessOptions::default(),
349 )
350 }
351
352 pub fn unique_value_ratio<I, S>(columns: I, assertion: Assertion) -> Result<Self>
356 where
357 I: IntoIterator<Item = S>,
358 S: Into<String>,
359 {
360 Self::new(
361 columns,
362 UniquenessType::UniqueValueRatio(assertion),
363 UniquenessOptions::default(),
364 )
365 }
366
367 pub fn primary_key<I, S>(columns: I) -> Result<Self>
371 where
372 I: IntoIterator<Item = S>,
373 S: Into<String>,
374 {
375 Self::new(
376 columns,
377 UniquenessType::PrimaryKey,
378 UniquenessOptions::default(),
379 )
380 }
381
382 pub fn unique_with_nulls<I, S>(
384 columns: I,
385 threshold: f64,
386 null_handling: NullHandling,
387 ) -> Result<Self>
388 where
389 I: IntoIterator<Item = S>,
390 S: Into<String>,
391 {
392 Self::new(
393 columns,
394 UniquenessType::UniqueWithNulls {
395 threshold,
396 null_handling,
397 },
398 UniquenessOptions::default(),
399 )
400 }
401
402 pub fn unique_composite<I, S>(
404 columns: I,
405 threshold: f64,
406 null_handling: NullHandling,
407 case_sensitive: bool,
408 ) -> Result<Self>
409 where
410 I: IntoIterator<Item = S>,
411 S: Into<String>,
412 {
413 Self::new(
414 columns,
415 UniquenessType::UniqueComposite {
416 threshold,
417 null_handling,
418 case_sensitive,
419 },
420 UniquenessOptions::new()
421 .with_null_handling(null_handling)
422 .case_sensitive(case_sensitive),
423 )
424 }
425
426 pub fn columns(&self) -> &[String] {
428 &self.columns
429 }
430
431 pub fn uniqueness_type(&self) -> &UniquenessType {
433 &self.uniqueness_type
434 }
435
436 pub fn options(&self) -> &UniquenessOptions {
438 &self.options
439 }
440}
441
442#[async_trait]
443impl Constraint for UniquenessConstraint {
444 #[instrument(skip(self, ctx), fields(
445 columns = ?self.columns,
446 uniqueness_type = %self.uniqueness_type.name(),
447 null_handling = %self.options.null_handling
448 ))]
449 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
450 let validation_ctx = current_validation_context();
452 let table_name = validation_ctx.table_name();
453
454 let sql = self.generate_sql(table_name)?;
456
457 let df = ctx.sql(&sql).await?;
458 let batches = df.collect().await?;
459
460 if batches.is_empty() {
461 return Ok(ConstraintResult::skipped("No data to validate"));
462 }
463
464 let batch = &batches[0];
465 if batch.num_rows() == 0 {
466 return Ok(ConstraintResult::skipped("No data to validate"));
467 }
468
469 match &self.uniqueness_type {
471 UniquenessType::FullUniqueness { threshold }
472 | UniquenessType::UniqueWithNulls { threshold, .. }
473 | UniquenessType::UniqueComposite { threshold, .. } => {
474 self.evaluate_threshold_based(batch, *threshold).await
475 }
476 UniquenessType::Distinctness(assertion)
477 | UniquenessType::UniqueValueRatio(assertion) => {
478 self.evaluate_assertion_based(batch, assertion).await
479 }
480 UniquenessType::PrimaryKey => self.evaluate_primary_key(batch).await,
481 }
482 }
483
484 fn name(&self) -> &str {
485 self.uniqueness_type.name()
486 }
487
488 fn column(&self) -> Option<&str> {
489 if self.columns.len() == 1 {
490 Some(&self.columns[0])
491 } else {
492 None
493 }
494 }
495
496 fn metadata(&self) -> ConstraintMetadata {
497 let mut metadata = if self.columns.len() == 1 {
498 ConstraintMetadata::for_column(&self.columns[0])
499 } else {
500 ConstraintMetadata::for_columns(&self.columns)
501 };
502
503 metadata = metadata
504 .with_description(format!(
505 "Unified uniqueness constraint that {}",
506 self.uniqueness_type.description()
507 ))
508 .with_custom("uniqueness_type", self.uniqueness_type.name())
509 .with_custom("null_handling", self.options.null_handling.to_string())
510 .with_custom("case_sensitive", self.options.case_sensitive.to_string())
511 .with_custom("constraint_type", "uniqueness");
512
513 match &self.uniqueness_type {
515 UniquenessType::FullUniqueness { threshold }
516 | UniquenessType::UniqueWithNulls { threshold, .. }
517 | UniquenessType::UniqueComposite { threshold, .. } => {
518 metadata = metadata.with_custom("threshold", threshold.to_string());
519 }
520 UniquenessType::Distinctness(assertion)
521 | UniquenessType::UniqueValueRatio(assertion) => {
522 metadata = metadata.with_custom("assertion", assertion.to_string());
523 }
524 UniquenessType::PrimaryKey => {
525 metadata = metadata.with_custom("strict", "true");
526 }
527 }
528
529 metadata
530 }
531}
532
533impl UniquenessConstraint {
534 fn generate_sql(&self, table_name: &str) -> Result<String> {
536 match &self.uniqueness_type {
537 UniquenessType::FullUniqueness { .. }
538 | UniquenessType::UniqueWithNulls { .. }
539 | UniquenessType::UniqueComposite { .. } => {
540 self.generate_full_uniqueness_sql(table_name)
541 }
542 UniquenessType::Distinctness(_) => self.generate_distinctness_sql(table_name),
543 UniquenessType::UniqueValueRatio(_) => self.generate_unique_value_ratio_sql(table_name),
544 UniquenessType::PrimaryKey => self.generate_primary_key_sql(table_name),
545 }
546 }
547
548 fn generate_full_uniqueness_sql(&self, table_name: &str) -> Result<String> {
550 let escaped_columns: Result<Vec<String>> = self
551 .columns
552 .iter()
553 .map(|col| SqlSecurity::escape_identifier(col))
554 .collect();
555 let escaped_columns = escaped_columns?;
556
557 let columns_expr = if self.columns.len() == 1 {
558 escaped_columns[0].clone()
559 } else {
560 let cols = escaped_columns.join(", ");
561 format!("({cols})")
562 };
563
564 let sql = match &self.uniqueness_type {
565 UniquenessType::UniqueWithNulls {
566 null_handling: NullHandling::Include,
567 ..
568 } => {
569 if self.columns.len() == 1 {
571 let col = &escaped_columns[0];
572 format!(
573 "SELECT
574 COUNT(*) as total_count,
575 COUNT(DISTINCT COALESCE({col}, '<NULL>')) as unique_count
576 FROM {table_name}"
577 )
578 } else {
579 format!(
580 "SELECT
581 COUNT(*) as total_count,
582 COUNT(DISTINCT {columns_expr}) as unique_count
583 FROM {table_name}"
584 )
585 }
586 }
587 UniquenessType::UniqueWithNulls {
588 null_handling: NullHandling::Distinct,
589 ..
590 } => {
591 if self.columns.len() == 1 {
593 let col = &escaped_columns[0];
594 format!(
595 "SELECT
596 COUNT(*) as total_count,
597 COUNT(DISTINCT {col}) + CASE WHEN COUNT(*) - COUNT({col}) > 0 THEN COUNT(*) - COUNT({col}) ELSE 0 END as unique_count
598 FROM {table_name}"
599 )
600 } else {
601 format!(
603 "SELECT
604 COUNT(*) as total_count,
605 COUNT(DISTINCT {columns_expr}) as unique_count
606 FROM {table_name}"
607 )
608 }
609 }
610 _ => {
611 format!(
613 "SELECT
614 COUNT(*) as total_count,
615 COUNT(DISTINCT {columns_expr}) as unique_count
616 FROM {table_name}"
617 )
618 }
619 };
620
621 Ok(sql)
622 }
623
624 fn generate_distinctness_sql(&self, table_name: &str) -> Result<String> {
626 let escaped_columns: Result<Vec<String>> = self
627 .columns
628 .iter()
629 .map(|col| SqlSecurity::escape_identifier(col))
630 .collect();
631 let escaped_columns = escaped_columns?;
632
633 let sql = if self.columns.len() == 1 {
634 let col = &escaped_columns[0];
635 format!(
636 "SELECT
637 COUNT(DISTINCT {col}) as distinct_count,
638 COUNT(*) as total_count
639 FROM {table_name}"
640 )
641 } else {
642 let concat_expr = escaped_columns
644 .iter()
645 .map(|col| format!("COALESCE(CAST({col} AS VARCHAR), '<NULL>')"))
646 .collect::<Vec<_>>()
647 .join(" || '|' || ");
648
649 format!(
650 "SELECT
651 COUNT(DISTINCT ({concat_expr})) as distinct_count,
652 COUNT(*) as total_count
653 FROM {table_name}"
654 )
655 };
656
657 Ok(sql)
658 }
659
660 fn generate_unique_value_ratio_sql(&self, table_name: &str) -> Result<String> {
662 let escaped_columns: Result<Vec<String>> = self
663 .columns
664 .iter()
665 .map(|col| SqlSecurity::escape_identifier(col))
666 .collect();
667 let escaped_columns = escaped_columns?;
668
669 let columns_list = escaped_columns.join(", ");
670
671 let sql = format!(
672 "WITH value_counts AS (
673 SELECT {columns_list}, COUNT(*) as cnt
674 FROM {table_name}
675 GROUP BY {columns_list}
676 )
677 SELECT
678 COALESCE(SUM(CASE WHEN cnt = 1 THEN cnt ELSE 0 END), 0) as unique_count,
679 COALESCE(SUM(cnt), 0) as total_count
680 FROM value_counts"
681 );
682
683 Ok(sql)
684 }
685
686 fn generate_primary_key_sql(&self, table_name: &str) -> Result<String> {
688 let escaped_columns: Result<Vec<String>> = self
689 .columns
690 .iter()
691 .map(|col| SqlSecurity::escape_identifier(col))
692 .collect();
693 let escaped_columns = escaped_columns?;
694
695 let columns_expr = if self.columns.len() == 1 {
696 escaped_columns[0].clone()
697 } else {
698 let cols = escaped_columns.join(", ");
699 format!("({cols})")
700 };
701
702 let null_check = escaped_columns
704 .iter()
705 .map(|col| format!("{col} IS NOT NULL"))
706 .collect::<Vec<_>>()
707 .join(" AND ");
708
709 let sql = format!(
710 "SELECT
711 COUNT(*) as total_count,
712 COUNT(DISTINCT {columns_expr}) as unique_count,
713 COUNT(*) - COUNT(CASE WHEN {null_check} THEN 1 END) as null_count
714 FROM {table_name}"
715 );
716
717 Ok(sql)
718 }
719
720 async fn evaluate_threshold_based(
722 &self,
723 batch: &arrow::record_batch::RecordBatch,
724 threshold: f64,
725 ) -> Result<ConstraintResult> {
726 let total_count = batch
727 .column(0)
728 .as_any()
729 .downcast_ref::<arrow::array::Int64Array>()
730 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
731 .value(0) as f64;
732
733 let unique_count = batch
734 .column(1)
735 .as_any()
736 .downcast_ref::<arrow::array::Int64Array>()
737 .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
738 .value(0) as f64;
739
740 if total_count == 0.0 {
741 return Ok(ConstraintResult::skipped("No data to validate"));
742 }
743
744 let uniqueness_ratio = unique_count / total_count;
745
746 if uniqueness_ratio >= threshold {
747 Ok(ConstraintResult::success_with_metric(uniqueness_ratio))
748 } else {
749 Ok(ConstraintResult::failure_with_metric(
750 uniqueness_ratio,
751 format!(
752 "Uniqueness ratio {uniqueness_ratio:.3} is below threshold {threshold:.3} for columns: {}",
753 self.columns.join(", ")
754 ),
755 ))
756 }
757 }
758
759 async fn evaluate_assertion_based(
761 &self,
762 batch: &arrow::record_batch::RecordBatch,
763 assertion: &Assertion,
764 ) -> Result<ConstraintResult> {
765 let count = batch
766 .column(0)
767 .as_any()
768 .downcast_ref::<arrow::array::Int64Array>()
769 .ok_or_else(|| TermError::Internal("Failed to extract count".to_string()))?
770 .value(0) as f64;
771
772 let total_count = batch
773 .column(1)
774 .as_any()
775 .downcast_ref::<arrow::array::Int64Array>()
776 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
777 .value(0) as f64;
778
779 if total_count == 0.0 {
780 return Ok(ConstraintResult::skipped("No data to validate"));
781 }
782
783 let ratio = count / total_count;
784
785 if assertion.evaluate(ratio) {
786 Ok(ConstraintResult::success_with_metric(ratio))
787 } else {
788 Ok(ConstraintResult::failure_with_metric(
789 ratio,
790 format!(
791 "{} ratio {ratio:.3} does not satisfy {} for columns: {}",
792 self.uniqueness_type.name(),
793 assertion.description(),
794 self.columns.join(", ")
795 ),
796 ))
797 }
798 }
799
800 async fn evaluate_primary_key(
802 &self,
803 batch: &arrow::record_batch::RecordBatch,
804 ) -> Result<ConstraintResult> {
805 let total_count = batch
806 .column(0)
807 .as_any()
808 .downcast_ref::<arrow::array::Int64Array>()
809 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
810 .value(0) as f64;
811
812 let unique_count = batch
813 .column(1)
814 .as_any()
815 .downcast_ref::<arrow::array::Int64Array>()
816 .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
817 .value(0) as f64;
818
819 let null_count = batch
820 .column(2)
821 .as_any()
822 .downcast_ref::<arrow::array::Int64Array>()
823 .ok_or_else(|| TermError::Internal("Failed to extract null count".to_string()))?
824 .value(0) as f64;
825
826 if total_count == 0.0 {
827 return Ok(ConstraintResult::skipped("No data to validate"));
828 }
829
830 if null_count > 0.0 {
832 Ok(ConstraintResult::failure_with_metric(
833 null_count / total_count,
834 format!(
835 "Primary key columns contain {null_count} NULL values: {}",
836 self.columns.join(", ")
837 ),
838 ))
839 } else if unique_count != total_count {
840 let duplicate_ratio = (total_count - unique_count) / total_count;
841 Ok(ConstraintResult::failure_with_metric(
842 duplicate_ratio,
843 format!(
844 "Primary key columns contain {} duplicate values: {}",
845 total_count - unique_count,
846 self.columns.join(", ")
847 ),
848 ))
849 } else {
850 Ok(ConstraintResult::success_with_metric(1.0))
851 }
852 }
853}
854
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::Assertion;
    use crate::core::ConstraintStatus;
    use crate::test_helpers::evaluate_constraint_with_context;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    /// Registers a single-batch, all-nullable-Utf8 table named `data` built
    /// from `(column name, values)` pairs and returns the session holding it.
    fn register_utf8_table(columns: Vec<(&str, Vec<Option<&str>>)>) -> SessionContext {
        let mut fields = Vec::with_capacity(columns.len());
        let mut arrays: Vec<arrow::array::ArrayRef> = Vec::with_capacity(columns.len());
        for (name, values) in columns {
            fields.push(Field::new(name, DataType::Utf8, true));
            arrays.push(Arc::new(StringArray::from(values)) as arrow::array::ArrayRef);
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("data", Arc::new(provider)).unwrap();
        ctx
    }

    /// Session with a table `data` holding one column, `test_col`.
    async fn create_test_context(values: Vec<Option<&str>>) -> SessionContext {
        register_utf8_table(vec![("test_col", values)])
    }

    /// Session with a table `data` holding two columns, `col1` and `col2`.
    async fn create_multi_column_test_context(
        col1_values: Vec<Option<&str>>,
        col2_values: Vec<Option<&str>>,
    ) -> SessionContext {
        register_utf8_table(vec![("col1", col1_values), ("col2", col2_values)])
    }

    #[tokio::test]
    async fn test_full_uniqueness_single_column() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let check = UniquenessConstraint::full_uniqueness("test_col", 0.7).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        // Three distinct values out of four rows.
        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(0.75));
    }

    #[tokio::test]
    async fn test_full_uniqueness_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, Some("A")]).await;
        let check = UniquenessConstraint::full_uniqueness("test_col", 0.4).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        // The NULL is not counted as a distinct value: two out of four rows.
        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(0.5));
    }

    #[tokio::test]
    async fn test_distinctness_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let check =
            UniquenessConstraint::distinctness(vec!["test_col"], Assertion::Equals(0.75)).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(0.75));
    }

    #[tokio::test]
    async fn test_unique_value_ratio_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]).await;
        let check =
            UniquenessConstraint::unique_value_ratio(vec!["test_col"], Assertion::Equals(0.5))
                .unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        // Only "B" and "C" occur exactly once: two out of four rows.
        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(0.5));
    }

    #[tokio::test]
    async fn test_primary_key_success() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C")]).await;
        let check = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_primary_key_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None]).await;
        let check = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert!(outcome.message.unwrap().contains("NULL values"));
    }

    #[tokio::test]
    async fn test_primary_key_with_duplicates() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("A")]).await;
        let check = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert!(outcome.message.unwrap().contains("duplicate values"));
    }

    #[tokio::test]
    async fn test_multi_column_uniqueness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("2")],
        )
        .await;
        let check =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        // Every (col1, col2) pair is different.
        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_multi_column_distinctness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("1")],
        )
        .await;
        let check =
            UniquenessConstraint::distinctness(vec!["col1", "col2"], Assertion::GreaterThan(0.5))
                .unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        // Two distinct pairs out of three rows.
        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert!((outcome.metric.unwrap() - 2.0 / 3.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_unique_with_nulls_include() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, None]).await;
        let check =
            UniquenessConstraint::unique_with_nulls(vec!["test_col"], 0.4, NullHandling::Include)
                .unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        // "A", "B" and the shared NULL bucket: three out of four rows.
        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(0.75));
    }

    #[tokio::test]
    async fn test_empty_data() {
        let no_rows: Vec<Option<&str>> = vec![];
        let ctx = create_test_context(no_rows).await;
        let check = UniquenessConstraint::full_uniqueness("test_col", 1.0).unwrap();

        let outcome = evaluate_constraint_with_context(&check, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Skipped);
    }

    #[tokio::test]
    async fn test_invalid_threshold() {
        let attempt = UniquenessConstraint::full_uniqueness("col", 1.5);

        assert!(attempt.is_err());
        let rendered = attempt.unwrap_err().to_string();
        assert!(rendered.contains("Threshold must be between 0.0 and 1.0"));
    }

    #[tokio::test]
    async fn test_empty_columns() {
        let no_columns: Vec<String> = vec![];
        let attempt = UniquenessConstraint::new(
            no_columns,
            UniquenessType::FullUniqueness { threshold: 1.0 },
            UniquenessOptions::default(),
        );

        assert!(attempt.is_err());
        let rendered = attempt.unwrap_err().to_string();
        assert!(rendered.contains("At least one column must be specified"));
    }

    #[tokio::test]
    async fn test_constraint_metadata() {
        let check = UniquenessConstraint::full_uniqueness("test_col", 0.95).unwrap();
        let description = check.metadata().description.unwrap_or_default();

        assert!(description.contains("Unified uniqueness constraint"));
        assert_eq!(check.name(), "full_uniqueness");
        assert_eq!(check.column(), Some("test_col"));
    }

    #[tokio::test]
    async fn test_multi_column_metadata() {
        let check =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        // A multi-column constraint exposes no single column.
        assert_eq!(check.column(), None);
        assert_eq!(check.columns(), &["col1", "col2"]);
    }
}