term_guard/core/
multi_source.rs

//! Multi-source validation engine for cross-table validation in Term.
//!
//! This module provides the core infrastructure for validating relationships,
//! referential integrity, and business rules spanning multiple datasets. It
//! addresses the critical market gap where 68% of data quality issues involve
//! relationships between tables.
//!
//! # Architecture
//!
//! The `MultiSourceValidator` coordinates:
//! - Registration of heterogeneous data sources
//! - Query optimization across joined tables
//! - Caching of intermediate results
//! - Performance monitoring and telemetry
//!
//! # Examples
//!
//! ```rust
//! use term_guard::core::{MultiSourceValidator, ValidationSuite, Check};
//! use term_guard::sources::{CsvSource, ParquetSource};
//! use datafusion::prelude::*;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut validator = MultiSourceValidator::new();
//!
//! // Register multiple data sources
//! validator.add_source("orders", CsvSource::new("orders.csv")?).await?;
//! validator.add_source("customers", ParquetSource::new("customers.parquet")?).await?;
//!
//! // Create validation suite with cross-table constraints
//! let suite = ValidationSuite::builder("cross_table_validation")
//!     .check(
//!         Check::builder("referential_integrity")
//!             .foreign_key("orders.customer_id", "customers.id")
//!             .build()
//!     )
//!     .build();
//!
//! // Run validation
//! let results = validator.run_suite(&suite).await?;
//! # Ok(())
//! # }
//! ```

