1use crate::constraints::Assertion;
4use crate::core::{
5 current_validation_context, Constraint, ConstraintMetadata, ConstraintResult, ConstraintStatus,
6};
7use crate::prelude::*;
8use async_trait::async_trait;
9use datafusion::prelude::*;
10use tracing::instrument;
11#[derive(Debug, Clone)]
32pub struct ApproxCountDistinctConstraint {
33 column: String,
34 assertion: Assertion,
35}
36
37impl ApproxCountDistinctConstraint {
38 pub fn new<S: Into<String>>(column: S, assertion: Assertion) -> Self {
45 Self {
46 column: column.into(),
47 assertion,
48 }
49 }
50}
51
52#[async_trait]
53impl Constraint for ApproxCountDistinctConstraint {
54 #[instrument(skip(self, ctx), fields(column = %self.column, assertion = ?self.assertion))]
55 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
56 let validation_ctx = current_validation_context();
60
61 let table_name = validation_ctx.table_name();
62
63 let sql = format!(
64 "SELECT APPROX_DISTINCT({}) as approx_distinct_count FROM {table_name}",
65 self.column
66 );
67
68 let df = ctx.sql(&sql).await.map_err(|e| {
69 TermError::constraint_evaluation(
70 self.name(),
71 format!("Failed to execute approximate count distinct query: {e}"),
72 )
73 })?;
74
75 let batches = df.collect().await?;
76
77 if batches.is_empty() || batches[0].num_rows() == 0 {
78 return Ok(ConstraintResult::skipped("No data to validate"));
79 }
80
81 let batch = &batches[0];
82
83 let approx_count = batch
85 .column(0)
86 .as_any()
87 .downcast_ref::<arrow::array::UInt64Array>()
88 .ok_or_else(|| {
89 TermError::constraint_evaluation(
90 self.name(),
91 "Failed to extract approximate distinct count from result",
92 )
93 })?
94 .value(0) as f64;
95
96 let assertion_result = self.assertion.evaluate(approx_count);
98
99 let status = if assertion_result {
100 ConstraintStatus::Success
101 } else {
102 ConstraintStatus::Failure
103 };
104
105 let message = if status == ConstraintStatus::Failure {
106 Some(format!(
107 "Approximate distinct count {approx_count} does not satisfy assertion {} for column '{}'",
108 self.assertion.description(),
109 self.column
110 ))
111 } else {
112 None
113 };
114
115 Ok(ConstraintResult {
116 status,
117 metric: Some(approx_count),
118 message,
119 })
120 }
121
122 fn name(&self) -> &str {
123 "approx_count_distinct"
124 }
125
126 fn column(&self) -> Option<&str> {
127 Some(&self.column)
128 }
129
130 fn metadata(&self) -> ConstraintMetadata {
131 ConstraintMetadata::for_column(&self.column)
132 .with_description(format!(
133 "Checks that the approximate distinct count of column '{}' {}",
134 self.column,
135 self.assertion.description()
136 ))
137 .with_custom("assertion", self.assertion.description())
138 .with_custom("constraint_type", "cardinality")
139 .with_custom("algorithm", "HyperLogLog")
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ConstraintStatus;
    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    use crate::test_helpers::evaluate_constraint_with_context;

    /// Registers `batch` as an in-memory table named `data` in a fresh
    /// session context and returns that context.
    fn register_as_data_table(batch: RecordBatch) -> SessionContext {
        let session = SessionContext::new();
        let table = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
        session.register_table("data", Arc::new(table)).unwrap();
        session
    }

    /// Builds a context whose `data` table has one nullable `Int64` column
    /// called `test_col` holding `values`.
    async fn create_test_context_with_data(values: Vec<Option<i64>>) -> SessionContext {
        let field = Field::new("test_col", DataType::Int64, true);
        let schema = Arc::new(Schema::new(vec![field]));
        let array = Int64Array::from(values);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();

        register_as_data_table(batch)
    }

    /// Builds a context whose `data` table has one nullable `Utf8` column
    /// called `test_col` holding `values`.
    async fn create_string_context_with_data(values: Vec<Option<&str>>) -> SessionContext {
        let field = Field::new("test_col", DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![field]));
        let array = StringArray::from(values);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();

        register_as_data_table(batch)
    }

    /// 1000 unique integers must be estimated above 990.
    #[tokio::test]
    async fn test_high_cardinality() {
        let ctx = create_test_context_with_data((0..1000).map(Some).collect()).await;

        let constraint =
            ApproxCountDistinctConstraint::new("test_col", Assertion::GreaterThan(990.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        let estimate = result.metric.unwrap();
        assert!(estimate > 990.0);
    }

    /// Three distinct values repeated 100 times must be estimated below 10.
    #[tokio::test]
    async fn test_low_cardinality() {
        let values: Vec<Option<i64>> = [Some(1), Some(2), Some(3)].repeat(100);
        let ctx = create_test_context_with_data(values).await;

        let constraint = ApproxCountDistinctConstraint::new("test_col", Assertion::LessThan(10.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        let estimate = result.metric.unwrap();
        assert!(estimate < 10.0);
    }

    /// Three distinct non-null values interleaved with nulls must be
    /// estimated within `[2, 5]`.
    #[tokio::test]
    async fn test_with_nulls() {
        let ctx = create_test_context_with_data(vec![
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            None,
            Some(1),
            Some(2),
            Some(3),
            None,
        ])
        .await;

        let constraint =
            ApproxCountDistinctConstraint::new("test_col", Assertion::Between(2.0, 5.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        let estimate = result.metric.unwrap();
        assert!((2.0..=5.0).contains(&estimate));
    }

    /// Ten distinct values cannot satisfy "greater than 100": the constraint
    /// must fail and explain why.
    #[tokio::test]
    async fn test_constraint_failure() {
        let ctx = create_test_context_with_data((0..50).map(|i| Some(i % 10)).collect()).await;

        let constraint =
            ApproxCountDistinctConstraint::new("test_col", Assertion::GreaterThan(100.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        let estimate = result.metric.unwrap();
        assert!(estimate < 20.0);
        assert!(result.message.is_some());
    }

    /// The constraint also works on string columns (five distinct fruits plus
    /// one null).
    #[tokio::test]
    async fn test_string_column() {
        let ctx = create_string_context_with_data(vec![
            Some("apple"),
            Some("banana"),
            Some("cherry"),
            Some("apple"),
            Some("banana"),
            Some("date"),
            Some("elderberry"),
            None,
        ])
        .await;

        let constraint =
            ApproxCountDistinctConstraint::new("test_col", Assertion::Between(4.0, 6.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
    }

    /// An empty table is expected to report a distinct count of exactly zero.
    #[tokio::test]
    async fn test_empty_data() {
        let ctx = create_test_context_with_data(Vec::new()).await;

        let constraint = ApproxCountDistinctConstraint::new("test_col", Assertion::Equals(0.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.0));
    }

    /// A column containing only nulls is expected to report zero distinct
    /// values.
    #[tokio::test]
    async fn test_all_null_values() {
        let ctx = create_test_context_with_data(vec![None; 5]).await;

        let constraint = ApproxCountDistinctConstraint::new("test_col", Assertion::Equals(0.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(0.0));
    }

    /// 10000 rows cycling through 1000 distinct values must be estimated
    /// within 3% of the true cardinality.
    #[tokio::test]
    async fn test_accuracy_comparison() {
        let ctx =
            create_test_context_with_data((0..10000).map(|i| Some(i % 1000)).collect()).await;

        let constraint =
            ApproxCountDistinctConstraint::new("test_col", Assertion::Between(970.0, 1030.0));
        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        let estimate = result.metric.unwrap();
        assert!((970.0..=1030.0).contains(&estimate));
    }

    /// Metadata must name the column, describe the check, and carry the
    /// `algorithm` and `constraint_type` custom entries.
    #[tokio::test]
    async fn test_metadata() {
        let metadata =
            ApproxCountDistinctConstraint::new("user_id", Assertion::GreaterThan(1000000.0))
                .metadata();

        assert_eq!(metadata.columns, vec!["user_id".to_string()]);

        let description = metadata.description.unwrap_or_default();
        assert!(description.contains("approximate distinct count"));
        assert!(description.contains("greater than 1000000"));

        let expected_algorithm = "HyperLogLog".to_string();
        assert_eq!(metadata.custom.get("algorithm"), Some(&expected_algorithm));

        let expected_type = "cardinality".to_string();
        assert_eq!(metadata.custom.get("constraint_type"), Some(&expected_type));
    }
}