Skip to main content

scouter_dataframe/parquet/bifrost/
manager.rs

1use crate::error::DatasetEngineError;
2use crate::parquet::bifrost::buffer::start_buffer;
3use crate::parquet::bifrost::catalog::DatasetCatalogProvider;
4use crate::parquet::bifrost::engine::{DatasetEngine, TableCommand};
5use crate::parquet::bifrost::explain::{
6    logical_plan_to_tree, physical_plan_to_tree, sanitize_plan_text, ExplainResult,
7};
8use crate::parquet::bifrost::query::{QueryExecutionMetadata, QueryResult, QueryTracker};
9use crate::parquet::bifrost::registry::{DatasetRegistry, RegistrationResult};
10use crate::parquet::bifrost::stats;
11use crate::storage::ObjectStore;
12use arrow::datatypes::SchemaRef;
13use arrow_array::RecordBatch;
14use dashmap::DashMap;
15use datafusion::physical_plan::displayable;
16use datafusion::prelude::SessionContext;
17use scouter_settings::ObjectStorageSettings;
18use scouter_types::dataset::schema::{
19    SCOUTER_BATCH_ID, SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE,
20};
21use scouter_types::dataset::{DatasetFingerprint, DatasetNamespace, DatasetRegistration};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicI64, Ordering};
24use std::sync::Arc;
25use std::time::Instant;
26use tokio::sync::{mpsc, Mutex, Notify};
27use tokio::time::{interval, Duration};
28use tracing::{info, warn};
29
// Default tuning knobs. The flush interval can be overridden via the
// SCOUTER_DATASET_FLUSH_INTERVAL_SECS env var (see `DatasetEngineManager::new`);
// all of them can be overridden via `with_config`.
const DEFAULT_ENGINE_TTL_SECS: u64 = 30 * 60; // 30 minutes idle before the reaper evicts
const DEFAULT_MAX_ACTIVE_ENGINES: usize = 50; // hard cap; LRU eviction beyond this
const DEFAULT_FLUSH_INTERVAL_SECS: u64 = 60; // passed to the buffer actor (flush cadence)
const DEFAULT_MAX_BUFFER_ROWS: usize = 10_000; // passed to the buffer actor — presumably rows before a forced flush; see buffer.rs
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 30; // passed to the engine actor on start
const REAPER_INTERVAL_SECS: u64 = 5 * 60; // 5 minutes — idle-engine reaper tick
const DISCOVERY_INTERVAL_SECS: u64 = 60; // registry refresh tick (cross-pod discovery)
37
/// Live handle to an activated dataset table: the channels into its actor
/// pair (buffer + engine), plus the metadata needed to validate writes.
pub struct DatasetTableHandle {
    /// Incoming record batches; consumed by the buffer actor.
    pub buffer_tx: mpsc::Sender<RecordBatch>,
    /// Commands for the engine actor (writes, shutdown — see `TableCommand`).
    pub engine_tx: mpsc::Sender<TableCommand>,
    /// Signals the buffer actor to flush remaining batches and exit
    /// (sent from `evict_engine`).
    shutdown_tx: mpsc::Sender<()>,
    pub schema: SchemaRef,
    /// Schema fingerprint; every `insert_batch` caller must match it.
    pub fingerprint: DatasetFingerprint,
    pub namespace: DatasetNamespace,
    pub partition_columns: Vec<String>,
    /// Unix seconds of last use; drives TTL reaping and LRU eviction.
    pub last_active_at: Arc<AtomicI64>,
    // Join handles awaited during eviction so shutdown is fully drained.
    engine_handle: tokio::task::JoinHandle<()>,
    buffer_handle: tokio::task::JoinHandle<()>,
}
50
51impl DatasetTableHandle {
52    fn touch(&self) {
53        self.last_active_at
54            .store(chrono::Utc::now().timestamp(), Ordering::Relaxed);
55    }
56}
57
/// Top-level coordinator for all dataset tables.
///
/// Manages a registry of table metadata, lazy-loads engine actors on demand,
/// and evicts idle engines based on TTL and a hard cap.
pub struct DatasetEngineManager {
    /// Metadata for all registered datasets (active engines or not).
    registry: Arc<DatasetRegistry>,
    /// FQN -> live actor handle for currently-activated tables.
    active_engines: Arc<DashMap<String, DatasetTableHandle>>,
    /// FQN -> Notify for activations in flight (TOCTOU guard; see `activate_engine`).
    activating: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
    /// Shared DataFusion session used by all query paths.
    query_ctx: Arc<SessionContext>,
    catalog_provider: Arc<DatasetCatalogProvider>,
    object_store: ObjectStore,
    /// Tracks in-flight queries so they can be cancelled by ID.
    query_tracker: QueryTracker,
    // Tuning knobs — defaults from the consts above, overridable via `with_config`.
    engine_ttl_secs: u64,
    max_active_engines: usize,
    flush_interval_secs: u64,
    max_buffer_rows: usize,
    refresh_interval_secs: u64,
}
76
77/// Validate that a SQL string contains exactly one SELECT statement.
78/// Rejects DDL, DML, SHOW, and DataFusion extension statements.
79fn validate_sql(sql: &str) -> Result<(), DatasetEngineError> {
80    use datafusion::sql::parser::{DFParser, Statement as DFStatement};
81    use datafusion::sql::sqlparser::ast::Statement as SqlStatement;
82
83    let statements = DFParser::parse_sql(sql)
84        .map_err(|e| DatasetEngineError::SqlValidationError(format!("Failed to parse SQL: {e}")))?;
85
86    if statements.len() != 1 {
87        return Err(DatasetEngineError::SqlValidationError(
88            "Exactly one SQL statement is required".to_string(),
89        ));
90    }
91
92    match &statements[0] {
93        DFStatement::Statement(stmt) => match stmt.as_ref() {
94            SqlStatement::Query(_) => Ok(()),
95            // Explicitly deny write-capable and DDL variants as defense-in-depth
96            SqlStatement::Copy { .. }
97            | SqlStatement::CreateTable(_)
98            | SqlStatement::Drop { .. }
99            | SqlStatement::Insert(_)
100            | SqlStatement::Update { .. }
101            | SqlStatement::Delete(_) => Err(DatasetEngineError::SqlValidationError(
102                "DDL and DML statements are not permitted".to_string(),
103            )),
104            other => Err(DatasetEngineError::SqlValidationError(format!(
105                "Only SELECT queries are allowed, got: {}",
106                other
107            ))),
108        },
109        _ => Err(DatasetEngineError::SqlValidationError(
110            "Only SELECT queries are allowed".to_string(),
111        )),
112    }
113}
114
115impl DatasetEngineManager {
116    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DatasetEngineError> {
117        let object_store = ObjectStore::new(storage_settings)?;
118        let query_ctx = Arc::new(object_store.get_session()?);
119        let catalog_provider = Arc::new(DatasetCatalogProvider::new());
120
121        // Register our catalog provider for each known catalog
122        // (catalogs are discovered dynamically as tables are registered)
123
124        let registry = Arc::new(DatasetRegistry::new(&object_store).await?);
125
126        let flush_interval_secs = std::env::var("SCOUTER_DATASET_FLUSH_INTERVAL_SECS")
127            .ok()
128            .and_then(|v| v.parse().ok())
129            .unwrap_or(DEFAULT_FLUSH_INTERVAL_SECS);
130
131        let manager = Self {
132            registry,
133            active_engines: Arc::new(DashMap::new()),
134            activating: Arc::new(Mutex::new(HashMap::new())),
135            query_ctx,
136            catalog_provider,
137            object_store,
138            query_tracker: QueryTracker::new(),
139            engine_ttl_secs: DEFAULT_ENGINE_TTL_SECS,
140            max_active_engines: DEFAULT_MAX_ACTIVE_ENGINES,
141            flush_interval_secs,
142            max_buffer_rows: DEFAULT_MAX_BUFFER_ROWS,
143            refresh_interval_secs: DEFAULT_REFRESH_INTERVAL_SECS,
144        };
145
146        // Pre-register catalog names from existing registrations so DataFusion
147        // can resolve them. No engines are spawned — all lazy-loaded.
148        for reg in manager.registry.list_active() {
149            manager.ensure_catalog_registered(&reg.namespace.catalog);
150        }
151
152        Ok(manager)
153    }
154
155    /// Create a manager with custom configuration (primarily for testing).
156    pub async fn with_config(
157        storage_settings: &ObjectStorageSettings,
158        engine_ttl_secs: u64,
159        max_active_engines: usize,
160        flush_interval_secs: u64,
161        max_buffer_rows: usize,
162        refresh_interval_secs: u64,
163    ) -> Result<Self, DatasetEngineError> {
164        let mut manager = Self::new(storage_settings).await?;
165        manager.engine_ttl_secs = engine_ttl_secs;
166        manager.max_active_engines = max_active_engines;
167        manager.flush_interval_secs = flush_interval_secs;
168        manager.max_buffer_rows = max_buffer_rows;
169        manager.refresh_interval_secs = refresh_interval_secs;
170        Ok(manager)
171    }
172
173    /// Register a dataset schema. Idempotent.
174    /// Does NOT spawn an engine — that's lazy on first write/query.
175    pub async fn register_dataset(
176        &self,
177        registration: &DatasetRegistration,
178    ) -> Result<RegistrationResult, DatasetEngineError> {
179        let result = self.registry.register(registration).await?;
180        self.ensure_catalog_registered(&registration.namespace.catalog);
181        Ok(result)
182    }
183
    /// Get or activate an engine for the given namespace.
    /// If the engine is already active, touches `last_active_at` and returns the handle reference.
    /// If not active, lazy-loads the Delta table, spawns the actor pair, and returns it.
    ///
    /// Uses `activating` to prevent two concurrent callers from creating duplicate engine
    /// actors for the same FQN (TOCTOU guard).
    ///
    /// Cancellation safety: a spawned cleanup task holds an `Arc` clone of `activating`
    /// and waits for a oneshot signal. If this future is dropped mid-await, the oneshot
    /// sender drops too, the cleanup task unblocks via `Err(RecvError)`, and it removes
    /// the stale entry + fires `notify_waiters()` so waiters are not permanently hung.
    async fn activate_engine(
        &self,
        namespace: &DatasetNamespace,
    ) -> Result<(), DatasetEngineError> {
        let fqn = namespace.fqn();

        // Fast path: already active (DashMap guard dropped before any await below)
        if let Some(handle) = self.active_engines.get(&fqn) {
            handle.touch();
            return Ok(());
        }

        // Serialize concurrent activations for the same FQN
        {
            let mut pending = self.activating.lock().await;

            // Re-check after acquiring lock — another task may have completed activation
            if let Some(handle) = self.active_engines.get(&fqn) {
                handle.touch();
                return Ok(());
            }

            if let Some(notify) = pending.get(&fqn) {
                // Another task is already activating this FQN — wait for it.
                // Pin and enable the Notified future before releasing the lock so the
                // waiter is registered before the activating task can call notify_waiters().
                // Without enable(), a notify_waiters() fired between drop(pending) and the
                // first poll of notified.await would be lost, hanging this task forever.
                let notify = Arc::clone(notify);
                let notified = notify.notified();
                tokio::pin!(notified);
                notified.as_mut().enable();
                drop(pending);

                // Timeout bounds worst-case wait if the cleanup task is somehow delayed.
                match tokio::time::timeout(Duration::from_secs(30), notified).await {
                    Ok(_) => {}
                    Err(_) => {
                        return Err(DatasetEngineError::RegistryError(format!(
                            "Engine activation timed out for {fqn}"
                        )));
                    }
                }

                // The other task finished (or failed); success is judged solely by
                // whether the engine is now present in the map.
                return if self.active_engines.contains_key(&fqn) {
                    Ok(())
                } else {
                    Err(DatasetEngineError::RegistryError(format!(
                        "Activation failed for {fqn}"
                    )))
                };
            }

            // Reserve this FQN so later callers take the wait path above.
            pending.insert(fqn.clone(), Arc::new(Notify::new()));
        } // activating lock released — safe to .await

        // Spawn a cleanup task that runs whether we complete normally or get cancelled.
        // The oneshot sender is dropped in both cases, unblocking the cleanup task.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
        let activating = Arc::clone(&self.activating);
        let fqn_for_cleanup = fqn.clone();
        tokio::spawn(async move {
            // Waits for done_tx.send(()) on the happy path, or Err if sender is dropped
            // (future cancelled). Either way, clean up the pending entry.
            let _ = done_rx.await;
            let mut pending = activating.lock().await;
            if let Some(notify) = pending.remove(&fqn_for_cleanup) {
                notify.notify_waiters();
            }
        });

        let result = self.do_activate_engine_inner(namespace, &fqn).await;
        let _ = done_tx.send(()); // signal cleanup; if already dropped, cleanup already ran
        result
    }
270
    /// Inner activation logic, called only when we hold the pending-set reservation.
    ///
    /// Order matters: the LRU eviction runs *before* the new engine is created so
    /// the cap is respected, and the handle is inserted into `active_engines` only
    /// after both actors are running, so waiters in `activate_engine` observe a
    /// fully-initialized entry.
    async fn do_activate_engine_inner(
        &self,
        namespace: &DatasetNamespace,
        fqn: &str,
    ) -> Result<(), DatasetEngineError> {
        // Look up registration
        let reg = self
            .registry
            .get(fqn)
            .ok_or_else(|| DatasetEngineError::TableNotFound(fqn.to_string()))?;

        // Check cap — evict LRU if needed (frees one slot; concurrent activations
        // of distinct FQNs could still briefly overshoot the cap)
        if self.active_engines.len() >= self.max_active_engines {
            self.evict_lru().await;
        }

        // Parse the Arrow schema from the registration (stored as JSON)
        let arrow_schema: arrow::datatypes::Schema = serde_json::from_str(&reg.arrow_schema_json)
            .map_err(|e| {
            DatasetEngineError::SerializationError(format!(
                "Failed to deserialize Arrow schema for {}: {}",
                fqn, e
            ))
        })?;
        let schema = Arc::new(arrow_schema);

        // Build full partition columns list: the system partition-date column
        // always comes first, then user-declared columns (deduplicated)
        let mut partition_columns = vec![SCOUTER_PARTITION_DATE.to_string()];
        for col in &reg.partition_columns {
            if !partition_columns.contains(col) {
                partition_columns.push(col.clone());
            }
        }

        // Create the engine
        let engine = DatasetEngine::new(
            &self.object_store,
            schema.clone(),
            namespace.clone(),
            partition_columns.clone(),
            Arc::clone(&self.catalog_provider),
        )
        .await?;

        // Start the engine actor
        let (engine_tx, engine_handle) = engine.start_actor(self.refresh_interval_secs);

        // Start the buffer actor (batches flow: buffer_tx -> buffer -> engine_tx)
        let (buffer_tx, batch_rx) = mpsc::channel::<RecordBatch>(100);
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
        let buffer_handle = start_buffer(
            engine_tx.clone(),
            batch_rx,
            shutdown_rx,
            self.flush_interval_secs,
            self.max_buffer_rows,
            fqn.to_string(),
        );

        self.ensure_catalog_registered(&namespace.catalog);

        let handle = DatasetTableHandle {
            buffer_tx,
            engine_tx,
            shutdown_tx,
            schema,
            fingerprint: reg.fingerprint.clone(),
            namespace: namespace.clone(),
            partition_columns,
            last_active_at: Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())),
            engine_handle,
            buffer_handle,
        };

        self.active_engines.insert(fqn.to_string(), handle);
        info!("Activated engine for [{}]", fqn);

        Ok(())
    }
