term_guard/core/
unified.rs1use super::{ColumnSpec, Constraint, ConstraintResult, LogicalOperator};
7use crate::core::current_validation_context;
8use crate::prelude::*;
9use async_trait::async_trait;
10use datafusion::prelude::*;
11use std::collections::HashMap;
12
13#[async_trait]
18pub trait UnifiedConstraint: Constraint {
19 fn column_spec(&self) -> &ColumnSpec;
21
22 fn logical_operator(&self) -> Option<LogicalOperator> {
24 None
25 }
26
27 fn options(&self) -> HashMap<String, String> {
29 HashMap::new()
30 }
31
32 async fn evaluate_column(&self, ctx: &SessionContext, column: &str)
38 -> Result<ConstraintResult>;
39
40 async fn evaluate_unified(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
42 let columns = self.column_spec().as_vec();
43
44 match columns.len() {
45 0 => Ok(ConstraintResult::skipped("No columns specified")),
46 1 => {
47 self.evaluate_column(ctx, columns[0]).await
49 }
50 _ => {
51 let mut results = Vec::new();
53 let mut metrics = Vec::new();
54
55 for column in &columns {
56 let result = self.evaluate_column(ctx, column).await?;
57 results.push((column.to_string(), result.status.is_success()));
58 if let Some(metric) = result.metric {
59 metrics.push(metric);
60 }
61 }
62
63 let operator = self.logical_operator().unwrap_or(LogicalOperator::All);
65 let bools: Vec<bool> = results.iter().map(|(_, b)| *b).collect();
66 let combined_result = operator.evaluate(&bools);
67
68 let combined_metric = if metrics.is_empty() {
70 None
71 } else {
72 Some(metrics.iter().sum::<f64>() / metrics.len() as f64)
73 };
74
75 let message = if combined_result {
77 match operator {
78 LogicalOperator::All => Some(format!(
79 "All {} columns satisfy the constraint",
80 columns.len()
81 )),
82 LogicalOperator::Any => {
83 let passed = results
84 .iter()
85 .filter(|(_, b)| *b)
86 .map(|(c, _)| c.as_str())
87 .collect::<Vec<_>>();
88 Some(format!(
89 "Columns {} satisfy the constraint",
90 passed.join(", ")
91 ))
92 }
93 _ => None,
94 }
95 } else {
96 let failed = results
97 .iter()
98 .filter(|(_, b)| !*b)
99 .map(|(c, _)| c.as_str())
100 .collect::<Vec<_>>();
101 Some(format!(
102 "Constraint failed for columns: {}. Required: {}",
103 failed.join(", "),
104 operator.description()
105 ))
106 };
107
108 if combined_result {
109 Ok(ConstraintResult {
110 status: crate::core::ConstraintStatus::Success,
111 metric: combined_metric,
112 message,
113 })
114 } else {
115 Ok(ConstraintResult {
116 status: crate::core::ConstraintStatus::Failure,
117 metric: combined_metric,
118 message,
119 })
120 }
121 }
122 }
123 }
124}
125
126#[derive(Debug, Clone, Default)]
131pub struct ConstraintOptions {
132 pub operator: Option<LogicalOperator>,
134 pub threshold: Option<f64>,
136 pub flags: HashMap<String, bool>,
138 pub options: HashMap<String, String>,
140}
141
142impl ConstraintOptions {
143 pub fn new() -> Self {
145 Self::default()
146 }
147
148 pub fn with_operator(mut self, operator: LogicalOperator) -> Self {
150 self.operator = Some(operator);
151 self
152 }
153
154 pub fn with_threshold(mut self, threshold: f64) -> Self {
156 self.threshold = Some(threshold);
157 self
158 }
159
160 pub fn with_flag(mut self, name: impl Into<String>, value: bool) -> Self {
162 self.flags.insert(name.into(), value);
163 self
164 }
165
166 pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
168 self.options.insert(name.into(), value.into());
169 self
170 }
171
172 pub fn operator_or(&self, default: LogicalOperator) -> LogicalOperator {
174 self.operator.unwrap_or(default)
175 }
176
177 pub fn threshold_or(&self, default: f64) -> f64 {
179 self.threshold.unwrap_or(default)
180 }
181
182 pub fn flag(&self, name: &str) -> bool {
184 self.flags.get(name).copied().unwrap_or(false)
185 }
186
187 pub fn option(&self, name: &str) -> Option<&str> {
189 self.options.get(name).map(|s| s.as_str())
190 }
191}
192
/// Generates a `Constraint` implementation for a unified constraint type.
///
/// Arguments: the implementing type, then an expression yielding the
/// constraint name returned from `name()`.
///
/// The generated code expects the target type to have a `columns` field that
/// matches against `ColumnSpec::Single` / `ColumnSpec::Multiple`, and to
/// provide `evaluate_unified` and `logical_operator` (the `UnifiedConstraint`
/// methods). All names inside the expansion are unqualified, so the call site
/// must have `async_trait`, `Constraint`, `ConstraintResult`,
/// `ConstraintMetadata`, `ColumnSpec`, `SessionContext` and `Result` in scope.
#[macro_export]
macro_rules! impl_unified_constraint {
    ($constraint:ty, $name:expr) => {
        #[async_trait]
        impl Constraint for $constraint {
            // Evaluation is delegated entirely to the unified multi-column path.
            async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
                self.evaluate_unified(ctx).await
            }

            fn name(&self) -> &str {
                $name
            }

            // Only a single-column spec has one column to report.
            fn column(&self) -> Option<&str> {
                match &self.columns {
                    ColumnSpec::Multiple(_) => None,
                    ColumnSpec::Single(col) => Some(col),
                }
            }

            fn metadata(&self) -> ConstraintMetadata {
                let base = match &self.columns {
                    ColumnSpec::Multiple(cols) => ConstraintMetadata::for_columns(cols),
                    ColumnSpec::Single(col) => ConstraintMetadata::for_column(col),
                };

                // Record the operator only when the constraint specifies one.
                match self.logical_operator() {
                    Some(op) => base.with_custom("operator", op.description()),
                    None => base,
                }
            }
        }
    };
}
231
232pub struct UnifiedCompletenessBase {
237 pub columns: ColumnSpec,
238 pub threshold: f64,
239 pub operator: LogicalOperator,
240}
241
242impl UnifiedCompletenessBase {
243 pub async fn evaluate_completeness(
245 &self,
246 ctx: &SessionContext,
247 column: &str,
248 ) -> Result<(f64, i64, i64)> {
249 let validation_ctx = current_validation_context();
252
253 let table_name = validation_ctx.table_name();
254
255 let sql = format!(
256 "SELECT
257 COUNT(*) as total_count,
258 COUNT({column}) as non_null_count
259 FROM {table_name}"
260 );
261
262 let df = ctx.sql(&sql).await?;
263 let batches = df.collect().await?;
264
265 if batches.is_empty() || batches[0].num_rows() == 0 {
266 return Ok((0.0, 0, 0));
267 }
268
269 let batch = &batches[0];
270 let total_count = batch
271 .column(0)
272 .as_any()
273 .downcast_ref::<arrow::array::Int64Array>()
274 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
275 .value(0);
276
277 let non_null_count = batch
278 .column(1)
279 .as_any()
280 .downcast_ref::<arrow::array::Int64Array>()
281 .ok_or_else(|| TermError::Internal("Failed to extract non-null count".to_string()))?
282 .value(0);
283
284 let completeness = if total_count > 0 {
285 non_null_count as f64 / total_count as f64
286 } else {
287 0.0
288 };
289
290 Ok((completeness, non_null_count, total_count))
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    /// Values set through the builder are read back by the accessors, and
    /// anything never set falls back to the accessor's default.
    #[test]
    fn test_constraint_options_builder() {
        let built = ConstraintOptions::new()
            .with_operator(LogicalOperator::Any)
            .with_threshold(0.95)
            .with_flag("case_sensitive", true)
            .with_option("pattern", "[A-Z]+");

        // Explicitly configured values win over the supplied defaults.
        let effective_operator = built.operator_or(LogicalOperator::All);
        assert_eq!(effective_operator, LogicalOperator::Any);
        assert_eq!(built.threshold_or(1.0), 0.95);

        // Flags: set ones read back, unknown ones are false.
        assert!(built.flag("case_sensitive"));
        assert!(!built.flag("unknown_flag"));

        // String options: set ones read back, unknown ones are absent.
        assert_eq!(built.option("pattern"), Some("[A-Z]+"));
        assert_eq!(built.option("unknown"), None);
    }

    /// Column specs report their column count, independently of the options.
    #[test]
    fn test_column_spec_with_options() {
        let one_column = ColumnSpec::Single("user_id".to_string());
        let two_columns = ColumnSpec::Multiple(vec!["email".to_string(), "phone".to_string()]);

        let any_of = ConstraintOptions::new().with_operator(LogicalOperator::Any);

        assert_eq!(one_column.len(), 1);
        assert_eq!(two_columns.len(), 2);

        let effective_operator = any_of.operator_or(LogicalOperator::All);
        assert_eq!(effective_operator, LogicalOperator::Any);
    }
}