use crate::error::DatasetEngineError;
use crate::parquet::bifrost::buffer::start_buffer;
use crate::parquet::bifrost::catalog::DatasetCatalogProvider;
use crate::parquet::bifrost::engine::{DatasetEngine, TableCommand};
use crate::parquet::bifrost::explain::{
    logical_plan_to_tree, physical_plan_to_tree, sanitize_plan_text, ExplainResult,
};
use crate::parquet::bifrost::query::{QueryExecutionMetadata, QueryResult, QueryTracker};
use crate::parquet::bifrost::registry::{DatasetRegistry, RegistrationResult};
use crate::parquet::bifrost::stats;
use crate::storage::ObjectStore;
use arrow::datatypes::SchemaRef;
use arrow_array::RecordBatch;
use dashmap::DashMap;
use datafusion::physical_plan::displayable;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::dataset::schema::{
    SCOUTER_BATCH_ID, SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE,
};
use scouter_types::dataset::{DatasetFingerprint, DatasetNamespace, DatasetRegistration};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, Mutex, Notify};
use tokio::time::{interval, Duration};
use tracing::{info, warn};

const DEFAULT_ENGINE_TTL_SECS: u64 = 30 * 60;
const DEFAULT_MAX_ACTIVE_ENGINES: usize = 50;
const DEFAULT_FLUSH_INTERVAL_SECS: u64 = 60;
const DEFAULT_MAX_BUFFER_ROWS: usize = 10_000;
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 30;
const REAPER_INTERVAL_SECS: u64 = 5 * 60;
const DISCOVERY_INTERVAL_SECS: u64 = 60;

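/// Runtime handle for one active table: senders into the write buffer and
/// engine actor, the task handles behind them, and the last-activity stamp
/// used for LRU eviction.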
pub struct DatasetTableHandle {
    pub buffer_tx: mpsc::Sender<RecordBatch>,
    pub engine_tx: mpsc::Sender<TableCommand>,
    shutdown_tx: mpsc::Sender<()>,
    pub schema: SchemaRef,
    pub fingerprint: DatasetFingerprint,
    pub namespace: DatasetNamespace,
    pub partition_columns: Vec<String>,
    pub last_active_at: Arc<AtomicI64>,
    engine_handle: tokio::task::JoinHandle<()>,
    buffer_handle: tokio::task::JoinHandle<()>,
}

impl DatasetTableHandle {
    fn touch(&self) {
        self.last_active_at
            .store(chrono::Utc::now().timestamp(), Ordering::Relaxed);
    }
}

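/// Coordinates dataset registration and per-table engine lifecycle: lazy
/// activation on first write, an LRU cap on concurrent engines, and the
/// background reaper and discovery loops.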
pub struct DatasetEngineManager {
    registry: Arc<DatasetRegistry>,
    active_engines: Arc<DashMap<String, DatasetTableHandle>>,
    activating: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
    query_ctx: Arc<SessionContext>,
    catalog_provider: Arc<DatasetCatalogProvider>,
    object_store: ObjectStore,
    query_tracker: QueryTracker,
    engine_ttl_secs: u64,
    max_active_engines: usize,
    flush_interval_secs: u64,
    max_buffer_rows: usize,
    refresh_interval_secs: u64,
}

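/// Rejects anything that is not exactly one SELECT statement, keeping the
/// shared query context read-only.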
fn validate_sql(sql: &str) -> Result<(), DatasetEngineError> {
    use datafusion::sql::parser::{DFParser, Statement as DFStatement};
    use datafusion::sql::sqlparser::ast::Statement as SqlStatement;

    let statements = DFParser::parse_sql(sql)
        .map_err(|e| DatasetEngineError::SqlValidationError(format!("Failed to parse SQL: {e}")))?;

    if statements.len() != 1 {
        return Err(DatasetEngineError::SqlValidationError(
            "Exactly one SQL statement is required".to_string(),
        ));
    }

    match &statements[0] {
        DFStatement::Statement(stmt) => match stmt.as_ref() {
            SqlStatement::Query(_) => Ok(()),
            SqlStatement::Copy { .. }
            | SqlStatement::CreateTable(_)
            | SqlStatement::Drop { .. }
            | SqlStatement::Insert(_)
            | SqlStatement::Update { .. }
            | SqlStatement::Delete(_) => Err(DatasetEngineError::SqlValidationError(
                "DDL and DML statements are not permitted".to_string(),
            )),
            other => Err(DatasetEngineError::SqlValidationError(format!(
                "Only SELECT queries are allowed, got: {}",
                other
            ))),
        },
        _ => Err(DatasetEngineError::SqlValidationError(
            "Only SELECT queries are allowed".to_string(),
        )),
    }
}

impl DatasetEngineManager {
    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DatasetEngineError> {
        let object_store = ObjectStore::new(storage_settings)?;
        let query_ctx = Arc::new(object_store.get_session()?);
        let catalog_provider = Arc::new(DatasetCatalogProvider::new());

        let registry = Arc::new(DatasetRegistry::new(&object_store).await?);

        let flush_interval_secs = std::env::var("SCOUTER_DATASET_FLUSH_INTERVAL_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(DEFAULT_FLUSH_INTERVAL_SECS);

        let manager = Self {
            registry,
            active_engines: Arc::new(DashMap::new()),
            activating: Arc::new(Mutex::new(HashMap::new())),
            query_ctx,
            catalog_provider,
            object_store,
            query_tracker: QueryTracker::new(),
            engine_ttl_secs: DEFAULT_ENGINE_TTL_SECS,
            max_active_engines: DEFAULT_MAX_ACTIVE_ENGINES,
            flush_interval_secs,
            max_buffer_rows: DEFAULT_MAX_BUFFER_ROWS,
            refresh_interval_secs: DEFAULT_REFRESH_INTERVAL_SECS,
        };

        for reg in manager.registry.list_active() {
            manager.ensure_catalog_registered(&reg.namespace.catalog);
        }

        Ok(manager)
    }

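    /// Like [`Self::new`] but with explicit tuning knobs; the tests use this
    /// to shrink flush intervals and engine caps.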
    pub async fn with_config(
        storage_settings: &ObjectStorageSettings,
        engine_ttl_secs: u64,
        max_active_engines: usize,
        flush_interval_secs: u64,
        max_buffer_rows: usize,
        refresh_interval_secs: u64,
    ) -> Result<Self, DatasetEngineError> {
        let mut manager = Self::new(storage_settings).await?;
        manager.engine_ttl_secs = engine_ttl_secs;
        manager.max_active_engines = max_active_engines;
        manager.flush_interval_secs = flush_interval_secs;
        manager.max_buffer_rows = max_buffer_rows;
        manager.refresh_interval_secs = refresh_interval_secs;
        Ok(manager)
    }

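    /// Persists a new registration and exposes its catalog to the query
    /// context. The table's engine is only activated later, on first insert.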
    pub async fn register_dataset(
        &self,
        registration: &DatasetRegistration,
    ) -> Result<RegistrationResult, DatasetEngineError> {
        let result = self.registry.register(registration).await?;
        self.ensure_catalog_registered(&registration.namespace.catalog);
        Ok(result)
    }

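    /// Ensures an engine is running for `namespace`. The `activating` map
    /// deduplicates concurrent activations: the first caller does the work,
    /// later callers wait on a shared `Notify` with a 30-second timeout.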
    async fn activate_engine(
        &self,
        namespace: &DatasetNamespace,
    ) -> Result<(), DatasetEngineError> {
        let fqn = namespace.fqn();

        if let Some(handle) = self.active_engines.get(&fqn) {
            handle.touch();
            return Ok(());
        }

        {
            let mut pending = self.activating.lock().await;

            if let Some(handle) = self.active_engines.get(&fqn) {
                handle.touch();
                return Ok(());
            }

            if let Some(notify) = pending.get(&fqn) {
                let notify = Arc::clone(notify);
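                // Create and enable the Notified future *before* releasing
                // the lock, so a notify_waiters() racing with us cannot be
                // missed.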
                let notified = notify.notified();
                tokio::pin!(notified);
                notified.as_mut().enable();
                drop(pending);

                match tokio::time::timeout(Duration::from_secs(30), notified).await {
                    Ok(_) => {}
                    Err(_) => {
                        return Err(DatasetEngineError::RegistryError(format!(
                            "Engine activation timed out for {fqn}"
                        )));
                    }
                }

                return if self.active_engines.contains_key(&fqn) {
                    Ok(())
                } else {
                    Err(DatasetEngineError::RegistryError(format!(
                        "Activation failed for {fqn}"
                    )))
                };
            }

            pending.insert(fqn.clone(), Arc::new(Notify::new()));
        }

        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
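        // A detached task clears the pending entry and wakes waiters once
        // activation resolves, whether it succeeded or failed.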
        let activating = Arc::clone(&self.activating);
        let fqn_for_cleanup = fqn.clone();
        tokio::spawn(async move {
            let _ = done_rx.await;
            let mut pending = activating.lock().await;
            if let Some(notify) = pending.remove(&fqn_for_cleanup) {
                notify.notify_waiters();
            }
        });

        let result = self.do_activate_engine_inner(namespace, &fqn).await;
        let _ = done_tx.send(());
        result
    }

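    /// The activation itself: load the registration, enforce the engine cap
    /// via LRU eviction, then start the engine actor and its write buffer.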
    async fn do_activate_engine_inner(
        &self,
        namespace: &DatasetNamespace,
        fqn: &str,
    ) -> Result<(), DatasetEngineError> {
        let reg = self
            .registry
            .get(fqn)
            .ok_or_else(|| DatasetEngineError::TableNotFound(fqn.to_string()))?;

        if self.active_engines.len() >= self.max_active_engines {
            self.evict_lru().await;
        }

        let arrow_schema: arrow::datatypes::Schema = serde_json::from_str(&reg.arrow_schema_json)
            .map_err(|e| {
                DatasetEngineError::SerializationError(format!(
                    "Failed to deserialize Arrow schema for {}: {}",
                    fqn, e
                ))
            })?;
        let schema = Arc::new(arrow_schema);

        let mut partition_columns = vec![SCOUTER_PARTITION_DATE.to_string()];
        for col in &reg.partition_columns {
            if !partition_columns.contains(col) {
                partition_columns.push(col.clone());
            }
        }

        let engine = DatasetEngine::new(
            &self.object_store,
            schema.clone(),
            namespace.clone(),
            partition_columns.clone(),
            Arc::clone(&self.catalog_provider),
        )
        .await?;

        let (engine_tx, engine_handle) = engine.start_actor(self.refresh_interval_secs);

        let (buffer_tx, batch_rx) = mpsc::channel::<RecordBatch>(100);
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
        let buffer_handle = start_buffer(
            engine_tx.clone(),
            batch_rx,
            shutdown_rx,
            self.flush_interval_secs,
            self.max_buffer_rows,
            fqn.to_string(),
        );

        self.ensure_catalog_registered(&namespace.catalog);

        let handle = DatasetTableHandle {
            buffer_tx,
            engine_tx,
            shutdown_tx,
            schema,
            fingerprint: reg.fingerprint.clone(),
            namespace: namespace.clone(),
            partition_columns,
            last_active_at: Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())),
            engine_handle,
            buffer_handle,
        };

        self.active_engines.insert(fqn.to_string(), handle);
        info!("Activated engine for [{}]", fqn);

        Ok(())
    }

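    /// Buffers a batch for the table, activating its engine on first use and
    /// rejecting batches whose schema fingerprint differs from the registered
    /// one.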
    pub async fn insert_batch(
        &self,
        namespace: &DatasetNamespace,
        fingerprint: &DatasetFingerprint,
        batch: RecordBatch,
    ) -> Result<(), DatasetEngineError> {
        let fqn = namespace.fqn();

        self.activate_engine(namespace).await?;

        let handle = self
            .active_engines
            .get(&fqn)
            .ok_or_else(|| DatasetEngineError::TableNotFound(fqn.clone()))?;

        if handle.fingerprint.as_str() != fingerprint.as_str() {
            warn!(
                table = %fqn,
                "Fingerprint mismatch: expected={}, actual={}",
                handle.fingerprint.as_str(),
                fingerprint.as_str()
            );
            return Err(DatasetEngineError::FingerprintMismatch {
                table: fqn,
                expected: handle.fingerprint.as_str().to_string(),
                actual: fingerprint.as_str().to_string(),
            });
        }

        handle.touch();

        handle
            .buffer_tx
            .send(batch)
            .await
            .map_err(|_| DatasetEngineError::ChannelClosed)?;

        Ok(())
    }

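    /// Runs a validated, read-only SQL statement against the shared session
    /// context and collects the full result (no row limit is applied here).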
    pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, DatasetEngineError> {
        validate_sql(sql)?;
        let df = self.query_ctx.sql(sql).await?;
        let batches = df.collect().await?;
        Ok(batches)
    }

    pub fn list_datasets(&self) -> Vec<DatasetRegistration> {
        self.registry.list_active()
    }

    pub fn get_dataset_info(&self, namespace: &DatasetNamespace) -> Option<DatasetRegistration> {
        self.registry.get_by_namespace(namespace)
    }

    pub fn list_catalogs(&self) -> Vec<CatalogSummary> {
        let datasets = self.registry.list_active();
        let mut catalog_map: HashMap<String, (HashSet<String>, u32)> = HashMap::new();

        for d in &datasets {
            let entry = catalog_map
                .entry(d.namespace.catalog.clone())
                .or_insert_with(|| (HashSet::new(), 0));
            entry.0.insert(d.namespace.schema_name.clone());
            entry.1 += 1;
        }

        catalog_map
            .into_iter()
            .map(|(catalog, (schemas, table_count))| CatalogSummary {
                catalog,
                schema_count: schemas.len() as u32,
                table_count,
            })
            .collect()
    }

    pub fn list_schemas(&self, catalog: &str) -> Vec<SchemaSummary> {
        let datasets = self.registry.list_active();
        let mut schema_map: HashMap<String, u32> = HashMap::new();

        for d in datasets.iter().filter(|d| d.namespace.catalog == catalog) {
            *schema_map
                .entry(d.namespace.schema_name.clone())
                .or_insert(0) += 1;
        }

        schema_map
            .into_iter()
            .map(|(schema_name, table_count)| SchemaSummary {
                catalog: catalog.to_string(),
                schema_name,
                table_count,
            })
            .collect()
    }

    pub fn list_tables(&self, catalog: &str, schema_name: &str) -> Vec<TableSummaryInfo> {
        self.registry
            .list_active()
            .into_iter()
            .filter(|d| d.namespace.catalog == catalog && d.namespace.schema_name == schema_name)
            .map(|d| TableSummaryInfo {
                catalog: d.namespace.catalog,
                schema_name: d.namespace.schema_name,
                table: d.namespace.table,
                status: d.status.to_string(),
                created_at: d.created_at.to_rfc3339(),
                updated_at: d.updated_at.to_rfc3339(),
            })
            .collect()
    }

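    /// Builds a per-table detail view from the stored registration, its
    /// deserialized Arrow schema, and persisted table stats.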
    pub async fn get_table_detail(
        &self,
        namespace: &DatasetNamespace,
    ) -> Result<TableDetail, DatasetEngineError> {
        let reg = self
            .registry
            .get_by_namespace(namespace)
            .ok_or_else(|| DatasetEngineError::TableNotFound(namespace.fqn()))?;

        let arrow_schema: arrow::datatypes::Schema = serde_json::from_str(&reg.arrow_schema_json)
            .map_err(|e| {
                DatasetEngineError::SerializationError(format!(
                    "Failed to deserialize Arrow schema: {e}"
                ))
            })?;

        let partition_set: HashSet<&str> =
            reg.partition_columns.iter().map(|s| s.as_str()).collect();
        let system_cols: HashSet<&str> =
            [SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE, SCOUTER_BATCH_ID]
                .into_iter()
                .collect();

        let columns: Vec<ColumnDetail> = arrow_schema
            .fields()
            .iter()
            .map(|f| ColumnDetail {
                name: f.name().clone(),
                arrow_type: format!("{}", f.data_type()),
                nullable: f.is_nullable(),
                is_partition: partition_set.contains(f.name().as_str()),
                is_system: system_cols.contains(f.name().as_str()),
            })
            .collect();

        let table_stats = stats::load_table_stats(&self.object_store, namespace).await?;

        Ok(TableDetail {
            registration: reg,
            columns,
            stats: table_stats,
        })
    }

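    /// Fetches up to `max_rows` rows (capped at 1000), activating the table's
    /// engine before querying.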
    pub async fn preview_table(
        &self,
        namespace: &DatasetNamespace,
        max_rows: usize,
    ) -> Result<Vec<RecordBatch>, DatasetEngineError> {
        let max_rows = max_rows.min(1000);
        let sql = format!(
            "SELECT * FROM {} LIMIT {}",
            namespace.quoted_fqn(),
            max_rows
        );
        self.activate_engine(namespace).await?;
        let df = self.query_ctx.sql(&sql).await?;
        let batches = df.collect().await?;
        Ok(batches)
    }

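    /// Runs a tracked, cancellable SELECT. One extra row beyond `max_rows` is
    /// fetched to detect truncation; the result is then trimmed to `max_rows`.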
    pub async fn execute_query(
        &self,
        sql: &str,
        query_id: &str,
        max_rows: usize,
    ) -> Result<QueryResult, DatasetEngineError> {
        validate_sql(sql)?;
        let max_rows = max_rows.clamp(1, 100_000);

        let cancel_token = self.query_tracker.register(query_id).await?;
        let start = Instant::now();

        let exec_result: Result<_, DatasetEngineError> = async {
            let df = self.query_ctx.sql(sql).await?;
            let limited_df = df.limit(0, Some(max_rows + 1))?;
            tokio::select! {
                result = limited_df.collect() => result.map_err(DatasetEngineError::from),
                _ = cancel_token.cancelled() => {
                    Err(DatasetEngineError::QueryCancelled(query_id.to_string()))
                }
            }
        }
        .await;

        self.query_tracker.remove(query_id).await;
        let batches = exec_result?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        let truncated = total_rows > max_rows;

        let final_batches = if truncated {
            let mut remaining = max_rows;
            let mut result = Vec::new();
            for batch in batches {
                if remaining == 0 {
                    break;
                }
                if batch.num_rows() <= remaining {
                    remaining -= batch.num_rows();
                    result.push(batch);
                } else {
                    result.push(batch.slice(0, remaining));
                    remaining = 0;
                }
            }
            result
        } else {
            batches
        };

        let rows_returned: usize = final_batches.iter().map(|b| b.num_rows()).sum();

        Ok(QueryResult {
            batches: final_batches,
            metadata: QueryExecutionMetadata {
                query_id: query_id.to_string(),
                rows_returned: rows_returned as u64,
                truncated,
                execution_time_ms: start.elapsed().as_millis() as u64,
                bytes_scanned: None,
            },
        })
    }

    pub async fn cancel_query(&self, query_id: &str) -> bool {
        self.query_tracker.cancel(query_id).await
    }

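    /// Returns logical and physical plan trees plus sanitized text renderings;
    /// with `analyze` set, also executes the query once to capture timings.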
    pub async fn explain_query(
        &self,
        sql: &str,
        analyze: bool,
        max_rows: usize,
    ) -> Result<ExplainResult, DatasetEngineError> {
        validate_sql(sql)?;
        let df = self.query_ctx.sql(sql).await?;

        let logical_plan = df.logical_plan().clone();
        let logical_tree = logical_plan_to_tree(&logical_plan);
        let logical_text = sanitize_plan_text(&format!("{}", logical_plan.display_indent()));

        let physical_plan = df.create_physical_plan().await?;
        let physical_tree = physical_plan_to_tree(physical_plan.as_ref());
        let physical_text =
            sanitize_plan_text(&displayable(physical_plan.as_ref()).indent(true).to_string());

        let execution_metadata = if analyze {
            let max_rows = max_rows.clamp(1, 100_000);
            let analyze_df = self.query_ctx.sql(sql).await?;
            let limited = analyze_df.limit(0, Some(max_rows + 1))?;
            let start = Instant::now();
            let batches = limited.collect().await?;
            let rows: usize = batches.iter().map(|b| b.num_rows()).sum();

            Some(QueryExecutionMetadata {
                query_id: String::new(),
                rows_returned: rows.min(max_rows) as u64,
                truncated: rows > max_rows,
                execution_time_ms: start.elapsed().as_millis() as u64,
                bytes_scanned: None,
            })
        } else {
            None
        };

        Ok(ExplainResult {
            logical_plan: logical_tree,
            physical_plan: physical_tree,
            logical_plan_text: logical_text,
            physical_plan_text: physical_text,
            execution_metadata,
        })
    }

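    /// Evicts the engine with the oldest last-activity timestamp.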
    async fn evict_lru(&self) {
        let lru_fqn = self
            .active_engines
            .iter()
            .min_by_key(|e| e.value().last_active_at.load(Ordering::Relaxed))
            .map(|e| e.key().clone());

        if let Some(fqn) = lru_fqn {
            self.evict_engine(&fqn).await;
        }
    }

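    /// Tears an engine down in order: shut the buffer down first (so anything
    /// it still holds can drain to the engine), then stop the engine actor,
    /// and finally drop the table from the catalog provider.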
    async fn evict_engine(&self, fqn: &str) {
        if let Some((_, handle)) = self.active_engines.remove(fqn) {
            info!("Evicting engine [{}]", fqn);

            let _ = handle.shutdown_tx.send(()).await;

            let _ = handle.buffer_handle.await;

            let _ = handle.engine_tx.send(TableCommand::Shutdown).await;
            let _ = handle.engine_handle.await;

            self.catalog_provider.remove_table(&handle.namespace);
        }
    }

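    /// Gracefully tears down every active engine.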
    pub async fn shutdown(&self) {
        info!(
            "Shutting down DatasetEngineManager ({} active engines)",
            self.active_engines.len()
        );

        let fqns: Vec<String> = self
            .active_engines
            .iter()
            .map(|e| e.key().clone())
            .collect();

        for fqn in fqns {
            self.evict_engine(&fqn).await;
        }
    }

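    /// Builds the background loop that evicts engines idle past the TTL.
    /// The caller is responsible for spawning the returned future.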
    pub fn start_reaper_loop(
        self: &Arc<Self>,
        mut shutdown_rx: tokio::sync::watch::Receiver<()>,
    ) -> impl std::future::Future<Output = ()> + Send + 'static {
        let manager = Arc::clone(self);
        async move {
            let mut ticker = interval(Duration::from_secs(REAPER_INTERVAL_SECS));
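            // An interval's first tick completes immediately; consume it so
            // the first reap happens a full period from now.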
            ticker.tick().await;

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        let now = chrono::Utc::now().timestamp();
                        let ttl = manager.engine_ttl_secs as i64;

                        let to_evict: Vec<String> = manager
                            .active_engines
                            .iter()
                            .filter(|e| now - e.value().last_active_at.load(Ordering::Relaxed) > ttl)
                            .map(|e| e.key().clone())
                            .collect();

                        for fqn in to_evict {
                            manager.evict_engine(&fqn).await;
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        info!("Reaper loop shutting down");
                        break;
                    }
                }
            }
        }
    }

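    /// Builds the background loop that re-reads the registry from storage and
    /// registers catalogs for any newly discovered datasets.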
    pub fn start_discovery_loop(
        self: &Arc<Self>,
        mut shutdown_rx: tokio::sync::watch::Receiver<()>,
    ) -> impl std::future::Future<Output = ()> + Send + 'static {
        let manager = Arc::clone(self);
        async move {
            let mut ticker = interval(Duration::from_secs(DISCOVERY_INTERVAL_SECS));
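            // As in the reaper loop, swallow the interval's immediate first
            // tick so discovery runs after one full period.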
            ticker.tick().await;

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        if let Err(e) = manager.registry.refresh().await {
                            warn!("Registry discovery refresh failed: {}", e);
                        }

                        for reg in manager.registry.list_active() {
                            manager.ensure_catalog_registered(&reg.namespace.catalog);
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        info!("Discovery loop shutting down");
                        break;
                    }
                }
            }
        }
    }

    pub fn query_ctx(&self) -> &Arc<SessionContext> {
        &self.query_ctx
    }

    pub fn registry(&self) -> &Arc<DatasetRegistry> {
        &self.registry
    }

    pub fn active_engine_count(&self) -> usize {
        self.active_engines.len()
    }

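    /// Registers the shared catalog provider under `catalog`; registering the
    /// same name again simply swaps in the same provider, so this is safe to
    /// call repeatedly.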
    fn ensure_catalog_registered(&self, catalog: &str) {
        self.query_ctx.register_catalog(
            catalog,
            Arc::clone(&self.catalog_provider) as Arc<dyn datafusion::catalog::CatalogProvider>,
        );
    }
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct CatalogSummary {
    pub catalog: String,
    pub schema_count: u32,
    pub table_count: u32,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct SchemaSummary {
    pub catalog: String,
    pub schema_name: String,
    pub table_count: u32,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct TableSummaryInfo {
    pub catalog: String,
    pub schema_name: String,
    pub table: String,
    pub status: String,
    pub created_at: String,
    pub updated_at: String,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct ColumnDetail {
    pub name: String,
    pub arrow_type: String,
    pub nullable: bool,
    pub is_partition: bool,
    pub is_system: bool,
}

pub struct TableDetail {
    pub registration: DatasetRegistration,
    pub columns: Vec<ColumnDetail>,
    pub stats: stats::TableStats,
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::AsArray;
    use arrow::datatypes::{DataType, Field, Int64Type, Schema, TimeUnit};
    use scouter_types::dataset::{DatasetFingerprint, DatasetRegistration};
    use tempfile::TempDir;

    fn test_storage_settings(dir: &TempDir) -> ObjectStorageSettings {
        ObjectStorageSettings {
            storage_uri: dir.path().to_str().unwrap().to_string(),
            storage_type: scouter_types::StorageType::Local,
            region: "us-east-1".to_string(),
            trace_compaction_interval_hours: 24,
            trace_flush_interval_secs: 5,
            trace_refresh_interval_secs: 10,
        }
    }

    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("user_id", DataType::Utf8, false),
            Field::new("score", DataType::Float64, false),
            Field::new("model_name", DataType::Utf8, true),
            Field::new(
                "scouter_created_at",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new("scouter_partition_date", DataType::Date32, false),
            Field::new("scouter_batch_id", DataType::Utf8, false),
        ])
    }

    fn test_registration(schema: &Schema) -> DatasetRegistration {
        let arrow_schema_json = serde_json::to_string(schema).unwrap();
        let fingerprint = DatasetFingerprint::from_schema_json(&arrow_schema_json);
        let namespace =
            DatasetNamespace::new("test_catalog", "test_schema", "predictions").unwrap();

        DatasetRegistration::new(
            namespace,
            fingerprint,
            arrow_schema_json,
            "{}".to_string(),
            vec![],
        )
    }

    fn make_test_batch(schema: &Schema) -> RecordBatch {
        use arrow::array::*;
        use chrono::{Datelike, Utc};

        let now = Utc::now();
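        // 719_163 days separate 0001-01-01 (CE) from 1970-01-01, converting
        // num_days_from_ce() into days-since-epoch as Date32 expects.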
        let epoch_days = now.date_naive().num_days_from_ce() - 719_163;

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(StringArray::from(vec!["user_1", "user_2", "user_3"])),
                Arc::new(Float64Array::from(vec![0.95, 0.87, 0.92])),
                Arc::new(StringArray::from(vec![
                    Some("model_a"),
                    None,
                    Some("model_b"),
                ])),
                Arc::new(
                    TimestampMicrosecondArray::from(vec![
                        now.timestamp_micros(),
                        now.timestamp_micros(),
                        now.timestamp_micros(),
                    ])
                    .with_timezone("UTC"),
                ),
                Arc::new(Date32Array::from(vec![epoch_days, epoch_days, epoch_days])),
                Arc::new(StringArray::from(vec![
                    "batch-001",
                    "batch-001",
                    "batch-001",
                ])),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_register_and_insert() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let reg = test_registration(&schema);

        let result = manager.register_dataset(&reg).await.unwrap();
        assert_eq!(result, RegistrationResult::Created);

        let result2 = manager.register_dataset(&reg).await.unwrap();
        assert_eq!(result2, RegistrationResult::AlreadyExists);

        assert_eq!(manager.active_engine_count(), 0);

        let batch = make_test_batch(&schema);
        manager
            .insert_batch(&reg.namespace, &reg.fingerprint, batch)
            .await
            .unwrap();

        assert_eq!(manager.active_engine_count(), 1);

        tokio::time::sleep(Duration::from_secs(2)).await;

        manager.shutdown().await;
        assert_eq!(manager.active_engine_count(), 0);
    }

    #[tokio::test]
    async fn test_fingerprint_mismatch() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();

        let wrong_fp = DatasetFingerprint::from_schema_json("wrong");
        let batch = make_test_batch(&schema);

        let result = manager.insert_batch(&reg.namespace, &wrong_fp, batch).await;

        assert!(matches!(
            result,
            Err(DatasetEngineError::FingerprintMismatch { .. })
        ));

        manager.shutdown().await;
    }

    #[tokio::test]
    async fn test_table_not_found() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let ns = DatasetNamespace::new("no", "such", "table").unwrap();
        let fp = DatasetFingerprint::from_schema_json("x");
        let schema = test_schema();
        let batch = make_test_batch(&schema);

        let result = manager.insert_batch(&ns, &fp, batch).await;
        assert!(matches!(result, Err(DatasetEngineError::TableNotFound(_))));

        manager.shutdown().await;
    }

    #[tokio::test]
    async fn test_list_datasets() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        assert!(manager.list_datasets().is_empty());

        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();

        let datasets = manager.list_datasets();
        assert_eq!(datasets.len(), 1);
        assert_eq!(
            datasets[0].namespace.fqn(),
            "test_catalog.test_schema.predictions"
        );

        manager.shutdown().await;
    }

    #[tokio::test]
    async fn test_multiple_tables_isolation() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();

        let ns1 = DatasetNamespace::new("cat", "sch", "table_a").unwrap();
        let ns2 = DatasetNamespace::new("cat", "sch", "table_b").unwrap();
        let arrow_json = serde_json::to_string(&schema).unwrap();
        let fp = DatasetFingerprint::from_schema_json(&arrow_json);

        let reg1 = DatasetRegistration::new(
            ns1.clone(),
            fp.clone(),
            arrow_json.clone(),
            "{}".into(),
            vec![],
        );
        let reg2 = DatasetRegistration::new(
            ns2.clone(),
            fp.clone(),
            arrow_json.clone(),
            "{}".into(),
            vec![],
        );

        manager.register_dataset(&reg1).await.unwrap();
        manager.register_dataset(&reg2).await.unwrap();

        let batch1 = make_test_batch(&schema);
        let batch2 = make_test_batch(&schema);
        manager.insert_batch(&ns1, &fp, batch1).await.unwrap();
        manager.insert_batch(&ns2, &fp, batch2).await.unwrap();

        assert_eq!(manager.active_engine_count(), 2);

        manager.shutdown().await;
    }

    #[tokio::test]
    async fn test_max_active_engines_cap() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 2, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let arrow_json = serde_json::to_string(&schema).unwrap();
        let fp = DatasetFingerprint::from_schema_json(&arrow_json);

        for i in 0..3 {
            let ns = DatasetNamespace::new("cat", "sch", format!("tbl_{i}")).unwrap();
            let reg =
                DatasetRegistration::new(ns, fp.clone(), arrow_json.clone(), "{}".into(), vec![]);
            manager.register_dataset(&reg).await.unwrap();
        }

        let ns0 = DatasetNamespace::new("cat", "sch", "tbl_0").unwrap();
        let ns1 = DatasetNamespace::new("cat", "sch", "tbl_1").unwrap();
        let ns2 = DatasetNamespace::new("cat", "sch", "tbl_2").unwrap();

        manager
            .insert_batch(&ns0, &fp, make_test_batch(&schema))
            .await
            .unwrap();
        manager
            .insert_batch(&ns1, &fp, make_test_batch(&schema))
            .await
            .unwrap();

        assert_eq!(manager.active_engine_count(), 2);

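        // A third activation must evict the least recently used engine to
        // respect the cap of two.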
        manager
            .insert_batch(&ns2, &fp, make_test_batch(&schema))
            .await
            .unwrap();

        assert_eq!(manager.active_engine_count(), 2);

        manager.shutdown().await;
    }

    #[tokio::test]
    async fn test_write_and_query() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();

        let batch = make_test_batch(&schema);
        manager
            .insert_batch(&reg.namespace, &reg.fingerprint, batch)
            .await
            .unwrap();

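        // The flush interval is 1s; give the buffer time to persist the batch
        // before querying it back.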
        tokio::time::sleep(Duration::from_secs(3)).await;

        let sql = "SELECT COUNT(*) as cnt FROM test_catalog.test_schema.predictions";
        let results = manager.query(sql).await.unwrap();

        assert!(!results.is_empty());
        let count_col = results[0]
            .column_by_name("cnt")
            .unwrap()
            .as_primitive_opt::<Int64Type>()
            .unwrap();
        assert_eq!(count_col.value(0), 3);

        manager.shutdown().await;
    }

    #[tokio::test]
    async fn test_registry_persistence() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

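        // First "process": register a dataset, then shut everything down.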
        {
            let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
                .await
                .unwrap();

            let schema = test_schema();
            let reg = test_registration(&schema);
            manager.register_dataset(&reg).await.unwrap();
            manager.shutdown().await;
        }

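        // Second "process": a fresh manager should rediscover the registration
        // from the same storage location.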
        {
            let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
                .await
                .unwrap();

            let datasets = manager.list_datasets();
            assert_eq!(datasets.len(), 1);
            assert_eq!(
                datasets[0].namespace.fqn(),
                "test_catalog.test_schema.predictions"
            );

            manager.shutdown().await;
        }
    }
}