351
352    /// Insert a RecordBatch into a dataset table.
353    /// Activates the engine on demand if not already active.
354    pub async fn insert_batch(
355        &self,
356        namespace: &DatasetNamespace,
357        fingerprint: &DatasetFingerprint,
358        batch: RecordBatch,
359    ) -> Result<(), DatasetEngineError> {
360        let fqn = namespace.fqn();
361
362        // Activate if needed
363        self.activate_engine(namespace).await?;
364
365        let handle = self
366            .active_engines
367            .get(&fqn)
368            .ok_or_else(|| DatasetEngineError::TableNotFound(fqn.clone()))?;
369
370        // Validate fingerprint
371        if handle.fingerprint.as_str() != fingerprint.as_str() {
372            warn!(
373                table = %fqn,
374                "Fingerprint mismatch: expected={}, actual={}",
375                handle.fingerprint.as_str(),
376                fingerprint.as_str()
377            );
378            return Err(DatasetEngineError::FingerprintMismatch {
379                table: fqn,
380                expected: handle.fingerprint.as_str().to_string(),
381                actual: fingerprint.as_str().to_string(),
382            });
383        }
384
385        handle.touch();
386
387        // Send to buffer
388        handle
389            .buffer_tx
390            .send(batch)
391            .await
392            .map_err(|_| DatasetEngineError::ChannelClosed)?;
393
394        Ok(())
395    }
396
397    /// Execute a SQL query against the shared query context.
398    ///
399    /// Only SELECT statements are allowed. All other statement types (DDL, DML,
400    /// SHOW, etc.) are rejected at parse time.
401    pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, DatasetEngineError> {
402        validate_sql(sql)?;
403        let df = self.query_ctx.sql(sql).await?;
404        let batches = df.collect().await?;
405        Ok(batches)
406    }
407
    /// List all registered datasets (from registry cache, not just active engines).
    pub fn list_datasets(&self) -> Vec<DatasetRegistration> {
        // Pure delegation; no engines are activated by listing.
        self.registry.list_active()
    }
