1use crate::core::{current_validation_context, Constraint, ConstraintResult, ConstraintStatus};
10use crate::error::Result;
11use crate::security::SqlSecurity;
12use arrow::array::Array;
13use async_trait::async_trait;
14use datafusion::execution::context::SessionContext;
15use serde::{Deserialize, Serialize};
16use std::fmt;
17use tracing::instrument;
18#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
20pub enum LengthAssertion {
21 Min(usize),
23 Max(usize),
25 Between(usize, usize),
27 Exactly(usize),
29 NotEmpty,
31}
32
33impl LengthAssertion {
34 fn sql_condition(&self, column: &str) -> String {
36 match self {
37 LengthAssertion::Min(min) => format!("LENGTH({column}) >= {min}"),
38 LengthAssertion::Max(max) => format!("LENGTH({column}) <= {max}"),
39 LengthAssertion::Between(min, max) => {
40 format!("LENGTH({column}) >= {min} AND LENGTH({column}) <= {max}")
41 }
42 LengthAssertion::Exactly(len) => format!("LENGTH({column}) = {len}"),
43 LengthAssertion::NotEmpty => format!("LENGTH({column}) >= 1"),
44 }
45 }
46
47 fn name(&self) -> &str {
49 match self {
50 LengthAssertion::Min(_) => "min_length",
51 LengthAssertion::Max(_) => "max_length",
52 LengthAssertion::Between(_, _) => "length_between",
53 LengthAssertion::Exactly(_) => "exact_length",
54 LengthAssertion::NotEmpty => "not_empty",
55 }
56 }
57
58 fn description(&self) -> String {
60 match self {
61 LengthAssertion::Min(min) => format!("at least {min} characters"),
62 LengthAssertion::Max(max) => format!("at most {max} characters"),
63 LengthAssertion::Between(min, max) => format!("between {min} and {max} characters"),
64 LengthAssertion::Exactly(len) => format!("exactly {len} characters"),
65 LengthAssertion::NotEmpty => "not empty".to_string(),
66 }
67 }
68}
69
70impl fmt::Display for LengthAssertion {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 write!(f, "{}", self.description())
73 }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct LengthConstraint {
104 column: String,
106 assertion: LengthAssertion,
108}
109
110impl LengthConstraint {
111 pub fn new(column: impl Into<String>, assertion: LengthAssertion) -> Self {
118 Self {
119 column: column.into(),
120 assertion,
121 }
122 }
123
124 pub fn min(column: impl Into<String>, min_length: usize) -> Self {
126 Self::new(column, LengthAssertion::Min(min_length))
127 }
128
129 pub fn max(column: impl Into<String>, max_length: usize) -> Self {
131 Self::new(column, LengthAssertion::Max(max_length))
132 }
133
134 pub fn between(column: impl Into<String>, min_length: usize, max_length: usize) -> Self {
136 assert!(min_length <= max_length, "min_length must be <= max_length");
137 Self::new(column, LengthAssertion::Between(min_length, max_length))
138 }
139
140 pub fn exactly(column: impl Into<String>, length: usize) -> Self {
142 Self::new(column, LengthAssertion::Exactly(length))
143 }
144
145 pub fn not_empty(column: impl Into<String>) -> Self {
147 Self::new(column, LengthAssertion::NotEmpty)
148 }
149}
150
151#[async_trait]
152impl Constraint for LengthConstraint {
153 #[instrument(skip(self, ctx), fields(
154 column = %self.column,
155 assertion = %self.assertion
156 ))]
157 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
158 let column_identifier = SqlSecurity::escape_identifier(&self.column)?;
159 let condition = self.assertion.sql_condition(&column_identifier);
160
161 let validation_ctx = current_validation_context();
164
165 let table_name = validation_ctx.table_name();
166
167 let sql = format!(
168 "SELECT
169 COUNT(CASE WHEN {condition} OR {column_identifier} IS NULL THEN 1 END) * 1.0 / NULLIF(COUNT(*), 0) as ratio
170 FROM {table_name}"
171 );
172
173 let df = ctx.sql(&sql).await?;
174 let batches = df.collect().await?;
175
176 if batches.is_empty() || batches[0].num_rows() == 0 {
177 return Ok(ConstraintResult::skipped("No data to validate"));
178 }
179
180 let ratio_array = batches[0]
181 .column(0)
182 .as_any()
183 .downcast_ref::<arrow::array::Float64Array>()
184 .ok_or_else(|| {
185 crate::error::TermError::constraint_evaluation(
186 self.name(),
187 "Failed to extract ratio from result",
188 )
189 })?;
190
191 if ratio_array.is_null(0) {
193 return Ok(ConstraintResult::skipped("No data to validate"));
194 }
195
196 let ratio = ratio_array.value(0);
197
198 let status = if ratio >= 1.0 {
199 ConstraintStatus::Success
200 } else {
201 ConstraintStatus::Failure
202 };
203
204 let message = if status == ConstraintStatus::Failure {
205 Some(format!(
206 "Length constraint failed: {:.2}% of values are {}",
207 ratio * 100.0,
208 self.assertion.description()
209 ))
210 } else {
211 None
212 };
213
214 Ok(ConstraintResult {
215 status,
216 metric: Some(ratio),
217 message,
218 })
219 }
220
221 fn name(&self) -> &str {
222 self.assertion.name()
223 }
224
225 fn column(&self) -> Option<&str> {
226 Some(&self.column)
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::StringArray;
    use arrow::record_batch::RecordBatch;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    use crate::test_helpers::evaluate_constraint_with_context;

    /// Builds a session with a table `data` holding one nullable Utf8 column `text`.
    async fn create_test_context(data: Vec<Option<&str>>) -> SessionContext {
        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, true)]));
        let string_data = StringArray::from(data);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(string_data)]).unwrap();

        let ctx = SessionContext::new();
        ctx.register_batch("data", batch).unwrap();
        ctx
    }

    /// Registers `rows` as table `data` and evaluates `constraint` against it.
    async fn evaluate_on(
        constraint: &LengthConstraint,
        rows: Vec<Option<&str>>,
    ) -> ConstraintResult {
        let ctx = create_test_context(rows).await;
        evaluate_constraint_with_context(constraint, &ctx, "data")
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_min_length_constraint() {
        let constraint = LengthConstraint::min("text", 5);
        let rows = vec![
            Some("hello"),
            Some("world"),
            Some("testing"),
            Some("great"),
            None,
        ];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
        assert_eq!(constraint.name(), "min_length");
    }

    #[tokio::test]
    async fn test_min_length_constraint_failure() {
        let constraint = LengthConstraint::min("text", 5);
        let rows = vec![Some("hi"), Some("hello"), Some("a"), Some("testing"), None];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.6));
        assert!(result.message.unwrap().contains("at least 5 characters"));
    }

    #[tokio::test]
    async fn test_max_length_constraint() {
        let constraint = LengthConstraint::max("text", 10);
        let rows = vec![Some("hi"), Some("hey"), Some("test"), None];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
        assert_eq!(constraint.name(), "max_length");
    }

    #[tokio::test]
    async fn test_max_length_constraint_failure() {
        let constraint = LengthConstraint::max("text", 10);
        let rows = vec![
            Some("short"),
            Some("this is a very long string that exceeds the limit"),
            Some("ok"),
            None,
        ];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.75));
        assert!(result.message.unwrap().contains("at most 10 characters"));
    }

    #[tokio::test]
    async fn test_between_length_constraint() {
        let constraint = LengthConstraint::between("text", 3, 10);
        let rows = vec![
            Some("hello"),
            Some("testing"),
            Some("hi"),
            Some("this is way too long"),
            None,
        ];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.6));
        assert_eq!(constraint.name(), "length_between");
        assert!(result
            .message
            .unwrap()
            .contains("between 3 and 10 characters"));
    }

    #[tokio::test]
    async fn test_exactly_length_constraint() {
        let constraint = LengthConstraint::exactly("text", 5);
        let rows = vec![
            Some("hello"),
            Some("world"),
            Some("test"),
            Some("testing"),
            None,
        ];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.6));
        assert_eq!(constraint.name(), "exact_length");
        assert!(result.message.unwrap().contains("exactly 5 characters"));
    }

    #[tokio::test]
    async fn test_not_empty_constraint() {
        let constraint = LengthConstraint::not_empty("text");
        let rows = vec![Some("hello"), Some("a"), Some(""), Some("testing"), None];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.8));
        assert_eq!(constraint.name(), "not_empty");
        assert!(result.message.unwrap().contains("not empty"));
    }

    #[tokio::test]
    async fn test_utf8_multibyte_characters() {
        let constraint = LengthConstraint::min("text", 2);
        let rows = vec![
            Some("hello"),
            Some("你好"),
            Some("🦀🔥"),
            Some("café"),
            None,
        ];

        let result = evaluate_on(&constraint, rows).await;

        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_all_null_values() {
        let constraint = LengthConstraint::min("text", 5);

        let result = evaluate_on(&constraint, vec![None, None, None]).await;

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_empty_data() {
        let constraint = LengthConstraint::min("text", 5);

        let result = evaluate_on(&constraint, vec![]).await;

        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    #[test]
    fn test_length_assertion_display() {
        let expectations = [
            (LengthAssertion::Min(5), "at least 5 characters"),
            (LengthAssertion::Max(10), "at most 10 characters"),
            (LengthAssertion::Between(3, 8), "between 3 and 8 characters"),
            (LengthAssertion::Exactly(6), "exactly 6 characters"),
            (LengthAssertion::NotEmpty, "not empty"),
        ];

        for (assertion, expected) in expectations {
            assert_eq!(assertion.to_string(), expected);
        }
    }

    #[test]
    #[should_panic(expected = "min_length must be <= max_length")]
    fn test_invalid_between_constraint() {
        let _ = LengthConstraint::between("test", 10, 5);
    }
}