term_guard/core/
unified.rs1use super::{ColumnSpec, Constraint, ConstraintResult, LogicalOperator};
7use crate::prelude::*;
8use async_trait::async_trait;
9use datafusion::prelude::*;
10use std::collections::HashMap;
11
12#[async_trait]
17pub trait UnifiedConstraint: Constraint {
18 fn column_spec(&self) -> &ColumnSpec;
20
21 fn logical_operator(&self) -> Option<LogicalOperator> {
23 None
24 }
25
26 fn options(&self) -> HashMap<String, String> {
28 HashMap::new()
29 }
30
31 async fn evaluate_column(&self, ctx: &SessionContext, column: &str)
37 -> Result<ConstraintResult>;
38
39 async fn evaluate_unified(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
41 let columns = self.column_spec().as_vec();
42
43 match columns.len() {
44 0 => Ok(ConstraintResult::skipped("No columns specified")),
45 1 => {
46 self.evaluate_column(ctx, columns[0]).await
48 }
49 _ => {
50 let mut results = Vec::new();
52 let mut metrics = Vec::new();
53
54 for column in &columns {
55 let result = self.evaluate_column(ctx, column).await?;
56 results.push((column.to_string(), result.status.is_success()));
57 if let Some(metric) = result.metric {
58 metrics.push(metric);
59 }
60 }
61
62 let operator = self.logical_operator().unwrap_or(LogicalOperator::All);
64 let bools: Vec<bool> = results.iter().map(|(_, b)| *b).collect();
65 let combined_result = operator.evaluate(&bools);
66
67 let combined_metric = if metrics.is_empty() {
69 None
70 } else {
71 Some(metrics.iter().sum::<f64>() / metrics.len() as f64)
72 };
73
74 let message = if combined_result {
76 match operator {
77 LogicalOperator::All => Some(format!(
78 "All {} columns satisfy the constraint",
79 columns.len()
80 )),
81 LogicalOperator::Any => {
82 let passed = results
83 .iter()
84 .filter(|(_, b)| *b)
85 .map(|(c, _)| c.as_str())
86 .collect::<Vec<_>>();
87 Some(format!(
88 "Columns {} satisfy the constraint",
89 passed.join(", ")
90 ))
91 }
92 _ => None,
93 }
94 } else {
95 let failed = results
96 .iter()
97 .filter(|(_, b)| !*b)
98 .map(|(c, _)| c.as_str())
99 .collect::<Vec<_>>();
100 Some(format!(
101 "Constraint failed for columns: {}. Required: {}",
102 failed.join(", "),
103 operator.description()
104 ))
105 };
106
107 if combined_result {
108 Ok(ConstraintResult {
109 status: crate::core::ConstraintStatus::Success,
110 metric: combined_metric,
111 message,
112 })
113 } else {
114 Ok(ConstraintResult {
115 status: crate::core::ConstraintStatus::Failure,
116 metric: combined_metric,
117 message,
118 })
119 }
120 }
121 }
122 }
123}
124
125#[derive(Debug, Clone, Default)]
130pub struct ConstraintOptions {
131 pub operator: Option<LogicalOperator>,
133 pub threshold: Option<f64>,
135 pub flags: HashMap<String, bool>,
137 pub options: HashMap<String, String>,
139}
140
141impl ConstraintOptions {
142 pub fn new() -> Self {
144 Self::default()
145 }
146
147 pub fn with_operator(mut self, operator: LogicalOperator) -> Self {
149 self.operator = Some(operator);
150 self
151 }
152
153 pub fn with_threshold(mut self, threshold: f64) -> Self {
155 self.threshold = Some(threshold);
156 self
157 }
158
159 pub fn with_flag(mut self, name: impl Into<String>, value: bool) -> Self {
161 self.flags.insert(name.into(), value);
162 self
163 }
164
165 pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
167 self.options.insert(name.into(), value.into());
168 self
169 }
170
171 pub fn operator_or(&self, default: LogicalOperator) -> LogicalOperator {
173 self.operator.unwrap_or(default)
174 }
175
176 pub fn threshold_or(&self, default: f64) -> f64 {
178 self.threshold.unwrap_or(default)
179 }
180
181 pub fn flag(&self, name: &str) -> bool {
183 self.flags.get(name).copied().unwrap_or(false)
184 }
185
186 pub fn option(&self, name: &str) -> Option<&str> {
188 self.options.get(name).map(|s| s.as_str())
189 }
190}
191
/// Implements `Constraint` for a type that already implements
/// `UnifiedConstraint`.
///
/// Arguments: the implementing type and an expression yielding the name that
/// `Constraint::name` should return.
///
/// The expansion reads a `columns` field of type `ColumnSpec` on the target
/// type and calls its `evaluate_unified` and `logical_operator` methods. All
/// identifiers in the expansion (`async_trait`, `Constraint`,
/// `SessionContext`, `Result`, `ConstraintResult`, `ColumnSpec`,
/// `ConstraintMetadata`) are written unqualified, so they must be in scope
/// where the macro is invoked.
#[macro_export]
macro_rules! impl_unified_constraint {
    ($constraint:ty, $name:expr) => {
        #[async_trait]
        impl Constraint for $constraint {
            // Evaluation is delegated entirely to the unified code path.
            async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
                self.evaluate_unified(ctx).await
            }

            fn name(&self) -> &str {
                $name
            }

            // A single column name is only reported for single-column specs.
            fn column(&self) -> Option<&str> {
                match &self.columns {
                    ColumnSpec::Multiple(_) => None,
                    ColumnSpec::Single(col) => Some(col),
                }
            }

            fn metadata(&self) -> ConstraintMetadata {
                let base = match &self.columns {
                    ColumnSpec::Multiple(cols) => ConstraintMetadata::for_columns(cols),
                    ColumnSpec::Single(col) => ConstraintMetadata::for_column(col),
                };

                // Record the combining operator only when one is configured.
                match self.logical_operator() {
                    Some(op) => base.with_custom("operator", op.description()),
                    None => base,
                }
            }
        }
    };
}
230
231pub struct UnifiedCompletenessBase {
236 pub columns: ColumnSpec,
237 pub threshold: f64,
238 pub operator: LogicalOperator,
239}
240
241impl UnifiedCompletenessBase {
242 pub async fn evaluate_completeness(
244 &self,
245 ctx: &SessionContext,
246 column: &str,
247 ) -> Result<(f64, i64, i64)> {
248 let sql = format!(
249 "SELECT
250 COUNT(*) as total_count,
251 COUNT({column}) as non_null_count
252 FROM data"
253 );
254
255 let df = ctx.sql(&sql).await?;
256 let batches = df.collect().await?;
257
258 if batches.is_empty() || batches[0].num_rows() == 0 {
259 return Ok((0.0, 0, 0));
260 }
261
262 let batch = &batches[0];
263 let total_count = batch
264 .column(0)
265 .as_any()
266 .downcast_ref::<arrow::array::Int64Array>()
267 .ok_or_else(|| TermError::Internal("Failed to extract total count".to_string()))?
268 .value(0);
269
270 let non_null_count = batch
271 .column(1)
272 .as_any()
273 .downcast_ref::<arrow::array::Int64Array>()
274 .ok_or_else(|| TermError::Internal("Failed to extract non-null count".to_string()))?
275 .value(0);
276
277 let completeness = if total_count > 0 {
278 non_null_count as f64 / total_count as f64
279 } else {
280 0.0
281 };
282
283 Ok((completeness, non_null_count, total_count))
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Every value set through the builder must be visible through the
    /// matching accessor, and unset names must yield the fallback.
    #[test]
    fn test_constraint_options_builder() {
        let built = ConstraintOptions::new()
            .with_operator(LogicalOperator::Any)
            .with_threshold(0.95)
            .with_flag("case_sensitive", true)
            .with_option("pattern", "[A-Z]+");

        // Explicitly configured values win over the supplied defaults.
        assert_eq!(
            built.operator_or(LogicalOperator::All),
            LogicalOperator::Any
        );
        assert_eq!(built.threshold_or(1.0), 0.95);

        // Flags: a stored flag returns its value, an unknown one is false.
        assert!(built.flag("case_sensitive"));
        assert!(!built.flag("unknown_flag"));

        // String options: a stored option is returned, an unknown one is None.
        assert_eq!(built.option("pattern"), Some("[A-Z]+"));
        assert_eq!(built.option("unknown"), None);
    }

    /// Column specs report their column count, independently of any options
    /// built alongside them.
    #[test]
    fn test_column_spec_with_options() {
        let options = ConstraintOptions::new().with_operator(LogicalOperator::Any);

        let single = ColumnSpec::Single(String::from("user_id"));
        let multiple = ColumnSpec::Multiple(vec![String::from("email"), String::from("phone")]);

        assert_eq!(single.len(), 1);
        assert_eq!(multiple.len(), 2);
        assert_eq!(
            options.operator_or(LogicalOperator::All),
            LogicalOperator::Any
        );
    }
}