term_guard/constraints/
temporal_ordering.rs

1//! Temporal ordering constraint for time-based validation in Term.
2//!
3//! This module provides temporal ordering validation capabilities for ensuring that timestamps
4//! follow expected chronological relationships, validating event sequences, and checking
5//! business hour compliance.
6//!
7//! # Examples
8//!
9//! ## Basic Temporal Ordering Validation
10//!
11//! ```rust
12//! use term_guard::constraints::TemporalOrderingConstraint;
13//! use term_guard::core::{Check, Level};
14//!
15//! // Validate that created_at always comes before processed_at
16//! let constraint = TemporalOrderingConstraint::new("events")
17//!     .before_after("created_at", "processed_at");
18//!
19//! let check = Check::builder("temporal_consistency")
20//!     .level(Level::Error)
21//!     .with_constraint(constraint)
22//!     .build();
23//! ```
24//!
25//! ## Business Hours Validation
26//!
27//! ```rust
28//! use term_guard::constraints::TemporalOrderingConstraint;
29//!
30//! // Validate that transactions occur during business hours
31//! let constraint = TemporalOrderingConstraint::new("transactions")
32//!     .business_hours("timestamp", "09:00", "17:00")
33//!     .weekdays_only(true);
34//! ```
35
36use crate::core::{Constraint, ConstraintResult, ConstraintStatus};
37use crate::error::{Result, TermError};
38use crate::security::SqlSecurity;
39use arrow::array::{Array, Int64Array};
40use async_trait::async_trait;
41use datafusion::prelude::*;
42use serde::{Deserialize, Serialize};
43use tracing::{debug, instrument, warn};
44
45/// Temporal ordering constraint for validating time-based relationships.
46///
47/// This constraint ensures that temporal data follows expected patterns, including:
48/// - Chronological ordering between columns
49/// - Business hour compliance
50/// - Date range validation
51/// - Event sequence validation
52/// - Time gap analysis
53///
54/// The constraint supports various temporal data types and timezone considerations.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct TemporalOrderingConstraint {
57    /// Table name to validate
58    table_name: String,
59    /// Type of temporal validation to perform
60    validation_type: TemporalValidationType,
61    /// Whether to allow null timestamps
62    allow_nulls: bool,
63    /// Tolerance for temporal comparisons (in seconds)
64    tolerance_seconds: i64,
65    /// Maximum number of violation examples to report
66    max_violations_reported: usize,
67}
68
69/// Type of temporal validation to perform
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub enum TemporalValidationType {
72    /// Validate that one column comes before another
73    BeforeAfter {
74        before_column: String,
75        after_column: String,
76        allow_equal: bool,
77    },
78    /// Validate that timestamps fall within business hours
79    BusinessHours {
80        timestamp_column: String,
81        start_time: String, // Format: "HH:MM"
82        end_time: String,   // Format: "HH:MM"
83        weekdays_only: bool,
84        timezone: Option<String>,
85    },
86    /// Validate that timestamps are within a specific date range
87    DateRange {
88        timestamp_column: String,
89        min_date: Option<String>, // ISO format
90        max_date: Option<String>, // ISO format
91    },
92    /// Validate maximum time gap between sequential events
93    MaxTimeGap {
94        timestamp_column: String,
95        group_by_column: Option<String>,
96        max_gap_seconds: i64,
97    },
98    /// Validate that events follow a specific sequence
99    EventSequence {
100        event_column: String,
101        timestamp_column: String,
102        expected_sequence: Vec<String>,
103    },
104}
105
106impl TemporalOrderingConstraint {
107    /// Create a new temporal ordering constraint.
108    ///
109    /// # Arguments
110    ///
111    /// * `table_name` - Name of the table to validate
112    ///
113    /// # Examples
114    ///
115    /// ```rust
116    /// use term_guard::constraints::TemporalOrderingConstraint;
117    ///
118    /// let constraint = TemporalOrderingConstraint::new("events");
119    /// ```
120    pub fn new(table_name: impl Into<String>) -> Self {
121        Self {
122            table_name: table_name.into(),
123            validation_type: TemporalValidationType::BeforeAfter {
124                before_column: String::new(),
125                after_column: String::new(),
126                allow_equal: false,
127            },
128            allow_nulls: false,
129            tolerance_seconds: 0,
130            max_violations_reported: 100,
131        }
132    }
133
134    /// Validate that one timestamp column comes before another.
135    ///
136    /// # Arguments
137    ///
138    /// * `before_column` - Column that should contain earlier timestamps
139    /// * `after_column` - Column that should contain later timestamps
140    pub fn before_after(
141        mut self,
142        before_column: impl Into<String>,
143        after_column: impl Into<String>,
144    ) -> Self {
145        self.validation_type = TemporalValidationType::BeforeAfter {
146            before_column: before_column.into(),
147            after_column: after_column.into(),
148            allow_equal: false,
149        };
150        self
151    }
152
153    /// Validate that one timestamp column comes before or equals another.
154    pub fn before_or_equal(
155        mut self,
156        before_column: impl Into<String>,
157        after_column: impl Into<String>,
158    ) -> Self {
159        self.validation_type = TemporalValidationType::BeforeAfter {
160            before_column: before_column.into(),
161            after_column: after_column.into(),
162            allow_equal: true,
163        };
164        self
165    }
166
167    /// Validate that timestamps fall within business hours.
168    ///
169    /// # Arguments
170    ///
171    /// * `timestamp_column` - Column containing timestamps to validate
172    /// * `start_time` - Start of business hours (format: "HH:MM")
173    /// * `end_time` - End of business hours (format: "HH:MM")
174    pub fn business_hours(
175        mut self,
176        timestamp_column: impl Into<String>,
177        start_time: impl Into<String>,
178        end_time: impl Into<String>,
179    ) -> Self {
180        self.validation_type = TemporalValidationType::BusinessHours {
181            timestamp_column: timestamp_column.into(),
182            start_time: start_time.into(),
183            end_time: end_time.into(),
184            weekdays_only: false,
185            timezone: None,
186        };
187        self
188    }
189
190    /// Set whether business hours validation should only apply to weekdays.
191    pub fn weekdays_only(mut self, weekdays_only: bool) -> Self {
192        if let TemporalValidationType::BusinessHours {
193            timestamp_column,
194            start_time,
195            end_time,
196            timezone,
197            ..
198        } = self.validation_type
199        {
200            self.validation_type = TemporalValidationType::BusinessHours {
201                timestamp_column,
202                start_time,
203                end_time,
204                weekdays_only,
205                timezone,
206            };
207        }
208        self
209    }
210
211    /// Set the timezone for business hours validation.
212    pub fn with_timezone(mut self, timezone: impl Into<String>) -> Self {
213        if let TemporalValidationType::BusinessHours {
214            timestamp_column,
215            start_time,
216            end_time,
217            weekdays_only,
218            ..
219        } = self.validation_type
220        {
221            self.validation_type = TemporalValidationType::BusinessHours {
222                timestamp_column,
223                start_time,
224                end_time,
225                weekdays_only,
226                timezone: Some(timezone.into()),
227            };
228        }
229        self
230    }
231
232    /// Validate that timestamps are within a specific date range.
233    pub fn date_range(
234        mut self,
235        timestamp_column: impl Into<String>,
236        min_date: Option<impl Into<String>>,
237        max_date: Option<impl Into<String>>,
238    ) -> Self {
239        self.validation_type = TemporalValidationType::DateRange {
240            timestamp_column: timestamp_column.into(),
241            min_date: min_date.map(Into::into),
242            max_date: max_date.map(Into::into),
243        };
244        self
245    }
246
247    /// Validate maximum time gap between sequential events.
248    pub fn max_time_gap(
249        mut self,
250        timestamp_column: impl Into<String>,
251        max_gap_seconds: i64,
252    ) -> Self {
253        self.validation_type = TemporalValidationType::MaxTimeGap {
254            timestamp_column: timestamp_column.into(),
255            group_by_column: None,
256            max_gap_seconds,
257        };
258        self
259    }
260
261    /// Set grouping column for time gap validation.
262    pub fn group_by(mut self, column: impl Into<String>) -> Self {
263        if let TemporalValidationType::MaxTimeGap {
264            timestamp_column,
265            max_gap_seconds,
266            ..
267        } = self.validation_type
268        {
269            self.validation_type = TemporalValidationType::MaxTimeGap {
270                timestamp_column,
271                group_by_column: Some(column.into()),
272                max_gap_seconds,
273            };
274        }
275        self
276    }
277
278    /// Set whether to allow null timestamps.
279    pub fn allow_nulls(mut self, allow: bool) -> Self {
280        self.allow_nulls = allow;
281        self
282    }
283
284    /// Set tolerance for temporal comparisons in seconds.
285    pub fn tolerance_seconds(mut self, seconds: i64) -> Self {
286        self.tolerance_seconds = seconds;
287        self
288    }
289
290    /// Validate identifiers for SQL security.
291    fn validate_identifiers(&self) -> Result<()> {
292        SqlSecurity::validate_identifier(&self.table_name)?;
293
294        match &self.validation_type {
295            TemporalValidationType::BeforeAfter {
296                before_column,
297                after_column,
298                ..
299            } => {
300                SqlSecurity::validate_identifier(before_column)?;
301                SqlSecurity::validate_identifier(after_column)?;
302            }
303            TemporalValidationType::BusinessHours {
304                timestamp_column, ..
305            } => {
306                SqlSecurity::validate_identifier(timestamp_column)?;
307            }
308            TemporalValidationType::DateRange {
309                timestamp_column, ..
310            } => {
311                SqlSecurity::validate_identifier(timestamp_column)?;
312            }
313            TemporalValidationType::MaxTimeGap {
314                timestamp_column,
315                group_by_column,
316                ..
317            } => {
318                SqlSecurity::validate_identifier(timestamp_column)?;
319                if let Some(group_col) = group_by_column {
320                    SqlSecurity::validate_identifier(group_col)?;
321                }
322            }
323            TemporalValidationType::EventSequence {
324                event_column,
325                timestamp_column,
326                ..
327            } => {
328                SqlSecurity::validate_identifier(event_column)?;
329                SqlSecurity::validate_identifier(timestamp_column)?;
330            }
331        }
332
333        Ok(())
334    }
335
336    /// Generate SQL query for temporal validation.
337    fn generate_validation_query(&self) -> Result<String> {
338        self.validate_identifiers()?;
339
340        let _null_condition = if self.allow_nulls {
341            String::new()
342        } else {
343            " AND {} IS NOT NULL AND {} IS NOT NULL".to_string()
344        };
345
346        let sql = match &self.validation_type {
347            TemporalValidationType::BeforeAfter {
348                before_column,
349                after_column,
350                allow_equal,
351            } => {
352                let comparison = if *allow_equal {
353                    if self.tolerance_seconds > 0 {
354                        format!(
355                            "{after_column} > {before_column} + INTERVAL '{} seconds'",
356                            self.tolerance_seconds
357                        )
358                    } else {
359                        format!("{after_column} > {before_column}")
360                    }
361                } else if self.tolerance_seconds > 0 {
362                    format!(
363                        "{after_column} >= {before_column} + INTERVAL '{} seconds'",
364                        self.tolerance_seconds
365                    )
366                } else {
367                    format!("{after_column} >= {before_column}")
368                };
369
370                let null_clause = if self.allow_nulls {
371                    String::new()
372                } else {
373                    format!(" AND {before_column} IS NOT NULL AND {after_column} IS NOT NULL")
374                };
375
376                format!(
377                    "SELECT 
378                        COUNT(*) as total_rows,
379                        SUM(CASE WHEN {comparison} THEN 0 ELSE 1 END) as violations
380                     FROM {}
381                     WHERE 1=1{null_clause}",
382                    self.table_name
383                )
384            }
385            TemporalValidationType::BusinessHours {
386                timestamp_column,
387                start_time,
388                end_time,
389                weekdays_only,
390                ..
391            } => {
392                let time_check = format!(
393                    "CAST({timestamp_column} AS TIME) BETWEEN TIME '{start_time}:00' AND TIME '{end_time}:00'"
394                );
395
396                let weekday_check = if *weekdays_only {
397                    format!(" AND EXTRACT(DOW FROM {timestamp_column}) BETWEEN 1 AND 5")
398                } else {
399                    String::new()
400                };
401
402                let null_clause = if self.allow_nulls {
403                    String::new()
404                } else {
405                    format!(" AND {timestamp_column} IS NOT NULL")
406                };
407
408                format!(
409                    "SELECT 
410                        COUNT(*) as total_rows,
411                        SUM(CASE WHEN {time_check} THEN 0 ELSE 1 END) as violations
412                     FROM {}
413                     WHERE 1=1{weekday_check}{null_clause}",
414                    self.table_name
415                )
416            }
417            TemporalValidationType::DateRange {
418                timestamp_column,
419                min_date,
420                max_date,
421            } => {
422                let mut conditions = Vec::new();
423
424                if let Some(min) = min_date {
425                    conditions.push(format!("{timestamp_column} >= TIMESTAMP '{min}'"));
426                }
427                if let Some(max) = max_date {
428                    conditions.push(format!("{timestamp_column} <= TIMESTAMP '{max}'"));
429                }
430
431                if conditions.is_empty() {
432                    return Err(TermError::constraint_evaluation(
433                        "temporal_ordering",
434                        "DateRange validation requires at least min_date or max_date",
435                    ));
436                }
437
438                let range_check = conditions.join(" AND ");
439                let null_clause = if self.allow_nulls {
440                    String::new()
441                } else {
442                    format!(" AND {timestamp_column} IS NOT NULL")
443                };
444
445                format!(
446                    "SELECT 
447                        COUNT(*) as total_rows,
448                        SUM(CASE WHEN {range_check} THEN 0 ELSE 1 END) as violations
449                     FROM {}
450                     WHERE 1=1{null_clause}",
451                    self.table_name
452                )
453            }
454            TemporalValidationType::MaxTimeGap {
455                timestamp_column,
456                group_by_column,
457                max_gap_seconds,
458            } => {
459                let partition_clause = if let Some(group_col) = group_by_column {
460                    format!("PARTITION BY {group_col}")
461                } else {
462                    String::new()
463                };
464
465                format!(
466                    "WITH time_gaps AS (
467                        SELECT 
468                            {timestamp_column},
469                            LAG({timestamp_column}) OVER ({partition_clause} ORDER BY {timestamp_column}) as prev_timestamp,
470                            EXTRACT(EPOCH FROM {timestamp_column} - LAG({timestamp_column}) OVER ({partition_clause} ORDER BY {timestamp_column})) as gap_seconds
471                        FROM {}
472                        WHERE {timestamp_column} IS NOT NULL
473                    )
474                    SELECT 
475                        COUNT(*) as total_gaps,
476                        SUM(CASE WHEN gap_seconds > {max_gap_seconds} THEN 1 ELSE 0 END) as violations
477                    FROM time_gaps
478                    WHERE prev_timestamp IS NOT NULL",
479                    self.table_name
480                )
481            }
482            TemporalValidationType::EventSequence { .. } => {
483                // Event sequence validation is more complex and would need a different approach
484                return Err(TermError::constraint_evaluation(
485                    "temporal_ordering",
486                    "Event sequence validation not yet implemented",
487                ));
488            }
489        };
490
491        debug!("Generated temporal validation query: {}", sql);
492        Ok(sql)
493    }
494}
495
496#[async_trait]
497impl Constraint for TemporalOrderingConstraint {
498    #[instrument(skip(self, ctx), fields(constraint = "temporal_ordering"))]
499    async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
500        debug!(
501            "Evaluating temporal ordering constraint on table: {}",
502            self.table_name
503        );
504
505        // Generate and execute validation query
506        let sql = self.generate_validation_query()?;
507        let df = ctx.sql(&sql).await.map_err(|e| {
508            TermError::constraint_evaluation(
509                "temporal_ordering",
510                format!("Temporal validation query failed: {e}"),
511            )
512        })?;
513
514        let batches = df.collect().await.map_err(|e| {
515            TermError::constraint_evaluation(
516                "temporal_ordering",
517                format!("Failed to collect temporal validation results: {e}"),
518            )
519        })?;
520
521        if batches.is_empty() || batches[0].num_rows() == 0 {
522            return Ok(ConstraintResult::success());
523        }
524
525        // Extract violation counts
526        let batch = &batches[0];
527        let total_rows = batch
528            .column(0)
529            .as_any()
530            .downcast_ref::<Int64Array>()
531            .ok_or_else(|| {
532                TermError::constraint_evaluation(
533                    "temporal_ordering",
534                    "Invalid total rows column type",
535                )
536            })?
537            .value(0);
538
539        let violations = batch
540            .column(1)
541            .as_any()
542            .downcast_ref::<Int64Array>()
543            .ok_or_else(|| {
544                TermError::constraint_evaluation(
545                    "temporal_ordering",
546                    "Invalid violations column type",
547                )
548            })?
549            .value(0);
550
551        if violations == 0 {
552            debug!("Temporal ordering constraint passed: no violations found");
553            return Ok(ConstraintResult::success_with_metric(1.0));
554        }
555
556        // Calculate compliance rate
557        let compliance_rate = if total_rows > 0 {
558            (total_rows - violations) as f64 / total_rows as f64
559        } else {
560            1.0
561        };
562
563        // Generate failure message
564        let message = match &self.validation_type {
565            TemporalValidationType::BeforeAfter {
566                before_column,
567                after_column,
568                ..
569            } => format!(
570                "Temporal ordering violation: {violations} records where '{before_column}' is not before '{after_column}' ({:.2}% compliance)",
571                compliance_rate * 100.0
572            ),
573            TemporalValidationType::BusinessHours {
574                timestamp_column, ..
575            } => format!(
576                "Business hours violation: {violations} records with '{timestamp_column}' outside business hours ({:.2}% compliance)",
577                compliance_rate * 100.0
578            ),
579            TemporalValidationType::DateRange {
580                timestamp_column, ..
581            } => format!(
582                "Date range violation: {violations} records with '{timestamp_column}' outside valid range ({:.2}% compliance)",
583                compliance_rate * 100.0
584            ),
585            TemporalValidationType::MaxTimeGap { .. } => format!(
586                "Time gap violation: {violations} gaps exceed maximum allowed ({:.2}% compliance)",
587                compliance_rate * 100.0
588            ),
589            _ => format!(
590                "Temporal validation failed: {violations} violations ({:.2}% compliance)",
591                compliance_rate * 100.0
592            ),
593        };
594
595        warn!("{}", message);
596
597        Ok(ConstraintResult {
598            status: ConstraintStatus::Failure,
599            metric: Some(compliance_rate),
600            message: Some(message),
601        })
602    }
603
604    fn name(&self) -> &str {
605        "temporal_ordering"
606    }
607}
608
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::create_test_context;

    /// Rows whose `processed_at` follows `created_at` satisfy the constraint.
    #[tokio::test]
    async fn test_before_after_success() -> Result<()> {
        let ctx = create_test_context().await?;

        // Create test table with proper ordering
        let setup = [
            "CREATE TABLE events_ordered (id BIGINT, created_at TIMESTAMP, processed_at TIMESTAMP)",
            "INSERT INTO events_ordered VALUES 
            (1, '2024-01-01 10:00:00', '2024-01-01 10:05:00'),
            (2, '2024-01-01 11:00:00', '2024-01-01 11:10:00')",
        ];
        for statement in setup.iter() {
            ctx.sql(statement).await?.collect().await?;
        }

        let outcome = TemporalOrderingConstraint::new("events_ordered")
            .before_after("created_at", "processed_at")
            .evaluate(&ctx)
            .await?;
        assert_eq!(outcome.status, ConstraintStatus::Success);

        Ok(())
    }

    /// A row whose `processed_at` precedes `created_at` fails the constraint
    /// and produces a message.
    #[tokio::test]
    async fn test_before_after_violation() -> Result<()> {
        let ctx = create_test_context().await?;

        // Create test table with ordering violations
        let setup = [
            "CREATE TABLE events_violated (id BIGINT, created_at TIMESTAMP, processed_at TIMESTAMP)",
            "INSERT INTO events_violated VALUES 
            (1, '2024-01-01 10:00:00', '2024-01-01 09:00:00'),
            (2, '2024-01-01 11:00:00', '2024-01-01 11:10:00')",
        ];
        for statement in setup.iter() {
            ctx.sql(statement).await?.collect().await?;
        }

        let outcome = TemporalOrderingConstraint::new("events_violated")
            .before_after("created_at", "processed_at")
            .evaluate(&ctx)
            .await?;
        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert!(outcome.message.is_some());

        Ok(())
    }

    /// Builder calls are reflected in the stored configuration.
    #[test]
    fn test_constraint_configuration() {
        let configured = TemporalOrderingConstraint::new("transactions")
            .business_hours("timestamp", "09:00", "17:00")
            .weekdays_only(true)
            .allow_nulls(true)
            .tolerance_seconds(60);

        assert_eq!(configured.table_name, "transactions");
        assert!(configured.allow_nulls);
        assert_eq!(configured.tolerance_seconds, 60);

        match configured.validation_type {
            TemporalValidationType::BusinessHours { weekdays_only, .. } => {
                assert!(weekdays_only)
            }
            _ => panic!("Expected BusinessHours validation type"),
        }
    }
}