1use crate::core::{Constraint, ConstraintResult, ConstraintStatus};
10use crate::error::Result;
11use crate::security::SqlSecurity;
12use arrow::array::Array;
13use async_trait::async_trait;
14use datafusion::execution::context::SessionContext;
15use serde::{Deserialize, Serialize};
16use std::fmt;
17use tracing::instrument;
18
19#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
21pub enum LengthAssertion {
22 Min(usize),
24 Max(usize),
26 Between(usize, usize),
28 Exactly(usize),
30 NotEmpty,
32}
33
34impl LengthAssertion {
35 fn sql_condition(&self, column: &str) -> String {
37 match self {
38 LengthAssertion::Min(min) => format!("LENGTH({column}) >= {min}"),
39 LengthAssertion::Max(max) => format!("LENGTH({column}) <= {max}"),
40 LengthAssertion::Between(min, max) => {
41 format!("LENGTH({column}) >= {min} AND LENGTH({column}) <= {max}")
42 }
43 LengthAssertion::Exactly(len) => format!("LENGTH({column}) = {len}"),
44 LengthAssertion::NotEmpty => format!("LENGTH({column}) >= 1"),
45 }
46 }
47
48 fn name(&self) -> &str {
50 match self {
51 LengthAssertion::Min(_) => "min_length",
52 LengthAssertion::Max(_) => "max_length",
53 LengthAssertion::Between(_, _) => "length_between",
54 LengthAssertion::Exactly(_) => "exact_length",
55 LengthAssertion::NotEmpty => "not_empty",
56 }
57 }
58
59 fn description(&self) -> String {
61 match self {
62 LengthAssertion::Min(min) => format!("at least {min} characters"),
63 LengthAssertion::Max(max) => format!("at most {max} characters"),
64 LengthAssertion::Between(min, max) => format!("between {min} and {max} characters"),
65 LengthAssertion::Exactly(len) => format!("exactly {len} characters"),
66 LengthAssertion::NotEmpty => "not empty".to_string(),
67 }
68 }
69}
70
71impl fmt::Display for LengthAssertion {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 write!(f, "{}", self.description())
74 }
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct LengthConstraint {
105 column: String,
107 assertion: LengthAssertion,
109}
110
111impl LengthConstraint {
112 pub fn new(column: impl Into<String>, assertion: LengthAssertion) -> Self {
119 Self {
120 column: column.into(),
121 assertion,
122 }
123 }
124
125 pub fn min(column: impl Into<String>, min_length: usize) -> Self {
127 Self::new(column, LengthAssertion::Min(min_length))
128 }
129
130 pub fn max(column: impl Into<String>, max_length: usize) -> Self {
132 Self::new(column, LengthAssertion::Max(max_length))
133 }
134
135 pub fn between(column: impl Into<String>, min_length: usize, max_length: usize) -> Self {
137 assert!(min_length <= max_length, "min_length must be <= max_length");
138 Self::new(column, LengthAssertion::Between(min_length, max_length))
139 }
140
141 pub fn exactly(column: impl Into<String>, length: usize) -> Self {
143 Self::new(column, LengthAssertion::Exactly(length))
144 }
145
146 pub fn not_empty(column: impl Into<String>) -> Self {
148 Self::new(column, LengthAssertion::NotEmpty)
149 }
150}
151
152#[async_trait]
153impl Constraint for LengthConstraint {
154 #[instrument(skip(self, ctx), fields(
155 column = %self.column,
156 assertion = %self.assertion
157 ))]
158 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
159 let column_identifier = SqlSecurity::escape_identifier(&self.column)?;
160 let condition = self.assertion.sql_condition(&column_identifier);
161
162 let sql = format!(
163 "SELECT
164 COUNT(CASE WHEN {condition} OR {column_identifier} IS NULL THEN 1 END) * 1.0 / NULLIF(COUNT(*), 0) as ratio
165 FROM data"
166 );
167
168 let df = ctx.sql(&sql).await?;
169 let batches = df.collect().await?;
170
171 if batches.is_empty() || batches[0].num_rows() == 0 {
172 return Ok(ConstraintResult::skipped("No data to validate"));
173 }
174
175 let ratio_array = batches[0]
176 .column(0)
177 .as_any()
178 .downcast_ref::<arrow::array::Float64Array>()
179 .ok_or_else(|| {
180 crate::error::TermError::constraint_evaluation(
181 self.name(),
182 "Failed to extract ratio from result",
183 )
184 })?;
185
186 if ratio_array.is_null(0) {
188 return Ok(ConstraintResult::skipped("No data to validate"));
189 }
190
191 let ratio = ratio_array.value(0);
192
193 let status = if ratio >= 1.0 {
194 ConstraintStatus::Success
195 } else {
196 ConstraintStatus::Failure
197 };
198
199 let message = if status == ConstraintStatus::Failure {
200 Some(format!(
201 "Length constraint failed: {:.2}% of values are {}",
202 ratio * 100.0,
203 self.assertion.description()
204 ))
205 } else {
206 None
207 };
208
209 Ok(ConstraintResult {
210 status,
211 metric: Some(ratio),
212 message,
213 })
214 }
215
216 fn name(&self) -> &str {
217 self.assertion.name()
218 }
219
220 fn column(&self) -> Option<&str> {
221 Some(&self.column)
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::StringArray;
    use arrow::record_batch::RecordBatch;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a session whose `data` table has one nullable Utf8 column named `text`
    /// holding the given values.
    async fn create_test_context(data: Vec<Option<&str>>) -> SessionContext {
        let ctx = SessionContext::new();
        let string_data = StringArray::from(data);
        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, true)]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(string_data)]).unwrap();
        ctx.register_batch("data", batch).unwrap();
        ctx
    }

    #[tokio::test]
    async fn test_min_length_constraint() {
        // Every non-null value has >= 5 characters; the NULL counts as passing.
        let ctx = create_test_context(vec![
            Some("hello"),
            Some("world"),
            Some("testing"),
            Some("great"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::min("text", 5);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
        assert_eq!(constraint.name(), "min_length");
    }

    #[tokio::test]
    async fn test_min_length_constraint_failure() {
        // "hi" and "a" are too short: 3 of 5 rows pass (including the NULL).
        let ctx = create_test_context(vec![
            Some("hi"),
            Some("hello"),
            Some("a"),
            Some("testing"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::min("text", 5);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.6));
        assert!(result.message.unwrap().contains("at least 5 characters"));
    }

    #[tokio::test]
    async fn test_max_length_constraint() {
        let ctx = create_test_context(vec![Some("hi"), Some("hey"), Some("test"), None]).await;

        let constraint = LengthConstraint::max("text", 10);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
        assert_eq!(constraint.name(), "max_length");
    }

    #[tokio::test]
    async fn test_max_length_constraint_failure() {
        // Only the long sentence exceeds the limit: 3 of 4 rows pass.
        let ctx = create_test_context(vec![
            Some("short"),
            Some("this is a very long string that exceeds the limit"),
            Some("ok"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::max("text", 10);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.75));
        assert!(result.message.unwrap().contains("at most 10 characters"));
    }

    #[tokio::test]
    async fn test_between_length_constraint() {
        // "hi" is too short and the long sentence too long: 3 of 5 rows pass.
        let ctx = create_test_context(vec![
            Some("hello"),
            Some("testing"),
            Some("hi"),
            Some("this is way too long"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::between("text", 3, 10);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.6));
        assert_eq!(constraint.name(), "length_between");
        assert!(result
            .message
            .unwrap()
            .contains("between 3 and 10 characters"));
    }

    #[tokio::test]
    async fn test_exactly_length_constraint() {
        // "test" and "testing" are not 5 characters long: 3 of 5 rows pass.
        let ctx = create_test_context(vec![
            Some("hello"),
            Some("world"),
            Some("test"),
            Some("testing"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::exactly("text", 5);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.6));
        assert_eq!(constraint.name(), "exact_length");
        assert!(result.message.unwrap().contains("exactly 5 characters"));
    }

    #[tokio::test]
    async fn test_not_empty_constraint() {
        // Only the empty string fails; the NULL counts as passing: 4 of 5 rows pass.
        let ctx = create_test_context(vec![
            Some("hello"),
            Some("a"),
            Some(""),
            Some("testing"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::not_empty("text");
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Failure);
        assert_eq!(result.metric, Some(0.8));
        assert_eq!(constraint.name(), "not_empty");
        assert!(result.message.unwrap().contains("not empty"));
    }

    #[tokio::test]
    async fn test_utf8_multibyte_characters() {
        let ctx = create_test_context(vec![
            Some("hello"),
            Some("你好"), // 2 characters, 6 bytes
            Some("🦀🔥"), // 2 characters, 8 bytes
            Some("café"),
            None,
        ])
        .await;

        let constraint = LengthConstraint::min("text", 2);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);

        // A lower bound alone cannot tell characters from bytes (byte counts are never
        // smaller). An upper bound of 5 passes only if length is measured in characters,
        // since the CJK and emoji values occupy more than 5 bytes.
        let constraint = LengthConstraint::max("text", 5);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_all_null_values() {
        // NULLs are treated as satisfying the assertion, so an all-NULL column succeeds.
        let ctx = create_test_context(vec![None, None, None]).await;

        let constraint = LengthConstraint::min("text", 5);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Success);
        assert_eq!(result.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_empty_data() {
        // With no rows there is nothing to validate, so evaluation is skipped.
        let ctx = create_test_context(vec![]).await;

        let constraint = LengthConstraint::min("text", 5);
        let result = constraint.evaluate(&ctx).await.unwrap();

        assert_eq!(result.status, ConstraintStatus::Skipped);
    }

    #[test]
    fn test_length_assertion_display() {
        assert_eq!(LengthAssertion::Min(5).to_string(), "at least 5 characters");
        assert_eq!(
            LengthAssertion::Max(10).to_string(),
            "at most 10 characters"
        );
        assert_eq!(
            LengthAssertion::Between(3, 8).to_string(),
            "between 3 and 8 characters"
        );
        assert_eq!(
            LengthAssertion::Exactly(6).to_string(),
            "exactly 6 characters"
        );
        assert_eq!(LengthAssertion::NotEmpty.to_string(), "not empty");
    }

    #[test]
    #[should_panic(expected = "min_length must be <= max_length")]
    fn test_invalid_between_constraint() {
        // An inverted range must be rejected at construction time.
        LengthConstraint::between("test", 10, 5);
    }
}