412
    /// Get registration info for a specific dataset.
    /// Returns `None` when the namespace is unknown to the registry.
    pub fn get_dataset_info(&self, namespace: &DatasetNamespace) -> Option<DatasetRegistration> {
        self.registry.get_by_namespace(namespace)
    }
417
418    // ── Catalog Browser APIs ────────────────────────────────────────────
419
420    /// List all distinct catalogs with schema and table counts.
421    pub fn list_catalogs(&self) -> Vec<CatalogSummary> {
422        let datasets = self.registry.list_active();
423        let mut catalog_map: HashMap<String, (HashSet<String>, u32)> = HashMap::new();
424
425        for d in &datasets {
426            let entry = catalog_map
427                .entry(d.namespace.catalog.clone())
428                .or_insert_with(|| (HashSet::new(), 0));
429            entry.0.insert(d.namespace.schema_name.clone());
430            entry.1 += 1;
431        }
432
433        catalog_map
434            .into_iter()
435            .map(|(catalog, (schemas, table_count))| CatalogSummary {
436                catalog,
437                schema_count: schemas.len() as u32,
438                table_count,
439            })
440            .collect()
441    }
442
443    /// List schemas within a catalog with table counts.
444    pub fn list_schemas(&self, catalog: &str) -> Vec<SchemaSummary> {
445        let datasets = self.registry.list_active();
446        let mut schema_map: HashMap<String, u32> = HashMap::new();
447
448        for d in datasets.iter().filter(|d| d.namespace.catalog == catalog) {
449            *schema_map
450                .entry(d.namespace.schema_name.clone())
451                .or_insert(0) += 1;
452        }
453
454        schema_map
455            .into_iter()
456            .map(|(schema_name, table_count)| SchemaSummary {
457                catalog: catalog.to_string(),
458                schema_name,
459                table_count,
460            })
461            .collect()
462    }
463
464    /// List tables within a catalog.schema with summary info.
465    pub fn list_tables(&self, catalog: &str, schema_name: &str) -> Vec<TableSummaryInfo> {
466        self.registry
467            .list_active()
468            .into_iter()
469            .filter(|d| d.namespace.catalog == catalog && d.namespace.schema_name == schema_name)
470            .map(|d| TableSummaryInfo {
471                catalog: d.namespace.catalog,
472                schema_name: d.namespace.schema_name,
473                table: d.namespace.table,
474                status: d.status.to_string(),
475                created_at: d.created_at.to_rfc3339(),
476                updated_at: d.updated_at.to_rfc3339(),
477            })
478            .collect()
479    }
480
481    /// Get detailed info for a table: columns, partition info, and Delta stats.
482    pub async fn get_table_detail(
483        &self,
484        namespace: &DatasetNamespace,
485    ) -> Result<TableDetail, DatasetEngineError> {
486        let reg = self
487            .registry
488            .get_by_namespace(namespace)
489            .ok_or_else(|| DatasetEngineError::TableNotFound(namespace.fqn()))?;
490
491        // Parse Arrow schema from registration
492        let arrow_schema: arrow::datatypes::Schema = serde_json::from_str(&reg.arrow_schema_json)
493            .map_err(|e| {
494            DatasetEngineError::SerializationError(format!(
495                "Failed to deserialize Arrow schema: {e}"
496            ))
497        })?;
498
499        let partition_set: HashSet<&str> =
500            reg.partition_columns.iter().map(|s| s.as_str()).collect();
501        let system_cols: HashSet<&str> =
502            [SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE, SCOUTER_BATCH_ID]
503                .into_iter()
504                .collect();
505
506        let columns: Vec<ColumnDetail> = arrow_schema
507            .fields()
508            .iter()
509            .map(|f| ColumnDetail {
510                name: f.name().clone(),
511                arrow_type: format!("{}", f.data_type()),
512                nullable: f.is_nullable(),
513                is_partition: partition_set.contains(f.name().as_str()),
514                is_system: system_cols.contains(f.name().as_str()),
515            })
516            .collect();
517
518        // Load stats from Delta log (transient load for inactive tables)
519        let table_stats = stats::load_table_stats(&self.object_store, namespace).await?;
520
521        Ok(TableDetail {
522            registration: reg,
523            columns,
524            stats: table_stats,
525        })
526    }
527
528    /// Preview a table's data (SELECT * LIMIT max_rows).
529    pub async fn preview_table(
530        &self,
531        namespace: &DatasetNamespace,
532        max_rows: usize,
533    ) -> Result<Vec<RecordBatch>, DatasetEngineError> {
534        let max_rows = max_rows.min(1000);
535        let sql = format!(
536            "SELECT * FROM {} LIMIT {}",
537            namespace.quoted_fqn(),
538            max_rows
539        );
540        self.activate_engine(namespace).await?;
541        let df = self.query_ctx.sql(&sql).await?;
542        let batches = df.collect().await?;
543        Ok(batches)
544    }
545
546    // ── Enhanced Query Execution ────────────────────────────────────────
547
    /// Execute a SQL query with row limits, cancellation support, and metadata.
    ///
    /// The query is registered with `query_tracker` under `query_id` so it can be
    /// cancelled via `cancel_query`; the tracker entry is removed on every exit
    /// path. `max_rows` is clamped to [1, 100_000].
    pub async fn execute_query(
        &self,
        sql: &str,
        query_id: &str,
        max_rows: usize,
    ) -> Result<QueryResult, DatasetEngineError> {
        validate_sql(sql)?;
        let max_rows = max_rows.clamp(1, 100_000);

        let cancel_token = self.query_tracker.register(query_id).await?;
        let start = Instant::now();

        // Run planning + collection in an inner async block so the tracker
        // cleanup below runs regardless of how execution finishes.
        let exec_result: Result<_, DatasetEngineError> = async {
            let df = self.query_ctx.sql(sql).await?;
            // Request max_rows + 1 to detect truncation
            let limited_df = df.limit(0, Some(max_rows + 1))?;
            // If the cancel token fires first, the collect future is dropped,
            // abandoning the in-flight execution.
            tokio::select! {
                result = limited_df.collect() => result.map_err(DatasetEngineError::from),
                _ = cancel_token.cancelled() => {
                    Err(DatasetEngineError::QueryCancelled(query_id.to_string()))
                }
            }
        }
        .await;

        self.query_tracker.remove(query_id).await;
        let batches = exec_result?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        let truncated = total_rows > max_rows;

        // If truncated, we need to trim the last batch: keep whole batches while
        // they fit, slice the first one that overflows, drop the rest.
        let final_batches = if truncated {
            let mut remaining = max_rows;
            let mut result = Vec::new();
            for batch in batches {
                if remaining == 0 {
                    break;
                }
                if batch.num_rows() <= remaining {
                    remaining -= batch.num_rows();
                    result.push(batch);
                } else {
                    result.push(batch.slice(0, remaining));
                    remaining = 0;
                }
            }
            result
        } else {
            batches
        };

        let rows_returned: usize = final_batches.iter().map(|b| b.num_rows()).sum();

        Ok(QueryResult {
            batches: final_batches,
            metadata: QueryExecutionMetadata {
                query_id: query_id.to_string(),
                rows_returned: rows_returned as u64,
                truncated,
                execution_time_ms: start.elapsed().as_millis() as u64,
                bytes_scanned: None,
            },
        })
    }
