1use crate::core::{Constraint, ConstraintResult, ConstraintStatus};
37use crate::error::{Result, TermError};
38use crate::security::SqlSecurity;
39use arrow::array::{Array, Int64Array};
40use async_trait::async_trait;
41use datafusion::prelude::*;
42use serde::{Deserialize, Serialize};
43use tracing::{debug, instrument, warn};
44
/// Constraint that runs a temporal validation query against a single table.
///
/// The `validation_type` field selects which check is performed; the remaining
/// fields tune how the SQL for that check is generated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalOrderingConstraint {
    /// Name of the table to query. Validated as a SQL identifier before any
    /// query text is built.
    table_name: String,
    /// The temporal check to run and its per-check parameters.
    validation_type: TemporalValidationType,
    /// When `false`, the `BeforeAfter`, `BusinessHours` and `DateRange` queries
    /// add `IS NOT NULL` filters on the checked columns; when `true`, no such
    /// filter is added.
    allow_nulls: bool,
    /// Number of seconds added to the `before` column in the `BeforeAfter`
    /// comparison. Only applied when greater than zero.
    tolerance_seconds: i64,
    /// Initialised to 100 by `new`.
    // NOTE(review): not read by any code visible in this file — confirm whether
    // it is still needed.
    max_violations_reported: usize,
}
68
/// The kinds of temporal validation a `TemporalOrderingConstraint` can perform.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalValidationType {
    /// Compares two timestamp columns of the same row.
    BeforeAfter {
        /// Column on the right-hand side of the generated comparison.
        before_column: String,
        /// Column on the left-hand side of the generated comparison.
        after_column: String,
        /// Selects between the strict and non-strict comparison operator.
        /// Set to `true` by `before_or_equal` and `false` by `before_after`.
        allow_equal: bool,
    },
    /// Checks that the time-of-day part of a timestamp lies within a window.
    BusinessHours {
        /// Column whose value is cast to `TIME` and checked.
        timestamp_column: String,
        /// Start of the window. Interpolated as `TIME '<start_time>:00'`, so an
        /// `HH:MM` value is presumably expected — TODO confirm.
        start_time: String,
        /// End of the window. Interpolated as `TIME '<end_time>:00'`.
        end_time: String,
        /// When `true`, the query only considers rows where
        /// `EXTRACT(DOW FROM <timestamp_column>)` is between 1 and 5.
        weekdays_only: bool,
        /// Optional timezone name.
        // NOTE(review): stored, but not referenced by the query generation
        // visible in this file.
        timezone: Option<String>,
    },
    /// Checks that a timestamp lies within an inclusive range. Query
    /// generation fails unless at least one bound is set.
    DateRange {
        /// Column being range-checked.
        timestamp_column: String,
        /// Optional lower bound, interpolated as `TIMESTAMP '<min_date>'`.
        min_date: Option<String>,
        /// Optional upper bound, interpolated as `TIMESTAMP '<max_date>'`.
        max_date: Option<String>,
    },
    /// Checks the gap between each timestamp and the previous one (via `LAG`
    /// ordered by the timestamp column).
    MaxTimeGap {
        /// Column that is ordered and differenced.
        timestamp_column: String,
        /// Optional column used as `PARTITION BY` for the `LAG` window.
        group_by_column: Option<String>,
        /// Gaps strictly greater than this many seconds count as violations.
        max_gap_seconds: i64,
    },
    /// Event-sequence validation. Query generation currently returns a
    /// "not yet implemented" error for this variant.
    EventSequence {
        /// Column holding the event name.
        event_column: String,
        /// Column holding the event timestamp.
        timestamp_column: String,
        /// Expected order of events.
        expected_sequence: Vec<String>,
    },
}
105
106impl TemporalOrderingConstraint {
107 pub fn new(table_name: impl Into<String>) -> Self {
121 Self {
122 table_name: table_name.into(),
123 validation_type: TemporalValidationType::BeforeAfter {
124 before_column: String::new(),
125 after_column: String::new(),
126 allow_equal: false,
127 },
128 allow_nulls: false,
129 tolerance_seconds: 0,
130 max_violations_reported: 100,
131 }
132 }
133
134 pub fn before_after(
141 mut self,
142 before_column: impl Into<String>,
143 after_column: impl Into<String>,
144 ) -> Self {
145 self.validation_type = TemporalValidationType::BeforeAfter {
146 before_column: before_column.into(),
147 after_column: after_column.into(),
148 allow_equal: false,
149 };
150 self
151 }
152
153 pub fn before_or_equal(
155 mut self,
156 before_column: impl Into<String>,
157 after_column: impl Into<String>,
158 ) -> Self {
159 self.validation_type = TemporalValidationType::BeforeAfter {
160 before_column: before_column.into(),
161 after_column: after_column.into(),
162 allow_equal: true,
163 };
164 self
165 }
166
167 pub fn business_hours(
175 mut self,
176 timestamp_column: impl Into<String>,
177 start_time: impl Into<String>,
178 end_time: impl Into<String>,
179 ) -> Self {
180 self.validation_type = TemporalValidationType::BusinessHours {
181 timestamp_column: timestamp_column.into(),
182 start_time: start_time.into(),
183 end_time: end_time.into(),
184 weekdays_only: false,
185 timezone: None,
186 };
187 self
188 }
189
190 pub fn weekdays_only(mut self, weekdays_only: bool) -> Self {
192 if let TemporalValidationType::BusinessHours {
193 timestamp_column,
194 start_time,
195 end_time,
196 timezone,
197 ..
198 } = self.validation_type
199 {
200 self.validation_type = TemporalValidationType::BusinessHours {
201 timestamp_column,
202 start_time,
203 end_time,
204 weekdays_only,
205 timezone,
206 };
207 }
208 self
209 }
210
211 pub fn with_timezone(mut self, timezone: impl Into<String>) -> Self {
213 if let TemporalValidationType::BusinessHours {
214 timestamp_column,
215 start_time,
216 end_time,
217 weekdays_only,
218 ..
219 } = self.validation_type
220 {
221 self.validation_type = TemporalValidationType::BusinessHours {
222 timestamp_column,
223 start_time,
224 end_time,
225 weekdays_only,
226 timezone: Some(timezone.into()),
227 };
228 }
229 self
230 }
231
232 pub fn date_range(
234 mut self,
235 timestamp_column: impl Into<String>,
236 min_date: Option<impl Into<String>>,
237 max_date: Option<impl Into<String>>,
238 ) -> Self {
239 self.validation_type = TemporalValidationType::DateRange {
240 timestamp_column: timestamp_column.into(),
241 min_date: min_date.map(Into::into),
242 max_date: max_date.map(Into::into),
243 };
244 self
245 }
246
247 pub fn max_time_gap(
249 mut self,
250 timestamp_column: impl Into<String>,
251 max_gap_seconds: i64,
252 ) -> Self {
253 self.validation_type = TemporalValidationType::MaxTimeGap {
254 timestamp_column: timestamp_column.into(),
255 group_by_column: None,
256 max_gap_seconds,
257 };
258 self
259 }
260
261 pub fn group_by(mut self, column: impl Into<String>) -> Self {
263 if let TemporalValidationType::MaxTimeGap {
264 timestamp_column,
265 max_gap_seconds,
266 ..
267 } = self.validation_type
268 {
269 self.validation_type = TemporalValidationType::MaxTimeGap {
270 timestamp_column,
271 group_by_column: Some(column.into()),
272 max_gap_seconds,
273 };
274 }
275 self
276 }
277
278 pub fn allow_nulls(mut self, allow: bool) -> Self {
280 self.allow_nulls = allow;
281 self
282 }
283
284 pub fn tolerance_seconds(mut self, seconds: i64) -> Self {
286 self.tolerance_seconds = seconds;
287 self
288 }
289
290 fn validate_identifiers(&self) -> Result<()> {
292 SqlSecurity::validate_identifier(&self.table_name)?;
293
294 match &self.validation_type {
295 TemporalValidationType::BeforeAfter {
296 before_column,
297 after_column,
298 ..
299 } => {
300 SqlSecurity::validate_identifier(before_column)?;
301 SqlSecurity::validate_identifier(after_column)?;
302 }
303 TemporalValidationType::BusinessHours {
304 timestamp_column, ..
305 } => {
306 SqlSecurity::validate_identifier(timestamp_column)?;
307 }
308 TemporalValidationType::DateRange {
309 timestamp_column, ..
310 } => {
311 SqlSecurity::validate_identifier(timestamp_column)?;
312 }
313 TemporalValidationType::MaxTimeGap {
314 timestamp_column,
315 group_by_column,
316 ..
317 } => {
318 SqlSecurity::validate_identifier(timestamp_column)?;
319 if let Some(group_col) = group_by_column {
320 SqlSecurity::validate_identifier(group_col)?;
321 }
322 }
323 TemporalValidationType::EventSequence {
324 event_column,
325 timestamp_column,
326 ..
327 } => {
328 SqlSecurity::validate_identifier(event_column)?;
329 SqlSecurity::validate_identifier(timestamp_column)?;
330 }
331 }
332
333 Ok(())
334 }
335
336 fn generate_validation_query(&self) -> Result<String> {
338 self.validate_identifiers()?;
339
340 let _null_condition = if self.allow_nulls {
341 String::new()
342 } else {
343 " AND {} IS NOT NULL AND {} IS NOT NULL".to_string()
344 };
345
346 let sql = match &self.validation_type {
347 TemporalValidationType::BeforeAfter {
348 before_column,
349 after_column,
350 allow_equal,
351 } => {
352 let comparison = if *allow_equal {
353 if self.tolerance_seconds > 0 {
354 format!(
355 "{after_column} > {before_column} + INTERVAL '{} seconds'",
356 self.tolerance_seconds
357 )
358 } else {
359 format!("{after_column} > {before_column}")
360 }
361 } else if self.tolerance_seconds > 0 {
362 format!(
363 "{after_column} >= {before_column} + INTERVAL '{} seconds'",
364 self.tolerance_seconds
365 )
366 } else {
367 format!("{after_column} >= {before_column}")
368 };
369
370 let null_clause = if self.allow_nulls {
371 String::new()
372 } else {
373 format!(" AND {before_column} IS NOT NULL AND {after_column} IS NOT NULL")
374 };
375
376 format!(
377 "SELECT
378 COUNT(*) as total_rows,
379 SUM(CASE WHEN {comparison} THEN 0 ELSE 1 END) as violations
380 FROM {}
381 WHERE 1=1{null_clause}",
382 self.table_name
383 )
384 }
385 TemporalValidationType::BusinessHours {
386 timestamp_column,
387 start_time,
388 end_time,
389 weekdays_only,
390 ..
391 } => {
392 let time_check = format!(
393 "CAST({timestamp_column} AS TIME) BETWEEN TIME '{start_time}:00' AND TIME '{end_time}:00'"
394 );
395
396 let weekday_check = if *weekdays_only {
397 format!(" AND EXTRACT(DOW FROM {timestamp_column}) BETWEEN 1 AND 5")
398 } else {
399 String::new()
400 };
401
402 let null_clause = if self.allow_nulls {
403 String::new()
404 } else {
405 format!(" AND {timestamp_column} IS NOT NULL")
406 };
407
408 format!(
409 "SELECT
410 COUNT(*) as total_rows,
411 SUM(CASE WHEN {time_check} THEN 0 ELSE 1 END) as violations
412 FROM {}
413 WHERE 1=1{weekday_check}{null_clause}",
414 self.table_name
415 )
416 }
417 TemporalValidationType::DateRange {
418 timestamp_column,
419 min_date,
420 max_date,
421 } => {
422 let mut conditions = Vec::new();
423
424 if let Some(min) = min_date {
425 conditions.push(format!("{timestamp_column} >= TIMESTAMP '{min}'"));
426 }
427 if let Some(max) = max_date {
428 conditions.push(format!("{timestamp_column} <= TIMESTAMP '{max}'"));
429 }
430
431 if conditions.is_empty() {
432 return Err(TermError::constraint_evaluation(
433 "temporal_ordering",
434 "DateRange validation requires at least min_date or max_date",
435 ));
436 }
437
438 let range_check = conditions.join(" AND ");
439 let null_clause = if self.allow_nulls {
440 String::new()
441 } else {
442 format!(" AND {timestamp_column} IS NOT NULL")
443 };
444
445 format!(
446 "SELECT
447 COUNT(*) as total_rows,
448 SUM(CASE WHEN {range_check} THEN 0 ELSE 1 END) as violations
449 FROM {}
450 WHERE 1=1{null_clause}",
451 self.table_name
452 )
453 }
454 TemporalValidationType::MaxTimeGap {
455 timestamp_column,
456 group_by_column,
457 max_gap_seconds,
458 } => {
459 let partition_clause = if let Some(group_col) = group_by_column {
460 format!("PARTITION BY {group_col}")
461 } else {
462 String::new()
463 };
464
465 format!(
466 "WITH time_gaps AS (
467 SELECT
468 {timestamp_column},
469 LAG({timestamp_column}) OVER ({partition_clause} ORDER BY {timestamp_column}) as prev_timestamp,
470 EXTRACT(EPOCH FROM {timestamp_column} - LAG({timestamp_column}) OVER ({partition_clause} ORDER BY {timestamp_column})) as gap_seconds
471 FROM {}
472 WHERE {timestamp_column} IS NOT NULL
473 )
474 SELECT
475 COUNT(*) as total_gaps,
476 SUM(CASE WHEN gap_seconds > {max_gap_seconds} THEN 1 ELSE 0 END) as violations
477 FROM time_gaps
478 WHERE prev_timestamp IS NOT NULL",
479 self.table_name
480 )
481 }
482 TemporalValidationType::EventSequence { .. } => {
483 return Err(TermError::constraint_evaluation(
485 "temporal_ordering",
486 "Event sequence validation not yet implemented",
487 ));
488 }
489 };
490
491 debug!("Generated temporal validation query: {}", sql);
492 Ok(sql)
493 }
494}
495
496#[async_trait]
497impl Constraint for TemporalOrderingConstraint {
498 #[instrument(skip(self, ctx), fields(constraint = "temporal_ordering"))]
499 async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
500 debug!(
501 "Evaluating temporal ordering constraint on table: {}",
502 self.table_name
503 );
504
505 let sql = self.generate_validation_query()?;
507 let df = ctx.sql(&sql).await.map_err(|e| {
508 TermError::constraint_evaluation(
509 "temporal_ordering",
510 format!("Temporal validation query failed: {e}"),
511 )
512 })?;
513
514 let batches = df.collect().await.map_err(|e| {
515 TermError::constraint_evaluation(
516 "temporal_ordering",
517 format!("Failed to collect temporal validation results: {e}"),
518 )
519 })?;
520
521 if batches.is_empty() || batches[0].num_rows() == 0 {
522 return Ok(ConstraintResult::success());
523 }
524
525 let batch = &batches[0];
527 let total_rows = batch
528 .column(0)
529 .as_any()
530 .downcast_ref::<Int64Array>()
531 .ok_or_else(|| {
532 TermError::constraint_evaluation(
533 "temporal_ordering",
534 "Invalid total rows column type",
535 )
536 })?
537 .value(0);
538
539 let violations = batch
540 .column(1)
541 .as_any()
542 .downcast_ref::<Int64Array>()
543 .ok_or_else(|| {
544 TermError::constraint_evaluation(
545 "temporal_ordering",
546 "Invalid violations column type",
547 )
548 })?
549 .value(0);
550
551 if violations == 0 {
552 debug!("Temporal ordering constraint passed: no violations found");
553 return Ok(ConstraintResult::success_with_metric(1.0));
554 }
555
556 let compliance_rate = if total_rows > 0 {
558 (total_rows - violations) as f64 / total_rows as f64
559 } else {
560 1.0
561 };
562
563 let message = match &self.validation_type {
565 TemporalValidationType::BeforeAfter {
566 before_column,
567 after_column,
568 ..
569 } => format!(
570 "Temporal ordering violation: {violations} records where '{before_column}' is not before '{after_column}' ({:.2}% compliance)",
571 compliance_rate * 100.0
572 ),
573 TemporalValidationType::BusinessHours {
574 timestamp_column, ..
575 } => format!(
576 "Business hours violation: {violations} records with '{timestamp_column}' outside business hours ({:.2}% compliance)",
577 compliance_rate * 100.0
578 ),
579 TemporalValidationType::DateRange {
580 timestamp_column, ..
581 } => format!(
582 "Date range violation: {violations} records with '{timestamp_column}' outside valid range ({:.2}% compliance)",
583 compliance_rate * 100.0
584 ),
585 TemporalValidationType::MaxTimeGap { .. } => format!(
586 "Time gap violation: {violations} gaps exceed maximum allowed ({:.2}% compliance)",
587 compliance_rate * 100.0
588 ),
589 _ => format!(
590 "Temporal validation failed: {violations} violations ({:.2}% compliance)",
591 compliance_rate * 100.0
592 ),
593 };
594
595 warn!("{}", message);
596
597 Ok(ConstraintResult {
598 status: ConstraintStatus::Failure,
599 metric: Some(compliance_rate),
600 message: Some(message),
601 })
602 }
603
604 fn name(&self) -> &str {
605 "temporal_ordering"
606 }
607}
608
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::create_test_context;

    /// Executes each SQL statement in order against `ctx`, discarding the
    /// collected output.
    async fn run_statements(ctx: &SessionContext, statements: &[&str]) -> Result<()> {
        for statement in statements.iter().copied() {
            ctx.sql(statement).await?.collect().await?;
        }
        Ok(())
    }

    /// Rows whose `processed_at` follows `created_at` satisfy the constraint.
    #[tokio::test]
    async fn test_before_after_success() -> Result<()> {
        let ctx = create_test_context().await?;

        run_statements(
            &ctx,
            &[
                "CREATE TABLE events_ordered (id BIGINT, created_at TIMESTAMP, processed_at TIMESTAMP)",
                "INSERT INTO events_ordered VALUES
                (1, '2024-01-01 10:00:00', '2024-01-01 10:05:00'),
                (2, '2024-01-01 11:00:00', '2024-01-01 11:10:00')",
            ],
        )
        .await?;

        let outcome = TemporalOrderingConstraint::new("events_ordered")
            .before_after("created_at", "processed_at")
            .evaluate(&ctx)
            .await?;
        assert_eq!(outcome.status, ConstraintStatus::Success);

        Ok(())
    }

    /// A row whose `processed_at` precedes `created_at` fails the constraint
    /// and produces a message.
    #[tokio::test]
    async fn test_before_after_violation() -> Result<()> {
        let ctx = create_test_context().await?;

        run_statements(
            &ctx,
            &[
                "CREATE TABLE events_violated (id BIGINT, created_at TIMESTAMP, processed_at TIMESTAMP)",
                "INSERT INTO events_violated VALUES
                (1, '2024-01-01 10:00:00', '2024-01-01 09:00:00'),
                (2, '2024-01-01 11:00:00', '2024-01-01 11:10:00')",
            ],
        )
        .await?;

        let outcome = TemporalOrderingConstraint::new("events_violated")
            .before_after("created_at", "processed_at")
            .evaluate(&ctx)
            .await?;
        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert!(outcome.message.is_some());

        Ok(())
    }

    /// Builder calls are reflected in the constraint's fields.
    #[test]
    fn test_constraint_configuration() {
        let configured = TemporalOrderingConstraint::new("transactions")
            .business_hours("timestamp", "09:00", "17:00")
            .weekdays_only(true)
            .allow_nulls(true)
            .tolerance_seconds(60);

        assert_eq!(configured.table_name, "transactions");
        assert!(configured.allow_nulls);
        assert_eq!(configured.tolerance_seconds, 60);

        match configured.validation_type {
            TemporalValidationType::BusinessHours { weekdays_only, .. } => {
                assert!(weekdays_only);
            }
            _ => panic!("Expected BusinessHours validation type"),
        }
    }
}