term_guard/constraints/
size.rs1use crate::constraints::Assertion;
4use crate::core::{current_validation_context, Constraint, ConstraintMetadata, ConstraintResult};
5use crate::prelude::*;
6use async_trait::async_trait;
7use datafusion::prelude::*;
8use tracing::{debug, instrument};
9#[derive(Debug, Clone)]
31pub struct SizeConstraint {
32 assertion: Assertion,
34}
35
36impl SizeConstraint {
37 pub fn new(assertion: Assertion) -> Self {
43 Self { assertion }
44 }
45}
46
47#[async_trait]
48impl Constraint for SizeConstraint {
49 #[instrument(skip(self, ctx), fields(
50 constraint.name = %self.name(),
51 constraint.assertion = %self.assertion
52 ))]
53 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
54 debug!(
55 constraint.name = %self.name(),
56 constraint.assertion = %self.assertion,
57 "Starting size constraint evaluation"
58 );
59 let validation_ctx = current_validation_context();
63
64 let table_name = validation_ctx.table_name();
65
66 let sql = format!("SELECT COUNT(*) as row_count FROM {table_name}");
67
68 let df = ctx.sql(&sql).await?;
70 let batches = df.collect().await?;
71
72 if batches.is_empty() {
74 debug!(
75 constraint.name = %self.name(),
76 skip.reason = "No data to validate",
77 "Skipping constraint due to empty result set"
78 );
79 return Ok(ConstraintResult::skipped("No data to validate"));
80 }
81
82 let batch = &batches[0];
83
84 if batch.num_rows() == 0 {
86 return Ok(ConstraintResult::skipped("No data to validate"));
87 }
88
89 let row_count = batch
90 .column(0)
91 .as_any()
92 .downcast_ref::<arrow::array::Int64Array>()
93 .ok_or_else(|| TermError::Internal("Failed to extract row count".to_string()))?
94 .value(0) as f64;
95
96 if self.assertion.evaluate(row_count) {
98 debug!(
99 constraint.name = %self.name(),
100 constraint.assertion = %self.assertion,
101 result.row_count = row_count as i64,
102 result.status = "success",
103 "Size constraint passed"
104 );
105 Ok(ConstraintResult::success_with_metric(row_count))
106 } else {
107 debug!(
108 constraint.name = %self.name(),
109 constraint.assertion = %self.assertion,
110 result.row_count = row_count as i64,
111 result.status = "failure",
112 "Size constraint failed"
113 );
114 Ok(ConstraintResult::failure_with_metric(
115 row_count,
116 format!("Size {row_count} does not {}", self.assertion),
117 ))
118 }
119 }
120
121 fn name(&self) -> &str {
122 "size"
123 }
124
125 fn metadata(&self) -> ConstraintMetadata {
126 ConstraintMetadata::new()
127 .with_description(format!(
128 "Checks that the dataset size {}",
129 self.assertion.description()
130 ))
131 .with_custom("assertion", self.assertion.to_string())
132 .with_custom("constraint_type", "statistical")
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ConstraintStatus;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use std::sync::Arc;

    use crate::test_helpers::evaluate_constraint_with_context;

    /// Builds a session with an in-memory table registered as `data` that holds a
    /// single non-nullable `Int64` column `value` containing `0..num_rows`.
    async fn create_test_context(num_rows: usize) -> SessionContext {
        let fields = vec![Field::new("value", DataType::Int64, false)];
        let schema = Arc::new(Schema::new(fields));

        let column = Int64Array::from((0..num_rows as i64).collect::<Vec<i64>>());
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(column)]).unwrap();
        let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();

        let ctx = SessionContext::new();
        ctx.register_table("data", Arc::new(table)).unwrap();
        ctx
    }

    /// Evaluates `assertion` as a size constraint over a `rows`-row `data` table and
    /// checks both the resulting status and the reported metric.
    async fn check_size(
        rows: usize,
        assertion: Assertion,
        expected_status: ConstraintStatus,
        expected_metric: f64,
    ) {
        let ctx = create_test_context(rows).await;
        let constraint = SizeConstraint::new(assertion);

        let result = evaluate_constraint_with_context(&constraint, &ctx, "data")
            .await
            .unwrap();
        assert_eq!(result.status, expected_status);
        assert_eq!(result.metric, Some(expected_metric));
    }

    #[tokio::test]
    async fn test_size_equals() {
        check_size(100, Assertion::Equals(100.0), ConstraintStatus::Success, 100.0).await;
    }

    #[tokio::test]
    async fn test_size_greater_than() {
        check_size(50, Assertion::GreaterThan(25.0), ConstraintStatus::Success, 50.0).await;
    }

    #[tokio::test]
    async fn test_size_between() {
        check_size(75, Assertion::Between(50.0, 100.0), ConstraintStatus::Success, 75.0).await;
    }

    #[tokio::test]
    async fn test_size_failure() {
        check_size(10, Assertion::GreaterThan(50.0), ConstraintStatus::Failure, 10.0).await;
    }

    // An empty table still yields a COUNT(*) row, so the constraint evaluates
    // against a size of zero.
    #[tokio::test]
    async fn test_empty_data() {
        check_size(0, Assertion::Equals(0.0), ConstraintStatus::Success, 0.0).await;
    }
}