614
    /// Cancel a running query by ID.
    /// The bool result comes straight from `QueryTracker::cancel` — presumably
    /// whether a tracked query with that ID existed; confirm in query.rs.
    pub async fn cancel_query(&self, query_id: &str) -> bool {
        self.query_tracker.cancel(query_id).await
    }
619
620    // ── Query Plan ──────────────────────────────────────────────────────
621
622    /// Generate a structured query plan, optionally with ANALYZE execution.
623    pub async fn explain_query(
624        &self,
625        sql: &str,
626        analyze: bool,
627        max_rows: usize,
628    ) -> Result<ExplainResult, DatasetEngineError> {
629        validate_sql(sql)?;
630        let df = self.query_ctx.sql(sql).await?;
631
632        // Logical plan (optimized)
633        let logical_plan = df.logical_plan().clone();
634        let logical_tree = logical_plan_to_tree(&logical_plan);
635        let logical_text = sanitize_plan_text(&format!("{}", logical_plan.display_indent()));
636
637        // Physical plan
638        let physical_plan = df.create_physical_plan().await?;
639        let physical_tree = physical_plan_to_tree(physical_plan.as_ref());
640        let physical_text =
641            sanitize_plan_text(&displayable(physical_plan.as_ref()).indent(true).to_string());
642
643        let execution_metadata = if analyze {
644            let max_rows = max_rows.clamp(1, 100_000);
645            let analyze_df = self.query_ctx.sql(sql).await?;
646            let limited = analyze_df.limit(0, Some(max_rows + 1))?;
647            let start = Instant::now();
648            let batches = limited.collect().await?;
649            let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
650
651            Some(QueryExecutionMetadata {
652                query_id: String::new(),
653                rows_returned: rows.min(max_rows) as u64,
654                truncated: rows > max_rows,
655                execution_time_ms: start.elapsed().as_millis() as u64,
656                bytes_scanned: None,
657            })
658        } else {
659            None
660        };
661
662        Ok(ExplainResult {
663            logical_plan: logical_tree,
664            physical_plan: physical_tree,
665            logical_plan_text: logical_text,
666            physical_plan_text: physical_text,
667            execution_metadata,
668        })
669    }
670
    /// Evict the least-recently-used engine.
    async fn evict_lru(&self) {
        // Resolve the LRU key first; the `.map()` ends the DashMap iteration so
        // no shard guard is held across the await below.
        let lru_fqn = self
            .active_engines
            .iter()
            .min_by_key(|e| e.value().last_active_at.load(Ordering::Relaxed))
            .map(|e| e.key().clone());

        if let Some(fqn) = lru_fqn {
            self.evict_engine(&fqn).await;
        }
    }
