term_guard/constraints/
approx_count_distinct.rs

1//! Approximate count distinct validation constraint.
2
3use crate::constraints::Assertion;
4use crate::core::{
5    current_validation_context, Constraint, ConstraintMetadata, ConstraintResult, ConstraintStatus,
6};
7use crate::prelude::*;
8use async_trait::async_trait;
9use datafusion::prelude::*;
10use tracing::instrument;
11/// A constraint that validates the approximate count of distinct values in a column.
12///
13/// This constraint uses DataFusion's APPROX_DISTINCT function which provides
14/// an approximate count using HyperLogLog algorithm. This is much faster than
15/// exact COUNT(DISTINCT) for large datasets while maintaining accuracy within
16/// 2-3% error margin.
17///
18/// # Examples
19///
20/// ```rust
21/// use term_guard::constraints::{ApproxCountDistinctConstraint, Assertion};
22/// use term_guard::core::Constraint;
23///
24/// // Check for high cardinality (e.g., user IDs)
25/// let constraint = ApproxCountDistinctConstraint::new("user_id", Assertion::GreaterThan(1000000.0));
26/// assert_eq!(constraint.name(), "approx_count_distinct");
27///
28/// // Check for low cardinality (e.g., country codes)
29/// let constraint = ApproxCountDistinctConstraint::new("country_code", Assertion::LessThan(200.0));
30/// ```
31#[derive(Debug, Clone)]
32pub struct ApproxCountDistinctConstraint {
33    column: String,
34    assertion: Assertion,
35}
36
37impl ApproxCountDistinctConstraint {
38    /// Creates a new approximate count distinct constraint.
39    ///
40    /// # Arguments
41    ///
42    /// * `column` - The column to count distinct values in
43    /// * `assertion` - The assertion to apply to the approximate distinct count
44    pub fn new<S: Into<String>>(column: S, assertion: Assertion) -> Self {
45        Self {
46            column: column.into(),
47            assertion,
48        }
49    }
50}
51
52#[async_trait]
53impl Constraint for ApproxCountDistinctConstraint {
54    #[instrument(skip(self, ctx), fields(column = %self.column, assertion = ?self.assertion))]
55    async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
56        // Build the SQL query using APPROX_DISTINCT
57        // Get the table name from the validation context
58
59        let validation_ctx = current_validation_context();
60
61        let table_name = validation_ctx.table_name();
62
63        let sql = format!(
64            "SELECT APPROX_DISTINCT({}) as approx_distinct_count FROM {table_name}",
65            self.column
66        );
67
68        let df = ctx.sql(&sql).await.map_err(|e| {
69            TermError::constraint_evaluation(
70                self.name(),
71                format!("Failed to execute approximate count distinct query: {e}"),
72            )
73        })?;
74
75        let batches = df.collect().await?;
76
77        if batches.is_empty() || batches[0].num_rows() == 0 {
78            return Ok(ConstraintResult::skipped("No data to validate"));
79        }
80
81        let batch = &batches[0];
82
83        // Extract the approximate distinct count
84        let approx_count = batch
85            .column(0)
86            .as_any()
87            .downcast_ref::<arrow::array::UInt64Array>()
88            .ok_or_else(|| {
89                TermError::constraint_evaluation(
90                    self.name(),
91                    "Failed to extract approximate distinct count from result",
92                )
93            })?
94            .value(0) as f64;
95
96        // Evaluate the assertion
97        let assertion_result = self.assertion.evaluate(approx_count);
98
99        let status = if assertion_result {
100            ConstraintStatus::Success
101        } else {
102            ConstraintStatus::Failure
103        };
104
105        let message = if status == ConstraintStatus::Failure {
106            Some(format!(
107                "Approximate distinct count {approx_count} does not satisfy assertion {} for column '{}'",
108                self.assertion.description(),
109                self.column
110            ))
111        } else {
112            None
113        };
114
115        Ok(ConstraintResult {
116            status,
117            metric: Some(approx_count),
118            message,
119        })
120    }
121
122    fn name(&self) -> &str {
123        "approx_count_distinct"
124    }
125
126    fn column(&self) -> Option<&str> {
127        Some(&self.column)
128    }
129
130    fn metadata(&self) -> ConstraintMetadata {
131        ConstraintMetadata::for_column(&self.column)
132            .with_description(format!(
133                "Checks that the approximate distinct count of column '{}' {}",
134                self.column,
135                self.assertion.description()
136            ))
137            .with_custom("assertion", self.assertion.description())
138            .with_custom("constraint_type", "cardinality")
139            .with_custom("algorithm", "HyperLogLog")
140    }
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146    use crate::core::ConstraintStatus;
147    use arrow::array::{Int64Array, StringArray};
148    use arrow::datatypes::{DataType, Field, Schema};
149    use arrow::record_batch::RecordBatch;
150    use datafusion::datasource::MemTable;
151    use std::sync::Arc;
152
153    use crate::test_helpers::evaluate_constraint_with_context;
154    async fn create_test_context_with_data(values: Vec<Option<i64>>) -> SessionContext {
155        let ctx = SessionContext::new();
156
157        let schema = Arc::new(Schema::new(vec![Field::new(
158            "test_col",
159            DataType::Int64,
160            true,
161        )]));
162
163        let array = Int64Array::from(values);
164        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
165
166        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
167        ctx.register_table("data", Arc::new(provider)).unwrap();
168
169        ctx
170    }
171
172    async fn create_string_context_with_data(values: Vec<Option<&str>>) -> SessionContext {
173        let ctx = SessionContext::new();
174
175        let schema = Arc::new(Schema::new(vec![Field::new(
176            "test_col",
177            DataType::Utf8,
178            true,
179        )]));
180
181        let array = StringArray::from(values);
182        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
183
184        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
185        ctx.register_table("data", Arc::new(provider)).unwrap();
186
187        ctx
188    }
189
190    #[tokio::test]
191    async fn test_high_cardinality() {
192        // All unique values
193        let values: Vec<Option<i64>> = (0..1000).map(Some).collect();
194        let ctx = create_test_context_with_data(values).await;
195
196        let constraint =
197            ApproxCountDistinctConstraint::new("test_col", Assertion::GreaterThan(990.0));
198        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
199            .await
200            .unwrap();
201
202        assert_eq!(result.status, ConstraintStatus::Success);
203        // APPROX_DISTINCT should be close to 1000
204        assert!(result.metric.unwrap() > 990.0);
205    }
206
207    #[tokio::test]
208    async fn test_low_cardinality() {
209        // Only a few distinct values repeated many times
210        let mut values = Vec::new();
211        for _ in 0..100 {
212            values.push(Some(1));
213            values.push(Some(2));
214            values.push(Some(3));
215        }
216        let ctx = create_test_context_with_data(values).await;
217
218        let constraint = ApproxCountDistinctConstraint::new("test_col", Assertion::LessThan(10.0));
219        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
220            .await
221            .unwrap();
222
223        assert_eq!(result.status, ConstraintStatus::Success);
224        // Should be approximately 3
225        assert!(result.metric.unwrap() < 10.0);
226    }
227
228    #[tokio::test]
229    async fn test_with_nulls() {
230        // Mix of values and nulls
231        let values = vec![
232            Some(1),
233            None,
234            Some(2),
235            None,
236            Some(3),
237            None,
238            Some(1),
239            Some(2),
240            Some(3),
241            None,
242        ];
243        let ctx = create_test_context_with_data(values).await;
244
245        // APPROX_DISTINCT should count distinct non-null values (approximately 3)
246        let constraint =
247            ApproxCountDistinctConstraint::new("test_col", Assertion::Between(2.0, 5.0));
248        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
249            .await
250            .unwrap();
251
252        assert_eq!(result.status, ConstraintStatus::Success);
253        let metric = result.metric.unwrap();
254        assert!((2.0..=5.0).contains(&metric));
255    }
256
257    #[tokio::test]
258    async fn test_constraint_failure() {
259        // Create data with moderate cardinality
260        let values: Vec<Option<i64>> = (0..50).map(|i| Some(i % 10)).collect();
261        let ctx = create_test_context_with_data(values).await;
262
263        // Expect high cardinality but data has low cardinality (~10 distinct values)
264        let constraint =
265            ApproxCountDistinctConstraint::new("test_col", Assertion::GreaterThan(100.0));
266        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
267            .await
268            .unwrap();
269
270        assert_eq!(result.status, ConstraintStatus::Failure);
271        assert!(result.metric.unwrap() < 20.0);
272        assert!(result.message.is_some());
273    }
274
275    #[tokio::test]
276    async fn test_string_column() {
277        let values = vec![
278            Some("apple"),
279            Some("banana"),
280            Some("cherry"),
281            Some("apple"),
282            Some("banana"),
283            Some("date"),
284            Some("elderberry"),
285            None,
286        ];
287        let ctx = create_string_context_with_data(values).await;
288
289        // Should have approximately 5 distinct non-null values
290        let constraint =
291            ApproxCountDistinctConstraint::new("test_col", Assertion::Between(4.0, 6.0));
292        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
293            .await
294            .unwrap();
295
296        assert_eq!(result.status, ConstraintStatus::Success);
297    }
298
299    #[tokio::test]
300    async fn test_empty_data() {
301        let ctx = create_test_context_with_data(vec![]).await;
302
303        // APPROX_DISTINCT returns 0 for empty data, not skipped
304        let constraint = ApproxCountDistinctConstraint::new("test_col", Assertion::Equals(0.0));
305        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
306            .await
307            .unwrap();
308
309        assert_eq!(result.status, ConstraintStatus::Success);
310        assert_eq!(result.metric, Some(0.0));
311    }
312
313    #[tokio::test]
314    async fn test_all_null_values() {
315        let values = vec![None, None, None, None, None];
316        let ctx = create_test_context_with_data(values).await;
317
318        // APPROX_DISTINCT of all nulls should be 0
319        let constraint = ApproxCountDistinctConstraint::new("test_col", Assertion::Equals(0.0));
320        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
321            .await
322            .unwrap();
323
324        assert_eq!(result.status, ConstraintStatus::Success);
325        assert_eq!(result.metric, Some(0.0));
326    }
327
328    #[tokio::test]
329    async fn test_accuracy_comparison() {
330        // Create a dataset large enough to see HyperLogLog in action
331        // but small enough to verify accuracy
332        let mut values = Vec::new();
333        for i in 0..10000 {
334            values.push(Some(i % 1000)); // 1000 distinct values
335        }
336        let ctx = create_test_context_with_data(values).await;
337
338        let constraint =
339            ApproxCountDistinctConstraint::new("test_col", Assertion::Between(970.0, 1030.0));
340        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
341            .await
342            .unwrap();
343
344        assert_eq!(result.status, ConstraintStatus::Success);
345        // Should be within 3% of 1000
346        let metric = result.metric.unwrap();
347        assert!((970.0..=1030.0).contains(&metric));
348    }
349
350    #[tokio::test]
351    async fn test_metadata() {
352        let constraint =
353            ApproxCountDistinctConstraint::new("user_id", Assertion::GreaterThan(1000000.0));
354        let metadata = constraint.metadata();
355
356        assert_eq!(metadata.columns, vec!["user_id".to_string()]);
357        let description = metadata.description.unwrap_or_default();
358        assert!(description.contains("approximate distinct count"));
359        assert!(description.contains("greater than 1000000"));
360        assert_eq!(
361            metadata.custom.get("algorithm"),
362            Some(&"HyperLogLog".to_string())
363        );
364        assert_eq!(
365            metadata.custom.get("constraint_type"),
366            Some(&"cardinality".to_string())
367        );
368    }
369}