1use crate::core::{ValidationResult, ValidationSuite};
45use crate::error::{Result, TermError};
46use crate::sources::DataSource;
47use crate::telemetry::TermTelemetry;
48use arrow::record_batch::RecordBatch;
49use datafusion::prelude::*;
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::time::Instant;
53use tracing::{debug, info, instrument, span, Level};
54
/// Validates data across multiple registered [`DataSource`]s through a shared
/// DataFusion [`SessionContext`], with optional query-result caching and an
/// optional telemetry sink.
pub struct MultiSourceValidator {
    // DataFusion session that all sources are registered with and all
    // queries are executed against.
    ctx: SessionContext,
    // Registered data sources, keyed by the name supplied to `add_source`.
    sources: HashMap<String, Arc<dyn DataSource>>,
    // Cached query results, keyed by a hash of the SQL text.
    query_cache: HashMap<String, CachedResult>,
    // Optional telemetry sink passed to each source during registration.
    telemetry: Option<Arc<TermTelemetry>>,
    // When false, `execute_query` bypasses the cache entirely.
    enable_caching: bool,
    // Upper bound, in bytes, on the total size of cached results.
    max_cache_size: usize,
    // Running total of bytes currently held in `query_cache`.
    current_cache_size: usize,
}
75
/// A cached query result plus the bookkeeping needed for size-based eviction.
#[derive(Debug, Clone)]
struct CachedResult {
    // Materialized record batches returned by the query.
    data: Vec<RecordBatch>,
    // Insertion time; older entries are evicted first.
    cached_at: Instant,
    // Approximate in-memory size of `data`, used for cache accounting.
    size_bytes: usize,
}
86
87impl MultiSourceValidator {
88 pub fn new() -> Self {
90 Self::with_context(SessionContext::new())
91 }
92
93 pub fn with_context(ctx: SessionContext) -> Self {
95 Self {
96 ctx,
97 sources: HashMap::new(),
98 query_cache: HashMap::new(),
99 telemetry: None,
100 enable_caching: true,
101 max_cache_size: 100 * 1024 * 1024, current_cache_size: 0,
103 }
104 }
105
106 pub fn with_telemetry(mut self, telemetry: Arc<TermTelemetry>) -> Self {
108 self.telemetry = Some(telemetry);
109 self
110 }
111
112 pub fn with_caching(mut self, enable: bool) -> Self {
114 self.enable_caching = enable;
115 self
116 }
117
118 pub fn with_max_cache_size(mut self, size_bytes: usize) -> Self {
120 self.max_cache_size = size_bytes;
121 self
122 }
123
124 #[instrument(skip(self, source, name))]
143 pub async fn add_source<S: DataSource + 'static>(
144 &mut self,
145 name: impl Into<String>,
146 source: S,
147 ) -> Result<()> {
148 let name = name.into();
149 info!("Adding data source: {}", name);
150
151 let source = Arc::new(source);
152
153 source
155 .register_with_telemetry(&self.ctx, &name, self.telemetry.as_ref())
156 .await
157 .map_err(|e| {
158 TermError::data_source(
159 "multi_source",
160 format!("Failed to register source '{name}': {e}"),
161 )
162 })?;
163
164 self.sources.insert(name.clone(), source);
165 info!("Successfully added data source: {}", name);
166
167 Ok(())
168 }
169
170 pub fn context(&self) -> &SessionContext {
174 &self.ctx
175 }
176
177 pub fn get_source(&self, name: &str) -> Option<&Arc<dyn DataSource>> {
179 self.sources.get(name)
180 }
181
182 pub fn list_sources(&self) -> Vec<String> {
184 self.sources.keys().cloned().collect()
185 }
186
187 #[instrument(skip(self, suite), fields(suite_name = %suite.name()))]
197 pub async fn run_suite(&self, suite: &ValidationSuite) -> Result<ValidationResult> {
198 let span = span!(Level::INFO, "multi_source_validation", suite = %suite.name());
199 let _enter = span.enter();
200
201 info!(
202 "Running validation suite '{}' with {} registered sources",
203 suite.name(),
204 self.sources.len()
205 );
206
207 if self.enable_caching {
209 self.cleanup_cache();
210 }
211
212 let result = suite.run(&self.ctx).await?;
214
215 match &result {
216 ValidationResult::Success { report, .. } => {
217 info!(
218 "Validation suite '{}' succeeded: {} checks passed",
219 suite.name(),
220 report.metrics.total_checks
221 );
222 }
223 ValidationResult::Failure { report } => {
224 info!(
225 "Validation suite '{}' failed: {} issues found",
226 suite.name(),
227 report.issues.len()
228 );
229 }
230 }
231
232 Ok(result)
233 }
234
235 #[instrument(skip(self))]
239 pub async fn execute_query(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
240 use std::collections::hash_map::DefaultHasher;
242 use std::hash::{Hash, Hasher};
243
244 let mut hasher = DefaultHasher::new();
245 sql.hash(&mut hasher);
246 let cache_key = format!("{:x}", hasher.finish());
247
248 if self.enable_caching {
250 if let Some(cached) = self.query_cache.get(&cache_key) {
251 debug!("Cache hit for query");
252 return Ok(cached.data.clone());
253 }
254 }
255
256 debug!("Executing query: {}", sql);
257
258 let df = self.ctx.sql(sql).await.map_err(|e| {
260 TermError::data_source("multi_source", format!("Query execution failed: {e}"))
261 })?;
262
263 let batches = df.collect().await.map_err(|e| {
264 TermError::data_source("multi_source", format!("Failed to collect results: {e}"))
265 })?;
266
267 if self.enable_caching {
269 self.cache_result(cache_key, batches.clone());
270 }
271
272 Ok(batches)
273 }
274
275 fn cache_result(&mut self, key: String, data: Vec<RecordBatch>) {
277 let size_bytes = data.iter().map(|batch| batch.get_array_memory_size()).sum();
278
279 if self.current_cache_size + size_bytes > self.max_cache_size {
281 self.evict_cache_entries(size_bytes);
283 }
284
285 let cached = CachedResult {
286 data,
287 cached_at: Instant::now(),
288 size_bytes,
289 };
290
291 self.current_cache_size += size_bytes;
292 self.query_cache.insert(key, cached);
293 }
294
295 fn evict_cache_entries(&mut self, needed_bytes: usize) {
297 let mut entries_to_remove = Vec::new();
299
300 {
301 let mut entries: Vec<_> = self.query_cache.iter().collect();
302 entries.sort_by_key(|(_, cached)| cached.cached_at);
303
304 for (key, cached) in entries {
305 if self.current_cache_size + needed_bytes <= self.max_cache_size {
306 break;
307 }
308
309 entries_to_remove.push((key.clone(), cached.size_bytes));
310 }
311 }
312
313 for (key, size) in entries_to_remove {
315 self.query_cache.remove(&key);
316 self.current_cache_size -= size;
317 debug!("Evicted cache entry to free {} bytes", size);
318 }
319 }
320
321 fn cleanup_cache(&self) {
323 debug!(
326 "Cache cleanup: {} entries, {} bytes",
327 self.query_cache.len(),
328 self.current_cache_size
329 );
330 }
331
332 pub fn cache_stats(&self) -> CacheStats {
334 CacheStats {
335 entries: self.query_cache.len(),
336 size_bytes: self.current_cache_size,
337 max_size_bytes: self.max_cache_size,
338 hit_rate: 0.0, }
340 }
341}
342
343impl Default for MultiSourceValidator {
344 fn default() -> Self {
345 Self::new()
346 }
347}
348
/// Point-in-time snapshot of the validator's query-cache state.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of cached query results.
    pub entries: usize,
    /// Total bytes currently held in the cache.
    pub size_bytes: usize,
    /// Configured cache budget, in bytes.
    pub max_size_bytes: usize,
    /// Cache hit rate. Always 0.0 at present — hits/misses are not tracked
    /// by `cache_stats`.
    pub hit_rate: f64,
}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sources::CsvSource;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `data` into a fresh temporary `.csv` file and returns its
    /// handle (the file is removed when the handle is dropped).
    fn create_test_csv(data: &str) -> Result<NamedTempFile> {
        let mut file = NamedTempFile::with_suffix(".csv")?;
        write!(file, "{data}")?;
        file.flush()?;
        Ok(file)
    }

    #[tokio::test]
    async fn test_multi_source_validator_creation() {
        let validator = MultiSourceValidator::new();
        assert!(validator.enable_caching);
        assert_eq!(validator.sources.len(), 0);
    }

    #[tokio::test]
    async fn test_add_source() -> Result<()> {
        let csv = create_test_csv("id,name\n1,Alice\n2,Bob")?;
        let source = CsvSource::new(csv.path().to_string_lossy().to_string())?;

        let mut validator = MultiSourceValidator::new();
        validator.add_source("test_data", source).await?;

        assert!(validator.get_source("test_data").is_some());
        assert_eq!(validator.sources.len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_list_sources() -> Result<()> {
        let data = "id,value\n1,100";
        // Keep both temp files alive until the end of the test.
        let files = [create_test_csv(data)?, create_test_csv(data)?];

        let mut validator = MultiSourceValidator::new();
        for (name, file) in ["source1", "source2"].iter().zip(files.iter()) {
            let source = CsvSource::new(file.path().to_string_lossy().to_string())?;
            validator.add_source(*name, source).await?;
        }

        let names = validator.list_sources();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"source1".to_string()));
        assert!(names.contains(&"source2".to_string()));

        Ok(())
    }

    #[tokio::test]
    async fn test_cache_configuration() {
        let validator = MultiSourceValidator::new()
            .with_max_cache_size(1024 * 1024)
            .with_caching(false);

        assert_eq!(validator.max_cache_size, 1024 * 1024);
        assert!(!validator.enable_caching);
    }

    #[tokio::test]
    async fn test_cache_stats() {
        let stats = MultiSourceValidator::new().cache_stats();

        assert_eq!(stats.entries, 0);
        assert_eq!(stats.size_bytes, 0);
        assert_eq!(stats.max_size_bytes, 100 * 1024 * 1024);
    }
}