683
    /// Evict a specific engine by FQN.
    ///
    /// No-op if the FQN is not active. The shutdown sequence is ordered so no
    /// buffered data is lost: buffer drains first, engine second.
    async fn evict_engine(&self, fqn: &str) {
        if let Some((_, handle)) = self.active_engines.remove(fqn) {
            info!("Evicting engine [{}]", fqn);

            // 1. Signal buffer to flush remaining batches and exit
            let _ = handle.shutdown_tx.send(()).await;

            // 2. Wait for buffer to complete its final flush before shutting down the engine.
            //    This ensures all buffered Write commands are in the engine channel.
            let _ = handle.buffer_handle.await;

            // 3. Now shut down the engine — all buffered Writes are queued (mpsc FIFO)
            let _ = handle.engine_tx.send(TableCommand::Shutdown).await;
            let _ = handle.engine_handle.await;

            // 4. Remove from catalog
            self.catalog_provider.remove_table(&handle.namespace);
        }
    }
704
705    /// Shutdown all active engines gracefully.
706    pub async fn shutdown(&self) {
707        info!(
708            "Shutting down DatasetEngineManager ({} active engines)",
709            self.active_engines.len()
710        );
711
712        let fqns: Vec<String> = self
713            .active_engines
714            .iter()
715            .map(|e| e.key().clone())
716            .collect();
717
718        for fqn in fqns {
719            self.evict_engine(&fqn).await;
720        }
721    }
722
    /// Start the reaper loop that evicts idle engines.
    ///
    /// Returns a future suitable for `TaskManager::spawn()`. The loop exits
    /// when the shutdown receiver fires.
    pub fn start_reaper_loop(
        self: &Arc<Self>,
        mut shutdown_rx: tokio::sync::watch::Receiver<()>,
    ) -> impl std::future::Future<Output = ()> + Send + 'static {
        let manager = Arc::clone(self);
        async move {
            let mut ticker = interval(Duration::from_secs(REAPER_INTERVAL_SECS));
            ticker.tick().await; // skip immediate

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        let now = chrono::Utc::now().timestamp();
                        let ttl = manager.engine_ttl_secs as i64;

                        // Collect expired FQNs first so the DashMap iterator guard
                        // is released before the awaits in evict_engine.
                        let to_evict: Vec<String> = manager
                            .active_engines
                            .iter()
                            .filter(|e| now - e.value().last_active_at.load(Ordering::Relaxed) > ttl)
                            .map(|e| e.key().clone())
                            .collect();

                        for fqn in to_evict {
                            manager.evict_engine(&fqn).await;
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        info!("Reaper loop shutting down");
                        break;
                    }
                }
            }
        }
    }