44use crate::core::{ValidationResult, ValidationSuite};
45use crate::error::{Result, TermError};
46use crate::sources::DataSource;
47use crate::telemetry::TermTelemetry;
48use arrow::record_batch::RecordBatch;
49use datafusion::prelude::*;
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::time::Instant;
53use tracing::{debug, info, instrument, span, Level};
54
/// Multi-source validation engine for cross-table data validation.
///
/// This engine manages multiple data sources and provides efficient validation
/// across heterogeneous datasets with optimized query execution and caching.
pub struct MultiSourceValidator {
    /// DataFusion session context used to register tables and execute SQL.
    ctx: SessionContext,
    /// Registered data sources, keyed by the name passed to `add_source`.
    sources: HashMap<String, Arc<dyn DataSource>>,
    /// Query result cache, keyed by a cache key derived from the SQL text.
    query_cache: HashMap<String, CachedResult>,
    /// Optional telemetry hook; forwarded to each source at registration time.
    telemetry: Option<Arc<TermTelemetry>>,
    /// Whether `execute_query` consults and populates the cache (default: true).
    enable_caching: bool,
    /// Upper bound on total cached bytes (default: 100 MB).
    max_cache_size: usize,
    /// Running total of bytes currently held in `query_cache`.
    current_cache_size: usize,
}
75
/// A cached query result together with bookkeeping metadata.
#[derive(Debug, Clone)]
struct CachedResult {
    /// The materialized record batches returned by the query.
    data: Vec<RecordBatch>,
    /// When the result was cached; used for oldest-first eviction.
    cached_at: Instant,
    /// Memory footprint of `data` in bytes (Arrow array memory size).
    size_bytes: usize,
}
86
87impl MultiSourceValidator {
88    /// Create a new multi-source validator.
89    pub fn new() -> Self {
90        Self::with_context(SessionContext::new())
91    }
92
93    /// Create a new multi-source validator with a custom session context.
94    pub fn with_context(ctx: SessionContext) -> Self {
95        Self {
96            ctx,
97            sources: HashMap::new(),
98            query_cache: HashMap::new(),
99            telemetry: None,
100            enable_caching: true,
101            max_cache_size: 100 * 1024 * 1024, // 100MB default
102            current_cache_size: 0,
103        }
104    }
105
106    /// Set telemetry for observability.
107    pub fn with_telemetry(mut self, telemetry: Arc<TermTelemetry>) -> Self {
108        self.telemetry = Some(telemetry);
109        self
110    }
111
112    /// Enable or disable query result caching.
113    pub fn with_caching(mut self, enable: bool) -> Self {
114        self.enable_caching = enable;
115        self
116    }
117
118    /// Set maximum cache size in bytes.
119    pub fn with_max_cache_size(mut self, size_bytes: usize) -> Self {
120        self.max_cache_size = size_bytes;
121        self
122    }
123
124    /// Add a data source to the validator.
125    ///
126    /// # Arguments
127    ///
128    /// * `name` - The name to register the source as
129    /// * `source` - The data source to register
130    ///
131    /// # Examples
132    ///
133    /// ```rust,no_run
134    /// # use term_guard::core::MultiSourceValidator;
135    /// # use term_guard::sources::CsvSource;
136    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
137    /// let mut validator = MultiSourceValidator::new();
138    /// validator.add_source("orders", CsvSource::new("orders.csv")?).await?;
139    /// # Ok(())
140    /// # }
141    /// ```
142    #[instrument(skip(self, source, name))]
143    pub async fn add_source<S: DataSource + 'static>(
144        &mut self,
145        name: impl Into<String>,
146        source: S,
147    ) -> Result<()> {
148        let name = name.into();
149        info!("Adding data source: {}", name);
150
151        let source = Arc::new(source);
152
153        // Register with DataFusion context
154        source
155            .register_with_telemetry(&self.ctx, &name, self.telemetry.as_ref())
156            .await
157            .map_err(|e| {
158                TermError::data_source(
159                    "multi_source",
160                    format!("Failed to register source '{name}': {e}"),
161                )
162            })?;
163
164        self.sources.insert(name.clone(), source);
165        info!("Successfully added data source: {}", name);
166
167        Ok(())
168    }
169
170    /// Get the DataFusion session context.
171    ///
172    /// This provides direct access to the underlying context for advanced use cases.
173    pub fn context(&self) -> &SessionContext {
174        &self.ctx
175    }
176
177    /// Get a registered data source by name.
178    pub fn get_source(&self, name: &str) -> Option<&Arc<dyn DataSource>> {
179        self.sources.get(name)
180    }
181
182    /// List all registered data sources.
183    pub fn list_sources(&self) -> Vec<String> {
184        self.sources.keys().cloned().collect()
185    }
186
187    /// Run a validation suite across registered data sources.
188    ///
189    /// # Arguments
190    ///
191    /// * `suite` - The validation suite to run
192    ///
193    /// # Returns
194    ///
195    /// The validation results from running the suite
196    #[instrument(skip(self, suite), fields(suite_name = %suite.name()))]
197    pub async fn run_suite(&self, suite: &ValidationSuite) -> Result<ValidationResult> {
198        let span = span!(Level::INFO, "multi_source_validation", suite = %suite.name());
199        let _enter = span.enter();
200
201        info!(
202            "Running validation suite '{}' with {} registered sources",
203            suite.name(),
204            self.sources.len()
205        );
206
207        // Clear expired cache entries if caching is enabled
208        if self.enable_caching {
209            self.cleanup_cache();
210        }
211
212        // Run the suite with our context
213        let result = suite.run(&self.ctx).await?;
214
215        match &result {
216            ValidationResult::Success { report, .. } => {
217                info!(
218                    "Validation suite '{}' succeeded: {} checks passed",
219                    suite.name(),
220                    report.metrics.total_checks
221                );
222            }
223            ValidationResult::Failure { report } => {
224                info!(
225                    "Validation suite '{}' failed: {} issues found",
226                    suite.name(),
227                    report.issues.len()
228                );
229            }
230        }
231
232        Ok(result)
233    }
234
235    /// Execute a SQL query with optional caching.
236    ///
237    /// This method is used internally by constraints for performance optimization.
238    #[instrument(skip(self))]
239    pub async fn execute_query(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
240        // Generate cache key from SQL query using hash
241        use std::collections::hash_map::DefaultHasher;
242        use std::hash::{Hash, Hasher};
243
244        let mut hasher = DefaultHasher::new();
245        sql.hash(&mut hasher);
246        let cache_key = format!("{:x}", hasher.finish());
247
248        // Check cache if enabled
249        if self.enable_caching {
250            if let Some(cached) = self.query_cache.get(&cache_key) {
251                debug!("Cache hit for query");
252                return Ok(cached.data.clone());
253            }
254        }
255
256        debug!("Executing query: {}", sql);
257
258        // Execute query
259        let df = self.ctx.sql(sql).await.map_err(|e| {
260            TermError::data_source("multi_source", format!("Query execution failed: {e}"))
261        })?;
262
263        let batches = df.collect().await.map_err(|e| {
264            TermError::data_source("multi_source", format!("Failed to collect results: {e}"))
265        })?;
266
267        // Cache result if enabled
268        if self.enable_caching {
269            self.cache_result(cache_key, batches.clone());
270        }
271
272        Ok(batches)
273    }
274
275    /// Cache a query result.
276    fn cache_result(&mut self, key: String, data: Vec<RecordBatch>) {
277        let size_bytes = data.iter().map(|batch| batch.get_array_memory_size()).sum();
278
279        // Check if adding this would exceed cache size
280        if self.current_cache_size + size_bytes > self.max_cache_size {
281            // Evict oldest entries until we have space
282            self.evict_cache_entries(size_bytes);
283        }
284
285        let cached = CachedResult {
286            data,
287            cached_at: Instant::now(),
288            size_bytes,
289        };
290
291        self.current_cache_size += size_bytes;
292        self.query_cache.insert(key, cached);
293    }
294
295    /// Evict cache entries to make room for new data.
296    fn evict_cache_entries(&mut self, needed_bytes: usize) {
297        // Simple LRU eviction - remove oldest entries
298        let mut entries_to_remove = Vec::new();
299
300        {
301            let mut entries: Vec<_> = self.query_cache.iter().collect();
302            entries.sort_by_key(|(_, cached)| cached.cached_at);
303
304            for (key, cached) in entries {
305                if self.current_cache_size + needed_bytes <= self.max_cache_size {
306                    break;
307                }
308
309                entries_to_remove.push((key.clone(), cached.size_bytes));
310            }
311        }
312
313        // Now remove the entries after collecting them
314        for (key, size) in entries_to_remove {
315            self.query_cache.remove(&key);
316            self.current_cache_size -= size;
317            debug!("Evicted cache entry to free {} bytes", size);
318        }
319    }
320
321    /// Clean up expired cache entries.
322    fn cleanup_cache(&self) {
323        // This is a placeholder for more sophisticated cache management
324        // Currently we only evict based on size, but could add TTL-based eviction
325        debug!(
326            "Cache cleanup: {} entries, {} bytes",
327            self.query_cache.len(),
328            self.current_cache_size
329        );
330    }
331
332    /// Get cache statistics.
333    pub fn cache_stats(&self) -> CacheStats {
334        CacheStats {
335            entries: self.query_cache.len(),
336            size_bytes: self.current_cache_size,
337            max_size_bytes: self.max_cache_size,
338            hit_rate: 0.0, // Would need to track hits/misses for this
339        }
340    }
341}
342
343impl Default for MultiSourceValidator {
344    fn default() -> Self {
345        Self::new()
346    }
347}
348
/// Cache statistics for monitoring.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of cached query results.
    pub entries: usize,
    /// Current cache size in bytes.
    pub size_bytes: usize,
    /// Configured maximum cache size in bytes.
    pub max_size_bytes: usize,
    /// Cache hit rate (0.0 to 1.0). Currently always reported as 0.0 because
    /// hit/miss counts are not yet tracked.
    pub hit_rate: f64,
}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sources::CsvSource;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Write `data` into a fresh temporary `.csv` file and hand back its handle.
    fn create_test_csv(data: &str) -> Result<NamedTempFile> {
        let mut file = NamedTempFile::with_suffix(".csv")?;
        write!(file, "{data}")?;
        file.flush()?;
        Ok(file)
    }

    /// Build a `CsvSource` that reads from the given temp file.
    fn source_from(file: &NamedTempFile) -> Result<CsvSource> {
        CsvSource::new(file.path().to_string_lossy().to_string())
    }

    #[tokio::test]
    async fn test_multi_source_validator_creation() {
        let validator = MultiSourceValidator::new();
        assert!(validator.sources.is_empty());
        assert!(validator.enable_caching);
    }

    #[tokio::test]
    async fn test_add_source() -> Result<()> {
        let mut validator = MultiSourceValidator::new();
        let csv = create_test_csv("id,name\n1,Alice\n2,Bob")?;

        validator.add_source("test_data", source_from(&csv)?).await?;

        assert_eq!(validator.sources.len(), 1);
        assert!(validator.get_source("test_data").is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_list_sources() -> Result<()> {
        let mut validator = MultiSourceValidator::new();
        let csv_a = create_test_csv("id,value\n1,100")?;
        let csv_b = create_test_csv("id,value\n1,100")?;

        validator.add_source("source1", source_from(&csv_a)?).await?;
        validator.add_source("source2", source_from(&csv_b)?).await?;

        let names = validator.list_sources();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"source1".to_string()));
        assert!(names.contains(&"source2".to_string()));

        Ok(())
    }

    #[tokio::test]
    async fn test_cache_configuration() {
        let validator = MultiSourceValidator::new()
            .with_caching(false)
            .with_max_cache_size(1024 * 1024);

        assert!(!validator.enable_caching);
        assert_eq!(validator.max_cache_size, 1024 * 1024);
    }

    #[tokio::test]
    async fn test_cache_stats() {
        let stats = MultiSourceValidator::new().cache_stats();

        assert_eq!(stats.entries, 0);
        assert_eq!(stats.size_bytes, 0);
        assert_eq!(stats.max_size_bytes, 100 * 1024 * 1024);
    }
}