1use crate::constraints::Assertion;
7use crate::core::{Constraint, ConstraintMetadata, ConstraintResult};
8use crate::prelude::*;
9use crate::security::SqlSecurity;
10use arrow::array::Array;
11use async_trait::async_trait;
12use datafusion::prelude::*;
13use std::fmt;
14use tracing::instrument;
15
/// Strategy for treating NULL values when uniqueness is measured.
///
/// The variant is rendered by `Display` as its lowercase name (`"exclude"`,
/// `"include"`, `"distinct"`).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum NullHandling {
    /// Default strategy. The SQL builders in this file apply no special NULL
    /// treatment for it: they emit a plain `COUNT(DISTINCT ...)` next to
    /// `COUNT(*)`.
    #[default]
    Exclude,

    /// For a single-column `UniqueWithNulls` check, NULLs are coalesced to a
    /// shared sentinel so that all NULL rows together count as one value.
    Include,

    /// For a single-column `UniqueWithNulls` check, every NULL row adds one to
    /// the unique count.
    Distinct,
}

impl fmt::Display for NullHandling {
    /// Writes the lowercase variant name; formatter width/fill flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Exclude => "exclude",
            Self::Include => "include",
            Self::Distinct => "distinct",
        };
        f.write_str(label)
    }
}
44
45#[derive(Debug, Clone, PartialEq)]
50pub enum UniquenessType {
51 FullUniqueness { threshold: f64 },
56
57 Distinctness(Assertion),
62
63 UniqueValueRatio(Assertion),
68
69 PrimaryKey,
74
75 UniqueWithNulls {
79 threshold: f64,
80 null_handling: NullHandling,
81 },
82
83 UniqueComposite {
88 threshold: f64,
89 null_handling: NullHandling,
90 case_sensitive: bool,
91 },
92}
93
94impl UniquenessType {
95 pub fn name(&self) -> &str {
97 match self {
98 UniquenessType::FullUniqueness { .. } => "full_uniqueness",
99 UniquenessType::Distinctness(_) => "distinctness",
100 UniquenessType::UniqueValueRatio(_) => "unique_value_ratio",
101 UniquenessType::PrimaryKey => "primary_key",
102 UniquenessType::UniqueWithNulls { .. } => "unique_with_nulls",
103 UniquenessType::UniqueComposite { .. } => "unique_composite",
104 }
105 }
106
107 pub fn description(&self) -> String {
109 match self {
110 UniquenessType::FullUniqueness { threshold } => {
111 let threshold_pct = threshold * 100.0;
112 format!("validates that at least {threshold_pct:.1}% of values are unique")
113 }
114 UniquenessType::Distinctness(assertion) => {
115 format!(
116 "validates that distinct value ratio {}",
117 assertion.description()
118 )
119 }
120 UniquenessType::UniqueValueRatio(assertion) => {
121 format!(
122 "validates that unique value ratio {}",
123 assertion.description()
124 )
125 }
126 UniquenessType::PrimaryKey => {
127 "validates that values form a valid primary key (unique + non-null)".to_string()
128 }
129 UniquenessType::UniqueWithNulls {
130 threshold,
131 null_handling,
132 } => {
133 let threshold_pct = threshold * 100.0;
134 format!(
135 "validates that at least {threshold_pct:.1}% of values are unique (nulls: {null_handling})"
136 )
137 }
138 UniquenessType::UniqueComposite {
139 threshold,
140 null_handling,
141 case_sensitive,
142 } => {
143 let threshold_pct = threshold * 100.0;
144 format!(
145 "validates composite uniqueness at {threshold_pct:.1}% threshold (nulls: {null_handling}, case-sensitive: {case_sensitive})"
146 )
147 }
148 }
149 }
150}
151
152#[derive(Debug, Clone, PartialEq)]
154pub struct UniquenessOptions {
155 pub null_handling: NullHandling,
157
158 pub case_sensitive: bool,
160
161 pub trim_whitespace: bool,
163}
164
165impl Default for UniquenessOptions {
166 fn default() -> Self {
167 Self {
168 null_handling: NullHandling::default(),
169 case_sensitive: true,
170 trim_whitespace: false,
171 }
172 }
173}
174
175impl UniquenessOptions {
176 pub fn new() -> Self {
178 Self::default()
179 }
180
181 pub fn with_null_handling(mut self, null_handling: NullHandling) -> Self {
183 self.null_handling = null_handling;
184 self
185 }
186
187 pub fn case_sensitive(mut self, case_sensitive: bool) -> Self {
189 self.case_sensitive = case_sensitive;
190 self
191 }
192
193 pub fn trim_whitespace(mut self, trim_whitespace: bool) -> Self {
195 self.trim_whitespace = trim_whitespace;
196 self
197 }
198}
199
200#[derive(Debug, Clone)]
249pub struct UniquenessConstraint {
250 columns: Vec<String>,
251 uniqueness_type: UniquenessType,
252 options: UniquenessOptions,
253}
254
255impl UniquenessConstraint {
256 pub fn new<I, S>(
268 columns: I,
269 uniqueness_type: UniquenessType,
270 options: UniquenessOptions,
271 ) -> Result<Self>
272 where
273 I: IntoIterator<Item = S>,
274 S: Into<String>,
275 {
276 let column_vec: Vec<String> = columns.into_iter().map(Into::into).collect();
277
278 if column_vec.is_empty() {
279 return Err(TermError::validation_failed(
280 "unified_uniqueness",
281 "At least one column must be specified",
282 ));
283 }
284
285 for column in &column_vec {
287 SqlSecurity::validate_identifier(column)?;
288 }
289
290 match &uniqueness_type {
292 UniquenessType::FullUniqueness { threshold }
293 | UniquenessType::UniqueWithNulls { threshold, .. }
294 | UniquenessType::UniqueComposite { threshold, .. } => {
295 if !((0.0..=1.0).contains(threshold)) {
296 return Err(TermError::validation_failed(
297 "unified_uniqueness",
298 "Threshold must be between 0.0 and 1.0",
299 ));
300 }
301 }
302 _ => {} }
304
305 Ok(Self {
306 columns: column_vec,
307 uniqueness_type,
308 options,
309 })
310 }
311
312 pub fn full_uniqueness(column: impl Into<String>, threshold: f64) -> Result<Self> {
316 Self::new(
317 vec![column.into()],
318 UniquenessType::FullUniqueness { threshold },
319 UniquenessOptions::default(),
320 )
321 }
322
323 pub fn full_uniqueness_multi<I, S>(columns: I, threshold: f64) -> Result<Self>
327 where
328 I: IntoIterator<Item = S>,
329 S: Into<String>,
330 {
331 Self::new(
332 columns,
333 UniquenessType::FullUniqueness { threshold },
334 UniquenessOptions::default(),
335 )
336 }
337
338 pub fn distinctness<I, S>(columns: I, assertion: Assertion) -> Result<Self>
342 where
343 I: IntoIterator<Item = S>,
344 S: Into<String>,
345 {
346 Self::new(
347 columns,
348 UniquenessType::Distinctness(assertion),
349 UniquenessOptions::default(),
350 )
351 }
352
353 pub fn unique_value_ratio<I, S>(columns: I, assertion: Assertion) -> Result<Self>
357 where
358 I: IntoIterator<Item = S>,
359 S: Into<String>,
360 {
361 Self::new(
362 columns,
363 UniquenessType::UniqueValueRatio(assertion),
364 UniquenessOptions::default(),
365 )
366 }
367
368 pub fn primary_key<I, S>(columns: I) -> Result<Self>
372 where
373 I: IntoIterator<Item = S>,
374 S: Into<String>,
375 {
376 Self::new(
377 columns,
378 UniquenessType::PrimaryKey,
379 UniquenessOptions::default(),
380 )
381 }
382
383 pub fn unique_with_nulls<I, S>(
385 columns: I,
386 threshold: f64,
387 null_handling: NullHandling,
388 ) -> Result<Self>
389 where
390 I: IntoIterator<Item = S>,
391 S: Into<String>,
392 {
393 Self::new(
394 columns,
395 UniquenessType::UniqueWithNulls {
396 threshold,
397 null_handling,
398 },
399 UniquenessOptions::default(),
400 )
401 }
402
403 pub fn unique_composite<I, S>(
405 columns: I,
406 threshold: f64,
407 null_handling: NullHandling,
408 case_sensitive: bool,
409 ) -> Result<Self>
410 where
411 I: IntoIterator<Item = S>,
412 S: Into<String>,
413 {
414 Self::new(
415 columns,
416 UniquenessType::UniqueComposite {
417 threshold,
418 null_handling,
419 case_sensitive,
420 },
421 UniquenessOptions::new()
422 .with_null_handling(null_handling)
423 .case_sensitive(case_sensitive),
424 )
425 }
426
427 pub fn columns(&self) -> &[String] {
429 &self.columns
430 }
431
432 pub fn uniqueness_type(&self) -> &UniquenessType {
434 &self.uniqueness_type
435 }
436
437 pub fn options(&self) -> &UniquenessOptions {
439 &self.options
440 }
441}
442
443#[async_trait]
444impl Constraint for UniquenessConstraint {
445 #[instrument(skip(self, ctx), fields(
446 columns = ?self.columns,
447 uniqueness_type = %self.uniqueness_type.name(),
448 null_handling = %self.options.null_handling
449 ))]
450 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
451 let sql = self.generate_sql()?;
453
454 let df = ctx.sql(&sql).await?;
455 let batches = df.collect().await?;
456
457 if batches.is_empty() {
458 return Ok(ConstraintResult::skipped("No data to validate"));
459 }
460
461 let batch = &batches[0];
462 if batch.num_rows() == 0 {
463 return Ok(ConstraintResult::skipped("No data to validate"));
464 }
465
466 match &self.uniqueness_type {
468 UniquenessType::FullUniqueness { threshold }
469 | UniquenessType::UniqueWithNulls { threshold, .. }
470 | UniquenessType::UniqueComposite { threshold, .. } => {
471 self.evaluate_threshold_based(batch, *threshold).await
472 }
473 UniquenessType::Distinctness(assertion)
474 | UniquenessType::UniqueValueRatio(assertion) => {
475 self.evaluate_assertion_based(batch, assertion).await
476 }
477 UniquenessType::PrimaryKey => self.evaluate_primary_key(batch).await,
478 }
479 }
480
481 fn name(&self) -> &str {
482 self.uniqueness_type.name()
483 }
484
485 fn column(&self) -> Option<&str> {
486 if self.columns.len() == 1 {
487 Some(&self.columns[0])
488 } else {
489 None
490 }
491 }
492
493 fn metadata(&self) -> ConstraintMetadata {
494 let mut metadata = if self.columns.len() == 1 {
495 ConstraintMetadata::for_column(&self.columns[0])
496 } else {
497 ConstraintMetadata::for_columns(&self.columns)
498 };
499
500 metadata = metadata
501 .with_description(format!(
502 "Unified uniqueness constraint that {}",
503 self.uniqueness_type.description()
504 ))
505 .with_custom("uniqueness_type", self.uniqueness_type.name())
506 .with_custom("null_handling", self.options.null_handling.to_string())
507 .with_custom("case_sensitive", self.options.case_sensitive.to_string())
508 .with_custom("constraint_type", "uniqueness");
509
510 match &self.uniqueness_type {
512 UniquenessType::FullUniqueness { threshold }
513 | UniquenessType::UniqueWithNulls { threshold, .. }
514 | UniquenessType::UniqueComposite { threshold, .. } => {
515 metadata = metadata.with_custom("threshold", threshold.to_string());
516 }
517 UniquenessType::Distinctness(assertion)
518 | UniquenessType::UniqueValueRatio(assertion) => {
519 metadata = metadata.with_custom("assertion", assertion.to_string());
520 }
521 UniquenessType::PrimaryKey => {
522 metadata = metadata.with_custom("strict", "true");
523 }
524 }
525
526 metadata
527 }
528}
529
530impl UniquenessConstraint {
531 fn generate_sql(&self) -> Result<String> {
533 match &self.uniqueness_type {
534 UniquenessType::FullUniqueness { .. }
535 | UniquenessType::UniqueWithNulls { .. }
536 | UniquenessType::UniqueComposite { .. } => self.generate_full_uniqueness_sql(),
537 UniquenessType::Distinctness(_) => self.generate_distinctness_sql(),
538 UniquenessType::UniqueValueRatio(_) => self.generate_unique_value_ratio_sql(),
539 UniquenessType::PrimaryKey => self.generate_primary_key_sql(),
540 }
541 }
542
543 fn generate_full_uniqueness_sql(&self) -> Result<String> {
545 let escaped_columns: Result<Vec<String>> = self
546 .columns
547 .iter()
548 .map(|col| SqlSecurity::escape_identifier(col))
549 .collect();
550 let escaped_columns = escaped_columns?;
551
552 let columns_expr = if self.columns.len() == 1 {
553 escaped_columns[0].clone()
554 } else {
555 let cols = escaped_columns.join(", ");
556 format!("({cols})")
557 };
558
559 let sql = match &self.uniqueness_type {
560 UniquenessType::UniqueWithNulls {
561 null_handling: NullHandling::Include,
562 ..
563 } => {
564 if self.columns.len() == 1 {
566 let col = &escaped_columns[0];
567 format!(
568 "SELECT
569 COUNT(*) as total_count,
570 COUNT(DISTINCT COALESCE({col}, '<NULL>')) as unique_count
571 FROM data"
572 )
573 } else {
574 format!(
575 "SELECT
576 COUNT(*) as total_count,
577 COUNT(DISTINCT {columns_expr}) as unique_count
578 FROM data"
579 )
580 }
581 }
582 UniquenessType::UniqueWithNulls {
583 null_handling: NullHandling::Distinct,
584 ..
585 } => {
586 if self.columns.len() == 1 {
588 let col = &escaped_columns[0];
589 format!(
590 "SELECT
591 COUNT(*) as total_count,
592 COUNT(DISTINCT {col}) + CASE WHEN COUNT(*) - COUNT({col}) > 0 THEN COUNT(*) - COUNT({col}) ELSE 0 END as unique_count
593 FROM data"
594 )
595 } else {
596 format!(
598 "SELECT
599 COUNT(*) as total_count,
600 COUNT(DISTINCT {columns_expr}) as unique_count
601 FROM data"
602 )
603 }
604 }
605 _ => {
606 format!(
608 "SELECT
609 COUNT(*) as total_count,
610 COUNT(DISTINCT {columns_expr}) as unique_count
611 FROM data"
612 )
613 }
614 };
615
616 Ok(sql)
617 }
618
619 fn generate_distinctness_sql(&self) -> Result<String> {
621 let escaped_columns: Result<Vec<String>> = self
622 .columns
623 .iter()
624 .map(|col| SqlSecurity::escape_identifier(col))
625 .collect();
626 let escaped_columns = escaped_columns?;
627
628 let sql = if self.columns.len() == 1 {
629 let col = &escaped_columns[0];
630 format!(
631 "SELECT
632 COUNT(DISTINCT {col}) as distinct_count,
633 COUNT(*) as total_count
634 FROM data"
635 )
636 } else {
637 let concat_expr = escaped_columns
639 .iter()
640 .map(|col| format!("COALESCE(CAST({col} AS VARCHAR), '<NULL>')"))
641 .collect::<Vec<_>>()
642 .join(" || '|' || ");
643
644 format!(
645 "SELECT
646 COUNT(DISTINCT ({concat_expr})) as distinct_count,
647 COUNT(*) as total_count
648 FROM data"
649 )
650 };
651
652 Ok(sql)
653 }
654
655 fn generate_unique_value_ratio_sql(&self) -> Result<String> {
657 let escaped_columns: Result<Vec<String>> = self
658 .columns
659 .iter()
660 .map(|col| SqlSecurity::escape_identifier(col))
661 .collect();
662 let escaped_columns = escaped_columns?;
663
664 let columns_list = escaped_columns.join(", ");
665
666 let sql = format!(
667 "WITH value_counts AS (
668 SELECT {columns_list}, COUNT(*) as cnt
669 FROM data
670 GROUP BY {columns_list}
671 )
672 SELECT
673 COALESCE(SUM(CASE WHEN cnt = 1 THEN cnt ELSE 0 END), 0) as unique_count,
674 COALESCE(SUM(cnt), 0) as total_count
675 FROM value_counts"
676 );
677
678 Ok(sql)
679 }
680
681 fn generate_primary_key_sql(&self) -> Result<String> {
683 let escaped_columns: Result<Vec<String>> = self
684 .columns
685 .iter()
686 .map(|col| SqlSecurity::escape_identifier(col))
687 .collect();
688 let escaped_columns = escaped_columns?;
689
690 let columns_expr = if self.columns.len() == 1 {
691 escaped_columns[0].clone()
692 } else {
693 let cols = escaped_columns.join(", ");
694 format!("({cols})")
695 };
696
697 let null_check = escaped_columns
699 .iter()
700 .map(|col| format!("{col} IS NOT NULL"))
701 .collect::<Vec<_>>()
702 .join(" AND ");
703
704 let sql = format!(
705 "SELECT
706 COUNT(*) as total_count,
707 COUNT(DISTINCT {columns_expr}) as unique_count,
708 COUNT(*) - COUNT(CASE WHEN {null_check} THEN 1 END) as null_count
709 FROM data"
710 );
711
712 Ok(sql)
713 }
714
715 async fn evaluate_threshold_based(
717 &self,
718 batch: &arrow::record_batch::RecordBatch,
719 threshold: f64,
720 ) -> Result<ConstraintResult> {
721 let total_count = batch
722 .column(0)
723 .as_any()
724 .downcast_ref::<arrow::array::Int64Array>()
725 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
726 .value(0) as f64;
727
728 let unique_count = batch
729 .column(1)
730 .as_any()
731 .downcast_ref::<arrow::array::Int64Array>()
732 .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
733 .value(0) as f64;
734
735 if total_count == 0.0 {
736 return Ok(ConstraintResult::skipped("No data to validate"));
737 }
738
739 let uniqueness_ratio = unique_count / total_count;
740
741 if uniqueness_ratio >= threshold {
742 Ok(ConstraintResult::success_with_metric(uniqueness_ratio))
743 } else {
744 Ok(ConstraintResult::failure_with_metric(
745 uniqueness_ratio,
746 format!(
747 "Uniqueness ratio {uniqueness_ratio:.3} is below threshold {threshold:.3} for columns: {}",
748 self.columns.join(", ")
749 ),
750 ))
751 }
752 }
753
754 async fn evaluate_assertion_based(
756 &self,
757 batch: &arrow::record_batch::RecordBatch,
758 assertion: &Assertion,
759 ) -> Result<ConstraintResult> {
760 let count = batch
761 .column(0)
762 .as_any()
763 .downcast_ref::<arrow::array::Int64Array>()
764 .ok_or_else(|| TermError::Internal("Failed to extract count".to_string()))?
765 .value(0) as f64;
766
767 let total_count = batch
768 .column(1)
769 .as_any()
770 .downcast_ref::<arrow::array::Int64Array>()
771 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
772 .value(0) as f64;
773
774 if total_count == 0.0 {
775 return Ok(ConstraintResult::skipped("No data to validate"));
776 }
777
778 let ratio = count / total_count;
779
780 if assertion.evaluate(ratio) {
781 Ok(ConstraintResult::success_with_metric(ratio))
782 } else {
783 Ok(ConstraintResult::failure_with_metric(
784 ratio,
785 format!(
786 "{} ratio {ratio:.3} does not satisfy {} for columns: {}",
787 self.uniqueness_type.name(),
788 assertion.description(),
789 self.columns.join(", ")
790 ),
791 ))
792 }
793 }
794
795 async fn evaluate_primary_key(
797 &self,
798 batch: &arrow::record_batch::RecordBatch,
799 ) -> Result<ConstraintResult> {
800 let total_count = batch
801 .column(0)
802 .as_any()
803 .downcast_ref::<arrow::array::Int64Array>()
804 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
805 .value(0) as f64;
806
807 let unique_count = batch
808 .column(1)
809 .as_any()
810 .downcast_ref::<arrow::array::Int64Array>()
811 .ok_or_else(|| TermError::Internal("Failed to extract unique count".to_string()))?
812 .value(0) as f64;
813
814 let null_count = batch
815 .column(2)
816 .as_any()
817 .downcast_ref::<arrow::array::Int64Array>()
818 .ok_or_else(|| TermError::Internal("Failed to extract null count".to_string()))?
819 .value(0) as f64;
820
821 if total_count == 0.0 {
822 return Ok(ConstraintResult::skipped("No data to validate"));
823 }
824
825 if null_count > 0.0 {
827 Ok(ConstraintResult::failure_with_metric(
828 null_count / total_count,
829 format!(
830 "Primary key columns contain {null_count} NULL values: {}",
831 self.columns.join(", ")
832 ),
833 ))
834 } else if unique_count != total_count {
835 let duplicate_ratio = (total_count - unique_count) / total_count;
836 Ok(ConstraintResult::failure_with_metric(
837 duplicate_ratio,
838 format!(
839 "Primary key columns contain {} duplicate values: {}",
840 total_count - unique_count,
841 self.columns.join(", ")
842 ),
843 ))
844 } else {
845 Ok(ConstraintResult::success_with_metric(1.0))
846 }
847 }
848}
849
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::Assertion;
    use crate::core::ConstraintStatus;
    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    /// Registers a table named `data` made of nullable Utf8 columns.
    fn context_with_columns(columns: Vec<(&str, Vec<Option<&str>>)>) -> SessionContext {
        let ctx = SessionContext::new();

        let mut fields = Vec::with_capacity(columns.len());
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(columns.len());
        for (name, values) in columns {
            fields.push(Field::new(name, DataType::Utf8, true));
            arrays.push(Arc::new(StringArray::from(values)));
        }

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();

        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
        ctx.register_table("data", Arc::new(provider)).unwrap();

        ctx
    }

    /// Single-column fixture: `data(test_col)`.
    fn create_test_context(values: Vec<Option<&str>>) -> SessionContext {
        context_with_columns(vec![("test_col", values)])
    }

    /// Two-column fixture: `data(col1, col2)`.
    fn create_multi_column_test_context(
        col1_values: Vec<Option<&str>>,
        col2_values: Vec<Option<&str>>,
    ) -> SessionContext {
        context_with_columns(vec![("col1", col1_values), ("col2", col2_values)])
    }

    #[tokio::test]
    async fn test_full_uniqueness_single_column() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]);

        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.7).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // 3 distinct values out of 4 rows.
        assert_eq!(result.metric, Some(0.75));
    }

    #[tokio::test]
    async fn test_full_uniqueness_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, Some("A")]);

        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.4).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // COUNT(DISTINCT) skips the NULL: 2 distinct values out of 4 rows.
        assert_eq!(result.metric, Some(0.5));
    }

    #[tokio::test]
    async fn test_distinctness_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]);

        let constraint =
            UniquenessConstraint::distinctness(vec!["test_col"], Assertion::Equals(0.75)).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.75));
    }

    #[tokio::test]
    async fn test_unique_value_ratio_constraint() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C"), Some("A")]);

        let constraint =
            UniquenessConstraint::unique_value_ratio(vec!["test_col"], Assertion::Equals(0.5))
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // Only B and C occur exactly once: 2 out of 4 rows.
        assert_eq!(result.metric, Some(0.5));
    }

    #[tokio::test]
    async fn test_primary_key_success() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("C")]);

        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_primary_key_with_nulls() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None]);

        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert!(result.message.unwrap().contains("NULL values"));
    }

    #[tokio::test]
    async fn test_primary_key_with_duplicates() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), Some("A")]);

        let constraint = UniquenessConstraint::primary_key(vec!["test_col"]).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Failure);
        assert!(result.message.unwrap().contains("duplicate values"));
    }

    #[tokio::test]
    async fn test_multi_column_uniqueness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("2")],
        );

        let constraint =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // All three (col1, col2) pairs differ.
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_multi_column_distinctness() {
        let ctx = create_multi_column_test_context(
            vec![Some("A"), Some("B"), Some("A")],
            vec![Some("1"), Some("2"), Some("1")],
        );

        let constraint =
            UniquenessConstraint::distinctness(vec!["col1", "col2"], Assertion::GreaterThan(0.5))
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // Two distinct pairs out of three rows.
        assert!((result.metric.unwrap() - 2.0 / 3.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_unique_with_nulls_include() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, None]);

        let constraint =
            UniquenessConstraint::unique_with_nulls(vec!["test_col"], 0.4, NullHandling::Include)
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // A, B and one shared NULL bucket: 3 out of 4 rows.
        assert_eq!(result.metric, Some(0.75));
    }

    #[tokio::test]
    async fn test_unique_with_nulls_distinct() {
        let ctx = create_test_context(vec![Some("A"), Some("B"), None, None]);

        let constraint =
            UniquenessConstraint::unique_with_nulls(vec!["test_col"], 1.0, NullHandling::Distinct)
                .unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Success);
        // A, B plus each NULL row counted separately: 4 out of 4 rows.
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_empty_data() {
        let ctx = create_test_context(vec![]);

        let constraint = UniquenessConstraint::full_uniqueness("test_col", 1.0).unwrap();

        let result = constraint.evaluate(&ctx).await.unwrap();
        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    #[test]
    fn test_invalid_threshold() {
        let result = UniquenessConstraint::full_uniqueness("col", 1.5);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Threshold must be between 0.0 and 1.0"));
    }

    #[test]
    fn test_threshold_bounds() {
        // Both ends of the range are accepted.
        assert!(UniquenessConstraint::full_uniqueness("col", 0.0).is_ok());
        assert!(UniquenessConstraint::full_uniqueness("col", 1.0).is_ok());
        // Values below the range and NaN are rejected.
        assert!(UniquenessConstraint::full_uniqueness("col", -0.1).is_err());
        assert!(UniquenessConstraint::full_uniqueness("col", f64::NAN).is_err());
    }

    #[test]
    fn test_empty_columns() {
        let columns: Vec<String> = vec![];
        let result = UniquenessConstraint::new(
            columns,
            UniquenessType::FullUniqueness { threshold: 1.0 },
            UniquenessOptions::default(),
        );
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("At least one column must be specified"));
    }

    #[test]
    fn test_constraint_metadata() {
        let constraint = UniquenessConstraint::full_uniqueness("test_col", 0.95).unwrap();
        let metadata = constraint.metadata();

        assert!(metadata
            .description
            .unwrap_or_default()
            .contains("Unified uniqueness constraint"));
        assert_eq!(constraint.name(), "full_uniqueness");
        assert_eq!(constraint.column(), Some("test_col"));
    }

    #[test]
    fn test_multi_column_metadata() {
        let constraint =
            UniquenessConstraint::full_uniqueness_multi(vec!["col1", "col2"], 0.9).unwrap();

        // `column()` only answers for single-column constraints.
        assert_eq!(constraint.column(), None);
        assert_eq!(constraint.columns(), &["col1", "col2"]);
    }

    #[test]
    fn test_unique_composite_options() {
        let constraint = UniquenessConstraint::unique_composite(
            vec!["col1", "col2"],
            0.9,
            NullHandling::Include,
            false,
        )
        .unwrap();

        assert_eq!(constraint.name(), "unique_composite");
        assert_eq!(constraint.options().null_handling, NullHandling::Include);
        assert!(!constraint.options().case_sensitive);
    }

    #[test]
    fn test_null_handling_display() {
        assert_eq!(NullHandling::default(), NullHandling::Exclude);
        assert_eq!(NullHandling::Exclude.to_string(), "exclude");
        assert_eq!(NullHandling::Include.to_string(), "include");
        assert_eq!(NullHandling::Distinct.to_string(), "distinct");
    }

    #[test]
    fn test_options_builder() {
        let options = UniquenessOptions::new()
            .with_null_handling(NullHandling::Distinct)
            .case_sensitive(false)
            .trim_whitespace(true);

        assert_eq!(options.null_handling, NullHandling::Distinct);
        assert!(!options.case_sensitive);
        assert!(options.trim_whitespace);
        assert_eq!(UniquenessOptions::new(), UniquenessOptions::default());
    }

    #[test]
    fn test_type_names_and_descriptions() {
        let primary_key = UniquenessType::PrimaryKey;
        assert_eq!(primary_key.name(), "primary_key");
        assert!(primary_key.description().contains("primary key"));

        let full = UniquenessType::FullUniqueness { threshold: 0.5 };
        assert_eq!(full.name(), "full_uniqueness");
        assert!(full.description().contains("50.0%"));
    }
}