761
    /// Start the discovery loop that refreshes the registry from other pods.
    ///
    /// Returns a future suitable for `TaskManager::spawn()`. The loop exits
    /// when the shutdown receiver fires.
    pub fn start_discovery_loop(
        self: &Arc<Self>,
        mut shutdown_rx: tokio::sync::watch::Receiver<()>,
    ) -> impl std::future::Future<Output = ()> + Send + 'static {
        let manager = Arc::clone(self);
        async move {
            let mut ticker = interval(Duration::from_secs(DISCOVERY_INTERVAL_SECS));
            ticker.tick().await; // skip immediate

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        // A failed refresh is logged but does not stop the loop.
                        if let Err(e) = manager.registry.refresh().await {
                            warn!("Registry discovery refresh failed: {}", e);
                        }

                        // Register any new catalogs discovered
                        for reg in manager.registry.list_active() {
                            manager.ensure_catalog_registered(&reg.namespace.catalog);
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        info!("Discovery loop shutting down");
                        break;
                    }
                }
            }
        }
    }
795
    /// Access the shared query context (for Phase 3 gRPC/HTTP integration).
    ///
    /// Returns the DataFusion session on which catalogs are registered by
    /// this manager; callers may run SQL against every registered catalog.
    pub fn query_ctx(&self) -> &Arc<SessionContext> {
        &self.query_ctx
    }
800
    /// Access the registry (for Phase 3 gRPC/HTTP integration).
    ///
    /// The registry is the shared source of truth for dataset registrations,
    /// refreshed periodically by the discovery loop.
    pub fn registry(&self) -> &Arc<DatasetRegistry> {
        &self.registry
    }
805
    /// Number of currently active engines.
    ///
    /// Engines are activated lazily (on first insert) and removed by
    /// eviction, the reaper loop, or shutdown, so this count can be lower
    /// than the number of registered datasets.
    pub fn active_engine_count(&self) -> usize {
        self.active_engines.len()
    }
