1use crate::constraints::{
34 CrossTableSumConstraint, ForeignKeyConstraint, JoinCoverageConstraint,
35 TemporalOrderingConstraint,
36};
37use crate::core::{Check, Level};
38use std::sync::Arc;
39
40#[derive(Debug)]
46pub struct MultiTableCheck {
47 name: String,
48 level: Level,
49 description: Option<String>,
50 current_context: Option<TableContext>,
51 constraints: Vec<Arc<dyn crate::core::Constraint>>,
52}
53
54#[derive(Debug, Clone)]
56struct TableContext {
57 left_table: String,
58 right_table: Option<String>,
59 join_columns: Vec<(String, String)>,
60 group_by_columns: Vec<String>,
61}
62
63impl MultiTableCheck {
64 pub fn new(name: impl Into<String>) -> Self {
70 Self {
71 name: name.into(),
72 level: Level::Error,
73 description: None,
74 current_context: None,
75 constraints: Vec::new(),
76 }
77 }
78
79 pub fn level(mut self, level: Level) -> Self {
81 self.level = level;
82 self
83 }
84
85 pub fn description(mut self, description: impl Into<String>) -> Self {
87 self.description = Some(description.into());
88 self
89 }
90
91 pub fn validate_tables(
110 mut self,
111 left_table: impl Into<String>,
112 right_table: impl Into<String>,
113 ) -> Self {
114 self.current_context = Some(TableContext {
115 left_table: left_table.into(),
116 right_table: Some(right_table.into()),
117 join_columns: Vec::new(),
118 group_by_columns: Vec::new(),
119 });
120 self
121 }
122
123 pub fn and_validate_tables(
125 self,
126 left_table: impl Into<String>,
127 right_table: impl Into<String>,
128 ) -> Self {
129 self.validate_tables(left_table, right_table)
130 }
131
132 pub fn validate_temporal(mut self, table: impl Into<String>) -> Self {
138 self.current_context = Some(TableContext {
139 left_table: table.into(),
140 right_table: None,
141 join_columns: Vec::new(),
142 group_by_columns: Vec::new(),
143 });
144 self
145 }
146
147 pub fn and_validate_temporal(self, table: impl Into<String>) -> Self {
149 self.validate_temporal(table)
150 }
151
152 pub fn join_on(
159 mut self,
160 left_column: impl Into<String>,
161 right_column: impl Into<String>,
162 ) -> Self {
163 if let Some(ref mut ctx) = self.current_context {
164 ctx.join_columns
165 .push((left_column.into(), right_column.into()));
166 }
167 self
168 }
169
170 pub fn join_on_multiple(mut self, columns: Vec<(&str, &str)>) -> Self {
172 if let Some(ref mut ctx) = self.current_context {
173 for (left, right) in columns {
174 ctx.join_columns.push((left.to_string(), right.to_string()));
175 }
176 }
177 self
178 }
179
180 pub fn group_by(mut self, column: impl Into<String>) -> Self {
182 if let Some(ref mut ctx) = self.current_context {
183 ctx.group_by_columns.push(column.into());
184 }
185 self
186 }
187
188 pub fn group_by_multiple(mut self, columns: Vec<impl Into<String>>) -> Self {
190 if let Some(ref mut ctx) = self.current_context {
191 ctx.group_by_columns
192 .extend(columns.into_iter().map(Into::into));
193 }
194 self
195 }
196
197 pub fn ensure_referential_integrity(mut self) -> Self {
201 if let Some(ref ctx) = self.current_context {
202 if let (Some(right_table), Some((left_col, right_col))) =
203 (&ctx.right_table, ctx.join_columns.first())
204 {
205 let child_column = format!("{}.{left_col}", ctx.left_table);
206 let parent_column = format!("{right_table}.{right_col}");
207
208 self.constraints.push(Arc::new(ForeignKeyConstraint::new(
209 child_column,
210 parent_column,
211 )));
212 }
213 }
214 self
215 }
216
217 pub fn expect_join_coverage(mut self, min_coverage: f64) -> Self {
223 if let Some(ref ctx) = self.current_context {
224 if let Some(ref right_table) = ctx.right_table {
225 let mut constraint = JoinCoverageConstraint::new(&ctx.left_table, right_table);
226
227 if ctx.join_columns.len() == 1 {
229 let (left_col, right_col) = &ctx.join_columns[0];
230 constraint = constraint.on(left_col, right_col);
231 } else if ctx.join_columns.len() > 1 {
232 let cols: Vec<_> = ctx
233 .join_columns
234 .iter()
235 .map(|(l, r)| (l.as_str(), r.as_str()))
236 .collect();
237 constraint = constraint.on_multiple(cols);
238 }
239
240 constraint = constraint.expect_match_rate(min_coverage);
241 self.constraints.push(Arc::new(constraint));
242 }
243 }
244 self
245 }
246
247 pub fn ensure_sum_consistency(
254 mut self,
255 left_column: impl Into<String>,
256 right_column: impl Into<String>,
257 ) -> Self {
258 if let Some(ref ctx) = self.current_context {
259 if let Some(ref right_table) = ctx.right_table {
260 let left_col = format!("{}.{}", ctx.left_table, left_column.into());
261 let right_col = format!("{right_table}.{}", right_column.into());
262
263 let mut constraint = CrossTableSumConstraint::new(left_col, right_col);
264
265 if !ctx.group_by_columns.is_empty() {
267 constraint = constraint.group_by(ctx.group_by_columns.clone());
268 }
269
270 self.constraints.push(Arc::new(constraint));
271 }
272 }
273 self
274 }
275
276 pub fn with_tolerance(self, _tolerance: f64) -> Self {
282 if let Some(_last) = self.constraints.last() {
284 }
288 self
289 }
290
291 pub fn ensure_ordering(
298 mut self,
299 before_column: impl Into<String>,
300 after_column: impl Into<String>,
301 ) -> Self {
302 if let Some(ref ctx) = self.current_context {
303 let constraint = TemporalOrderingConstraint::new(&ctx.left_table)
304 .before_after(before_column, after_column);
305
306 self.constraints.push(Arc::new(constraint));
307 }
308 self
309 }
310
311 pub fn within_business_hours(
318 mut self,
319 start_time: impl Into<String>,
320 end_time: impl Into<String>,
321 ) -> Self {
322 if let Some(ref ctx) = self.current_context {
323 let constraint = TemporalOrderingConstraint::new(&ctx.left_table).business_hours(
326 "timestamp",
327 start_time,
328 end_time,
329 );
330
331 self.constraints.push(Arc::new(constraint));
332 }
333 self
334 }
335
336 pub fn build(self) -> Check {
338 let mut builder = Check::builder(self.name).level(self.level);
339
340 if let Some(desc) = self.description {
341 builder = builder.description(desc);
342 }
343
344 for constraint in self.constraints {
345 builder = builder.arc_constraint(constraint);
346 }
347
348 builder.build()
349 }
350}
351
352pub trait CheckMultiTableExt {
354 fn multi_table(name: impl Into<String>) -> MultiTableCheck;
356}
357
358impl CheckMultiTableExt for Check {
359 fn multi_table(name: impl Into<String>) -> MultiTableCheck {
360 MultiTableCheck::new(name)
361 }
362}
363
364#[cfg(test)]
365mod tests {
366 use super::*;
367
368 #[test]
369 fn test_fluent_builder_basic() {
370 let check = MultiTableCheck::new("test_validation")
371 .level(Level::Warning)
372 .description("Test multi-table validation")
373 .validate_tables("orders", "customers")
374 .join_on("customer_id", "id")
375 .ensure_referential_integrity()
376 .build();
377
378 assert_eq!(check.name(), "test_validation");
379 assert_eq!(check.level(), Level::Warning);
380 assert_eq!(check.description(), Some("Test multi-table validation"));
381 assert_eq!(check.constraints().len(), 1);
382 }
383
384 #[test]
385 fn test_fluent_builder_complex() {
386 let check = MultiTableCheck::new("complex_validation")
387 .validate_tables("orders", "customers")
388 .join_on("customer_id", "id")
389 .ensure_referential_integrity()
390 .expect_join_coverage(0.95)
391 .and_validate_tables("orders", "payments")
392 .join_on("order_id", "order_id")
393 .ensure_sum_consistency("total", "amount")
394 .group_by("customer_id")
395 .and_validate_temporal("events")
396 .ensure_ordering("created_at", "processed_at")
397 .build();
398
399 assert_eq!(check.name(), "complex_validation");
400 assert_eq!(check.constraints().len(), 4);
402 }
403
404 #[test]
405 fn test_composite_keys() {
406 let check = MultiTableCheck::new("composite_key_validation")
407 .validate_tables("order_items", "products")
408 .join_on_multiple(vec![("product_id", "id"), ("variant", "variant_code")])
409 .ensure_referential_integrity()
410 .build();
411
412 assert_eq!(check.constraints().len(), 1);
413 }
414}