810
811    /// Register a catalog name with DataFusion (idempotent).
812    fn ensure_catalog_registered(&self, catalog: &str) {
813        self.query_ctx.register_catalog(
814            catalog,
815            Arc::clone(&self.catalog_provider) as Arc<dyn datafusion::catalog::CatalogProvider>,
816        );
817    }
818}
819
820// ── Catalog browser types ──────────────────────────────────────────────
821
/// Aggregate counts for a single catalog, as shown in the catalog browser.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CatalogSummary {
    /// Catalog name.
    pub catalog: String,
    /// Number of schemas under this catalog.
    pub schema_count: u32,
    /// Total number of tables across all schemas in this catalog.
    pub table_count: u32,
}
828
/// Aggregate counts for a single schema within a catalog.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SchemaSummary {
    /// Owning catalog name.
    pub catalog: String,
    /// Schema name within the catalog.
    pub schema_name: String,
    /// Number of tables in this schema.
    pub table_count: u32,
}
835
/// One-row summary of a table for catalog browsing.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TableSummaryInfo {
    /// Owning catalog name.
    pub catalog: String,
    /// Owning schema name.
    pub schema_name: String,
    /// Table name.
    pub table: String,
    /// Lifecycle status of the table (values set by the producer — not
    /// visible from this file; verify against the registry).
    pub status: String,
    /// Creation timestamp, pre-formatted as a string by the producer.
    pub created_at: String,
    /// Last-update timestamp, pre-formatted as a string by the producer.
    pub updated_at: String,
}
845
/// Per-column metadata for a table's schema view.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ColumnDetail {
    /// Column name.
    pub name: String,
    /// Arrow data type rendered as text.
    pub arrow_type: String,
    /// Whether the column accepts nulls.
    pub nullable: bool,
    /// True if the column participates in the table's partitioning.
    pub is_partition: bool,
    /// True for scouter-managed system columns (e.g. `scouter_created_at`,
    /// `scouter_partition_date`, `scouter_batch_id`).
    pub is_system: bool,
}
854
/// Full detail view of one table: its registration record, resolved column
/// metadata, and storage statistics.
pub struct TableDetail {
    /// The persisted registration (namespace, fingerprint, schema JSON, ...).
    pub registration: DatasetRegistration,
    /// Per-column details derived from the table's Arrow schema.
    pub columns: Vec<ColumnDetail>,
    /// Storage-level statistics for the table.
    pub stats: stats::TableStats,
}
860
/// Integration-style tests for `DatasetEngineManager`: registration,
/// lazy engine activation, buffering/flush, querying, eviction caps, and
/// registry persistence — all against a local temp-dir object store.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::AsArray;
    use arrow::datatypes::{DataType, Field, Int64Type, Schema, TimeUnit};
    use scouter_types::dataset::{DatasetFingerprint, DatasetRegistration};
    use tempfile::TempDir;

    /// Build storage settings pointing at a per-test temp directory
    /// (local storage backend, so no external services are needed).
    fn test_storage_settings(dir: &TempDir) -> ObjectStorageSettings {
        ObjectStorageSettings {
            storage_uri: dir.path().to_str().unwrap().to_string(),
            storage_type: scouter_types::StorageType::Local,
            region: "us-east-1".to_string(),
            trace_compaction_interval_hours: 24,
            trace_flush_interval_secs: 5,
            trace_refresh_interval_secs: 10,
        }
    }

    /// Arrow schema with three user columns plus the three scouter system
    /// columns every dataset table carries.
    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("user_id", DataType::Utf8, false),
            Field::new("score", DataType::Float64, false),
            Field::new("model_name", DataType::Utf8, true),
            // System columns
            Field::new(
                "scouter_created_at",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new("scouter_partition_date", DataType::Date32, false),
            Field::new("scouter_batch_id", DataType::Utf8, false),
        ])
    }

    /// Registration for `test_catalog.test_schema.predictions` whose
    /// fingerprint is derived from the schema's JSON encoding.
    fn test_registration(schema: &Schema) -> DatasetRegistration {
        let arrow_schema_json = serde_json::to_string(schema).unwrap();
        let fingerprint = DatasetFingerprint::from_schema_json(&arrow_schema_json);
        let namespace =
            DatasetNamespace::new("test_catalog", "test_schema", "predictions").unwrap();

        DatasetRegistration::new(
            namespace,
            fingerprint,
            arrow_schema_json,
            "{}".to_string(),
            vec![],
        )
    }

    /// Three-row batch matching `test_schema()`, with "now" timestamps and
    /// today's partition date.
    fn make_test_batch(schema: &Schema) -> RecordBatch {
        use arrow::array::*;
        use chrono::{Datelike, Utc};

        let now = Utc::now();
        // 719_163 = num_days_from_ce() for 1970-01-01, so this converts
        // chrono's CE day count into days-since-Unix-epoch as Date32 expects.
        let epoch_days = now.date_naive().num_days_from_ce() - 719_163;

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(StringArray::from(vec!["user_1", "user_2", "user_3"])),
                Arc::new(Float64Array::from(vec![0.95, 0.87, 0.92])),
                Arc::new(StringArray::from(vec![
                    Some("model_a"),
                    None,
                    Some("model_b"),
                ])),
                Arc::new(
                    TimestampMicrosecondArray::from(vec![
                        now.timestamp_micros(),
                        now.timestamp_micros(),
                        now.timestamp_micros(),
                    ])
                    .with_timezone("UTC"),
                ),
                Arc::new(Date32Array::from(vec![epoch_days, epoch_days, epoch_days])),
                Arc::new(StringArray::from(vec![
                    "batch-001",
                    "batch-001",
                    "batch-001",
                ])),
            ],
        )
        .unwrap()
    }

    /// Register + idempotent re-register, then verify an engine is only
    /// spawned lazily on first insert and is torn down on shutdown.
    #[tokio::test]
    async fn test_register_and_insert() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        // Args after settings appear to be: ttl_secs, max_active_engines,
        // flush_interval_secs, max_buffer_rows, refresh_interval_secs —
        // inferred from the inline comments in other tests; TODO confirm.
        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let reg = test_registration(&schema);

        // Register
        let result = manager.register_dataset(&reg).await.unwrap();
        assert_eq!(result, RegistrationResult::Created);

        // Idempotent re-register
        let result2 = manager.register_dataset(&reg).await.unwrap();
        assert_eq!(result2, RegistrationResult::AlreadyExists);

        // No engines spawned yet (lazy)
        assert_eq!(manager.active_engine_count(), 0);

        // Insert a batch — triggers lazy activation
        let batch = make_test_batch(&schema);
        manager
            .insert_batch(&reg.namespace, &reg.fingerprint, batch)
            .await
            .unwrap();

        // Engine should now be active
        assert_eq!(manager.active_engine_count(), 1);

        // Wait for buffer to flush (flush interval = 1s in test config)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Shutdown cleanly
        manager.shutdown().await;
        assert_eq!(manager.active_engine_count(), 0);
    }

    /// Inserting with a fingerprint that doesn't match the registration
    /// must be rejected with `FingerprintMismatch`.
    #[tokio::test]
    async fn test_fingerprint_mismatch() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();

        // Try inserting with wrong fingerprint
        let wrong_fp = DatasetFingerprint::from_schema_json("wrong");
        let batch = make_test_batch(&schema);

        let result = manager.insert_batch(&reg.namespace, &wrong_fp, batch).await;

        assert!(result.is_err());
        if let Err(DatasetEngineError::FingerprintMismatch { .. }) = result {
            // expected
        } else {
            panic!("Expected FingerprintMismatch error");
        }

        manager.shutdown().await;
    }

    /// Inserting into a namespace that was never registered must fail
    /// with `TableNotFound`.
    #[tokio::test]
    async fn test_table_not_found() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let ns = DatasetNamespace::new("no", "such", "table").unwrap();
        let fp = DatasetFingerprint::from_schema_json("x");
        let schema = test_schema();
        let batch = make_test_batch(&schema);

        let result = manager.insert_batch(&ns, &fp, batch).await;
        assert!(matches!(result, Err(DatasetEngineError::TableNotFound(_))));

        manager.shutdown().await;
    }

    /// `list_datasets` is empty on a fresh manager and reflects
    /// registrations by fully-qualified name afterwards.
    #[tokio::test]
    async fn test_list_datasets() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        assert!(manager.list_datasets().is_empty());

        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();

        let datasets = manager.list_datasets();
        assert_eq!(datasets.len(), 1);
        assert_eq!(
            datasets[0].namespace.fqn(),
            "test_catalog.test_schema.predictions"
        );

        manager.shutdown().await;
    }

    /// Two tables sharing a schema each get their own engine; inserts into
    /// one do not affect the other.
    #[tokio::test]
    async fn test_multiple_tables_isolation() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();

        // Register two different tables
        let ns1 = DatasetNamespace::new("cat", "sch", "table_a").unwrap();
        let ns2 = DatasetNamespace::new("cat", "sch", "table_b").unwrap();
        let arrow_json = serde_json::to_string(&schema).unwrap();
        let fp = DatasetFingerprint::from_schema_json(&arrow_json);

        let reg1 = DatasetRegistration::new(
            ns1.clone(),
            fp.clone(),
            arrow_json.clone(),
            "{}".into(),
            vec![],
        );
        let reg2 = DatasetRegistration::new(
            ns2.clone(),
            fp.clone(),
            arrow_json.clone(),
            "{}".into(),
            vec![],
        );

        manager.register_dataset(&reg1).await.unwrap();
        manager.register_dataset(&reg2).await.unwrap();

        // Insert into both
        let batch1 = make_test_batch(&schema);
        let batch2 = make_test_batch(&schema);
        manager.insert_batch(&ns1, &fp, batch1).await.unwrap();
        manager.insert_batch(&ns2, &fp, batch2).await.unwrap();

        assert_eq!(manager.active_engine_count(), 2);

        manager.shutdown().await;
    }

    /// With the active-engine cap set to 2, activating a third table
    /// evicts one engine so the count never exceeds the cap.
    #[tokio::test]
    async fn test_max_active_engines_cap() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        // Cap at 2 active engines
        let manager = DatasetEngineManager::with_config(&settings, 1800, 2, 1, 100, 30)
            .await
            .unwrap();

        let schema = test_schema();
        let arrow_json = serde_json::to_string(&schema).unwrap();
        let fp = DatasetFingerprint::from_schema_json(&arrow_json);

        // Register 3 tables
        for i in 0..3 {
            let ns = DatasetNamespace::new("cat", "sch", format!("tbl_{i}")).unwrap();
            let reg =
                DatasetRegistration::new(ns, fp.clone(), arrow_json.clone(), "{}".into(), vec![]);
            manager.register_dataset(&reg).await.unwrap();
        }

        // Activate first two
        let ns0 = DatasetNamespace::new("cat", "sch", "tbl_0").unwrap();
        let ns1 = DatasetNamespace::new("cat", "sch", "tbl_1").unwrap();
        let ns2 = DatasetNamespace::new("cat", "sch", "tbl_2").unwrap();

        manager
            .insert_batch(&ns0, &fp, make_test_batch(&schema))
            .await
            .unwrap();
        manager
            .insert_batch(&ns1, &fp, make_test_batch(&schema))
            .await
            .unwrap();

        assert_eq!(manager.active_engine_count(), 2);

        // Third should evict the LRU
        manager
            .insert_batch(&ns2, &fp, make_test_batch(&schema))
            .await
            .unwrap();

        // Still at cap
        assert_eq!(manager.active_engine_count(), 2);

        manager.shutdown().await;
    }

    /// End-to-end: insert, wait for the flush to hit storage, then run a
    /// three-level-name SQL COUNT over the written data.
    #[tokio::test]
    async fn test_write_and_query() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        let manager = DatasetEngineManager::with_config(
            &settings, 1800, 10, 1,   // 1s flush interval
            100, // small buffer for testing
            30,
        )
        .await
        .unwrap();

        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();

        // Insert data
        let batch = make_test_batch(&schema);
        manager
            .insert_batch(&reg.namespace, &reg.fingerprint, batch)
            .await
            .unwrap();

        // Wait for buffer flush + write
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Query via three-level name
        let sql = "SELECT COUNT(*) as cnt FROM test_catalog.test_schema.predictions";
        let results = manager.query(sql).await.unwrap();

        assert!(!results.is_empty());
        let count_col = results[0]
            .column_by_name("cnt")
            .unwrap()
            .as_primitive_opt::<Int64Type>()
            .unwrap();
        // make_test_batch writes exactly 3 rows.
        assert_eq!(count_col.value(0), 3);

        manager.shutdown().await;
    }

    /// Registrations must survive a full manager restart: a second manager
    /// over the same storage rediscovers the dataset.
    #[tokio::test]
    async fn test_registry_persistence() {
        let dir = TempDir::new().unwrap();
        let settings = test_storage_settings(&dir);

        // Register a dataset
        {
            let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
                .await
                .unwrap();

            let schema = test_schema();
            let reg = test_registration(&schema);
            manager.register_dataset(&reg).await.unwrap();
            manager.shutdown().await;
        }

        // Create a new manager from same storage — should find the registration
        {
            let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
                .await
                .unwrap();

            let datasets = manager.list_datasets();
            assert_eq!(datasets.len(), 1);
            assert_eq!(
                datasets[0].namespace.fqn(),
                "test_catalog.test_schema.predictions"
            );

            manager.shutdown().await;
        }
    }
}