use crate::error::DatasetEngineError;
use crate::parquet::bifrost::buffer::start_buffer;
use crate::parquet::bifrost::catalog::DatasetCatalogProvider;
use crate::parquet::bifrost::engine::{DatasetEngine, TableCommand};
use crate::parquet::bifrost::explain::{
logical_plan_to_tree, physical_plan_to_tree, sanitize_plan_text, ExplainResult,
};
use crate::parquet::bifrost::query::{QueryExecutionMetadata, QueryResult, QueryTracker};
use crate::parquet::bifrost::registry::{DatasetRegistry, RegistrationResult};
use crate::parquet::bifrost::stats;
use crate::storage::ObjectStore;
use arrow::datatypes::SchemaRef;
use arrow_array::RecordBatch;
use dashmap::DashMap;
use datafusion::physical_plan::displayable;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::dataset::schema::{
SCOUTER_BATCH_ID, SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE,
};
use scouter_types::dataset::{DatasetFingerprint, DatasetNamespace, DatasetRegistration};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, Mutex, Notify};
use tokio::time::{interval, Duration};
use tracing::{info, warn};
// Engine lifecycle defaults: an engine idle longer than the TTL is evicted,
// and at most this many table engines may be resident at once.
const DEFAULT_ENGINE_TTL_SECS: u64 = 30 * 60;
const DEFAULT_MAX_ACTIVE_ENGINES: usize = 50;
// Write buffering defaults: batches are flushed on this interval or once the
// row cap is reached, whichever comes first.
const DEFAULT_FLUSH_INTERVAL_SECS: u64 = 60;
const DEFAULT_MAX_BUFFER_ROWS: usize = 10_000;
// Interval handed to each engine actor's refresh cycle (see start_actor).
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 30;
// Cadence of the TTL-eviction (reaper) and registry-discovery background loops.
const REAPER_INTERVAL_SECS: u64 = 5 * 60;
const DISCOVERY_INTERVAL_SECS: u64 = 60;
/// Runtime handle for one activated dataset table.
///
/// Owns the channels into the table's buffer and engine tasks, the join
/// handles for those tasks, and the bookkeeping used for fingerprint checks
/// and LRU/TTL eviction.
pub struct DatasetTableHandle {
    /// Sender feeding incoming record batches into the buffering task.
    pub buffer_tx: mpsc::Sender<RecordBatch>,
    /// Direct command channel to the table's engine actor.
    pub engine_tx: mpsc::Sender<TableCommand>,
    // Signals the buffer task to stop during eviction/shutdown.
    shutdown_tx: mpsc::Sender<()>,
    /// Arrow schema the table was registered with.
    pub schema: SchemaRef,
    /// Schema fingerprint; inserts with a different fingerprint are rejected.
    pub fingerprint: DatasetFingerprint,
    pub namespace: DatasetNamespace,
    /// Partition columns, led by the synthetic partition-date column.
    pub partition_columns: Vec<String>,
    /// Unix timestamp of last activity; drives LRU eviction and the TTL reaper.
    pub last_active_at: Arc<AtomicI64>,
    // Join handles awaited during eviction so both tasks shut down cleanly.
    engine_handle: tokio::task::JoinHandle<()>,
    buffer_handle: tokio::task::JoinHandle<()>,
}
impl DatasetTableHandle {
    /// Mark this table as recently used by stamping the current Unix time.
    /// The LRU eviction and TTL reaper read this value.
    fn touch(&self) {
        let now = chrono::Utc::now().timestamp();
        self.last_active_at.store(now, Ordering::Relaxed);
    }
}
/// Orchestrates per-table dataset engines: registration, lazy activation,
/// buffered ingestion, SQL querying, and background eviction/discovery.
pub struct DatasetEngineManager {
    // Persisted catalog of dataset registrations.
    registry: Arc<DatasetRegistry>,
    // Currently activated engines, keyed by fully-qualified table name.
    active_engines: Arc<DashMap<String, DatasetTableHandle>>,
    // In-flight activations; concurrent callers park on the Notify for a fqn.
    activating: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
    // Shared DataFusion session used for all SQL execution.
    query_ctx: Arc<SessionContext>,
    // Catalog provider registered into the session for every known catalog.
    catalog_provider: Arc<DatasetCatalogProvider>,
    object_store: ObjectStore,
    // Tracks running queries so they can be cancelled by id.
    query_tracker: QueryTracker,
    // Tuning knobs; see the DEFAULT_* constants for baseline values.
    engine_ttl_secs: u64,
    max_active_engines: usize,
    flush_interval_secs: u64,
    max_buffer_rows: usize,
    refresh_interval_secs: u64,
}
/// Accept only a single read-only SELECT statement.
///
/// Parse failures, multi-statement input, DDL/DML, and any non-query
/// statement all produce a `SqlValidationError`.
fn validate_sql(sql: &str) -> Result<(), DatasetEngineError> {
    use datafusion::sql::parser::{DFParser, Statement as DFStatement};
    use datafusion::sql::sqlparser::ast::Statement as SqlStatement;

    let parsed = DFParser::parse_sql(sql)
        .map_err(|e| DatasetEngineError::SqlValidationError(format!("Failed to parse SQL: {e}")))?;
    if parsed.len() != 1 {
        return Err(DatasetEngineError::SqlValidationError(
            "Exactly one SQL statement is required".to_string(),
        ));
    }
    // DataFusion-specific statements (anything that is not a plain SQL
    // statement) are rejected outright.
    let DFStatement::Statement(stmt) = &parsed[0] else {
        return Err(DatasetEngineError::SqlValidationError(
            "Only SELECT queries are allowed".to_string(),
        ));
    };
    match stmt.as_ref() {
        SqlStatement::Query(_) => Ok(()),
        SqlStatement::Copy { .. }
        | SqlStatement::CreateTable(_)
        | SqlStatement::Drop { .. }
        | SqlStatement::Insert(_)
        | SqlStatement::Update { .. }
        | SqlStatement::Delete(_) => Err(DatasetEngineError::SqlValidationError(
            "DDL and DML statements are not permitted".to_string(),
        )),
        other => Err(DatasetEngineError::SqlValidationError(format!(
            "Only SELECT queries are allowed, got: {}",
            other
        ))),
    }
}
impl DatasetEngineManager {
/// Build a manager with default tuning, wiring up the object store,
/// DataFusion session, catalog provider, and the persisted registry.
///
/// The buffer flush interval may be overridden via the
/// `SCOUTER_DATASET_FLUSH_INTERVAL_SECS` environment variable.
pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DatasetEngineError> {
    let object_store = ObjectStore::new(storage_settings)?;
    let query_ctx = Arc::new(object_store.get_session()?);
    let catalog_provider = Arc::new(DatasetCatalogProvider::new());
    let registry = Arc::new(DatasetRegistry::new(&object_store).await?);
    // A missing or unparsable env value silently falls back to the default.
    let flush_interval_secs = std::env::var("SCOUTER_DATASET_FLUSH_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_FLUSH_INTERVAL_SECS);
    let manager = Self {
        registry,
        active_engines: Arc::new(DashMap::new()),
        activating: Arc::new(Mutex::new(HashMap::new())),
        query_ctx,
        catalog_provider,
        object_store,
        query_tracker: QueryTracker::new(),
        engine_ttl_secs: DEFAULT_ENGINE_TTL_SECS,
        max_active_engines: DEFAULT_MAX_ACTIVE_ENGINES,
        flush_interval_secs,
        max_buffer_rows: DEFAULT_MAX_BUFFER_ROWS,
        refresh_interval_secs: DEFAULT_REFRESH_INTERVAL_SECS,
    };
    // Register catalogs for datasets already present in the registry so their
    // names resolve in the session before any engine is activated.
    for reg in manager.registry.list_active() {
        manager.ensure_catalog_registered(&reg.namespace.catalog);
    }
    Ok(manager)
}
pub async fn with_config(
storage_settings: &ObjectStorageSettings,
engine_ttl_secs: u64,
max_active_engines: usize,
flush_interval_secs: u64,
max_buffer_rows: usize,
refresh_interval_secs: u64,
) -> Result<Self, DatasetEngineError> {
let mut manager = Self::new(storage_settings).await?;
manager.engine_ttl_secs = engine_ttl_secs;
manager.max_active_engines = max_active_engines;
manager.flush_interval_secs = flush_interval_secs;
manager.max_buffer_rows = max_buffer_rows;
manager.refresh_interval_secs = refresh_interval_secs;
Ok(manager)
}
pub async fn register_dataset(
&self,
registration: &DatasetRegistration,
) -> Result<RegistrationResult, DatasetEngineError> {
let result = self.registry.register(registration).await?;
self.ensure_catalog_registered(®istration.namespace.catalog);
Ok(result)
}
/// Ensure an engine is running for `namespace`, activating it if needed.
///
/// Concurrency protocol: a fast path checks the active map; otherwise the
/// `activating` mutex arbitrates. The first caller for a fqn inserts a
/// `Notify` and performs the activation; concurrent callers wait on that
/// `Notify` (with a 30s timeout) and then re-check the active map to learn
/// whether the activation succeeded.
async fn activate_engine(
    &self,
    namespace: &DatasetNamespace,
) -> Result<(), DatasetEngineError> {
    let fqn = namespace.fqn();
    // Fast path: engine already active.
    if let Some(handle) = self.active_engines.get(&fqn) {
        handle.touch();
        return Ok(());
    }
    {
        let mut pending = self.activating.lock().await;
        // Re-check under the lock: another task may have finished activating
        // between the fast path and acquiring the mutex.
        if let Some(handle) = self.active_engines.get(&fqn) {
            handle.touch();
            return Ok(());
        }
        if let Some(notify) = pending.get(&fqn) {
            // Another task is activating this table: register interest
            // BEFORE releasing the mutex so a notify_waiters() fired in the
            // gap is not missed.
            let notify = Arc::clone(notify);
            let notified = notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            drop(pending);
            match tokio::time::timeout(Duration::from_secs(30), notified).await {
                Ok(_) => {}
                Err(_) => {
                    return Err(DatasetEngineError::RegistryError(format!(
                        "Engine activation timed out for {fqn}"
                    )));
                }
            }
            // The notify carries no result; presence in the active map is the
            // success signal.
            return if self.active_engines.contains_key(&fqn) {
                Ok(())
            } else {
                Err(DatasetEngineError::RegistryError(format!(
                    "Activation failed for {fqn}"
                )))
            };
        }
        // We are the activating task for this fqn.
        pending.insert(fqn.clone(), Arc::new(Notify::new()));
    }
    // Cleanup task: fires on done_tx.send OR on drop of done_tx (e.g. if the
    // activation future is cancelled), so the pending entry is always removed
    // and waiters are always woken.
    let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
    let activating = Arc::clone(&self.activating);
    let fqn_for_cleanup = fqn.clone();
    tokio::spawn(async move {
        let _ = done_rx.await;
        let mut pending = activating.lock().await;
        if let Some(notify) = pending.remove(&fqn_for_cleanup) {
            notify.notify_waiters();
        }
    });
    let result = self.do_activate_engine_inner(namespace, &fqn).await;
    let _ = done_tx.send(());
    result
}
/// Perform the actual engine activation for `fqn`.
///
/// Loads the registration, deserializes its Arrow schema, spawns the engine
/// actor and its buffering task, registers the catalog, and publishes a
/// [`DatasetTableHandle`] into the active map. Callers must hold the
/// activation slot for `fqn` (see `activate_engine`).
async fn do_activate_engine_inner(
    &self,
    namespace: &DatasetNamespace,
    fqn: &str,
) -> Result<(), DatasetEngineError> {
    let reg = self
        .registry
        .get(fqn)
        .ok_or_else(|| DatasetEngineError::TableNotFound(fqn.to_string()))?;
    // Make room before activating: evict the LRU engine when at capacity.
    if self.active_engines.len() >= self.max_active_engines {
        self.evict_lru().await;
    }
    let arrow_schema: arrow::datatypes::Schema = serde_json::from_str(&reg.arrow_schema_json)
        .map_err(|e| {
            DatasetEngineError::SerializationError(format!(
                "Failed to deserialize Arrow schema for {}: {}",
                fqn, e
            ))
        })?;
    let schema = Arc::new(arrow_schema);
    // The synthetic partition-date column always leads; registration-supplied
    // partition columns follow, deduplicated against what is already present.
    let mut partition_columns = vec![SCOUTER_PARTITION_DATE.to_string()];
    for col in &reg.partition_columns {
        if !partition_columns.contains(col) {
            partition_columns.push(col.clone());
        }
    }
    let engine = DatasetEngine::new(
        &self.object_store,
        schema.clone(),
        namespace.clone(),
        partition_columns.clone(),
        Arc::clone(&self.catalog_provider),
    )
    .await?;
    // Start the engine actor first, then the buffer task that feeds it.
    let (engine_tx, engine_handle) = engine.start_actor(self.refresh_interval_secs);
    let (buffer_tx, batch_rx) = mpsc::channel::<RecordBatch>(100);
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
    let buffer_handle = start_buffer(
        engine_tx.clone(),
        batch_rx,
        shutdown_rx,
        self.flush_interval_secs,
        self.max_buffer_rows,
        fqn.to_string(),
    );
    self.ensure_catalog_registered(&namespace.catalog);
    let handle = DatasetTableHandle {
        buffer_tx,
        engine_tx,
        shutdown_tx,
        schema,
        fingerprint: reg.fingerprint.clone(),
        namespace: namespace.clone(),
        partition_columns,
        last_active_at: Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())),
        engine_handle,
        buffer_handle,
    };
    self.active_engines.insert(fqn.to_string(), handle);
    info!("Activated engine for [{}]", fqn);
    Ok(())
}
/// Validate and enqueue a batch of rows for `namespace`.
///
/// Activates the table's engine if needed, rejects the batch when the
/// caller's schema fingerprint does not match the registered one, and hands
/// the batch to the table's buffer.
///
/// # Errors
/// `TableNotFound` if the engine vanished between activation and lookup,
/// `FingerprintMismatch` on a schema mismatch, and `ChannelClosed` if the
/// buffer task has stopped.
pub async fn insert_batch(
    &self,
    namespace: &DatasetNamespace,
    fingerprint: &DatasetFingerprint,
    batch: RecordBatch,
) -> Result<(), DatasetEngineError> {
    let fqn = namespace.fqn();
    self.activate_engine(namespace).await?;
    // Do all map-backed work inside this scope so the DashMap shard guard is
    // dropped BEFORE the send().await below. Holding a dashmap Ref across an
    // await can deadlock with evict_engine()'s remove() on the same shard.
    let buffer_tx = {
        let handle = self
            .active_engines
            .get(&fqn)
            .ok_or_else(|| DatasetEngineError::TableNotFound(fqn.clone()))?;
        if handle.fingerprint.as_str() != fingerprint.as_str() {
            warn!(
                table = %fqn,
                "Fingerprint mismatch: expected={}, actual={}",
                handle.fingerprint.as_str(),
                fingerprint.as_str()
            );
            return Err(DatasetEngineError::FingerprintMismatch {
                table: fqn,
                expected: handle.fingerprint.as_str().to_string(),
                actual: fingerprint.as_str().to_string(),
            });
        }
        handle.touch();
        // Cloning the sender is cheap and lets us release the guard here.
        handle.buffer_tx.clone()
    };
    buffer_tx
        .send(batch)
        .await
        .map_err(|_| DatasetEngineError::ChannelClosed)?;
    Ok(())
}
/// Run a validated, read-only SQL query on the shared session and return
/// every result batch.
pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, DatasetEngineError> {
    validate_sql(sql)?;
    let frame = self.query_ctx.sql(sql).await?;
    Ok(frame.collect().await?)
}
/// Snapshot of all active dataset registrations known to the registry.
pub fn list_datasets(&self) -> Vec<DatasetRegistration> {
    self.registry.list_active()
}

/// Look up a single registration by its namespace, if present.
pub fn get_dataset_info(&self, namespace: &DatasetNamespace) -> Option<DatasetRegistration> {
    self.registry.get_by_namespace(namespace)
}
/// Aggregate active datasets into per-catalog summaries.
///
/// Each summary reports the number of distinct schemas and the total table
/// count within the catalog.
pub fn list_catalogs(&self) -> Vec<CatalogSummary> {
    let datasets = self.registry.list_active();
    // catalog name -> (distinct schema names, table count)
    let mut catalog_map: HashMap<String, (HashSet<String>, u32)> = HashMap::new();
    for d in &datasets {
        // or_default() replaces the manual or_insert_with(|| (HashSet::new(), 0)).
        let entry = catalog_map.entry(d.namespace.catalog.clone()).or_default();
        entry.0.insert(d.namespace.schema_name.clone());
        entry.1 += 1;
    }
    catalog_map
        .into_iter()
        .map(|(catalog, (schemas, table_count))| CatalogSummary {
            catalog,
            schema_count: schemas.len() as u32,
            table_count,
        })
        .collect()
}
/// Summarize the schemas under one catalog, counting tables per schema.
pub fn list_schemas(&self, catalog: &str) -> Vec<SchemaSummary> {
    let mut counts: HashMap<String, u32> = HashMap::new();
    for reg in self.registry.list_active() {
        if reg.namespace.catalog == catalog {
            *counts.entry(reg.namespace.schema_name).or_insert(0) += 1;
        }
    }
    counts
        .into_iter()
        .map(|(schema_name, table_count)| SchemaSummary {
            catalog: catalog.to_string(),
            schema_name,
            table_count,
        })
        .collect()
}
/// List table summaries for one (catalog, schema) pair.
pub fn list_tables(&self, catalog: &str, schema_name: &str) -> Vec<TableSummaryInfo> {
    let mut tables = Vec::new();
    for d in self.registry.list_active() {
        if d.namespace.catalog != catalog || d.namespace.schema_name != schema_name {
            continue;
        }
        tables.push(TableSummaryInfo {
            catalog: d.namespace.catalog,
            schema_name: d.namespace.schema_name,
            table: d.namespace.table,
            status: d.status.to_string(),
            created_at: d.created_at.to_rfc3339(),
            updated_at: d.updated_at.to_rfc3339(),
        });
    }
    tables
}
/// Assemble full metadata for one table: its registration, per-column
/// details (partition/system flags included), and stored table statistics.
///
/// # Errors
/// `TableNotFound` if the namespace is not registered;
/// `SerializationError` if the stored Arrow schema JSON cannot be parsed.
pub async fn get_table_detail(
    &self,
    namespace: &DatasetNamespace,
) -> Result<TableDetail, DatasetEngineError> {
    let reg = self
        .registry
        .get_by_namespace(namespace)
        .ok_or_else(|| DatasetEngineError::TableNotFound(namespace.fqn()))?;
    let arrow_schema: arrow::datatypes::Schema = serde_json::from_str(&reg.arrow_schema_json)
        .map_err(|e| {
            DatasetEngineError::SerializationError(format!(
                "Failed to deserialize Arrow schema: {e}"
            ))
        })?;
    // Lookup sets for flagging each field as partition and/or system column.
    let partition_set: HashSet<&str> =
        reg.partition_columns.iter().map(|s| s.as_str()).collect();
    let system_cols: HashSet<&str> =
        [SCOUTER_CREATED_AT, SCOUTER_PARTITION_DATE, SCOUTER_BATCH_ID]
            .into_iter()
            .collect();
    let columns: Vec<ColumnDetail> = arrow_schema
        .fields()
        .iter()
        .map(|f| ColumnDetail {
            name: f.name().clone(),
            arrow_type: format!("{}", f.data_type()),
            nullable: f.is_nullable(),
            is_partition: partition_set.contains(f.name().as_str()),
            is_system: system_cols.contains(f.name().as_str()),
        })
        .collect();
    let table_stats = stats::load_table_stats(&self.object_store, namespace).await?;
    Ok(TableDetail {
        registration: reg,
        columns,
        stats: table_stats,
    })
}
/// Fetch up to `max_rows` (capped at 1000) rows from a table for preview.
///
/// Activates the table's engine first so the table name resolves in the
/// shared session.
pub async fn preview_table(
    &self,
    namespace: &DatasetNamespace,
    max_rows: usize,
) -> Result<Vec<RecordBatch>, DatasetEngineError> {
    let limit = max_rows.min(1000);
    let sql = format!("SELECT * FROM {} LIMIT {}", namespace.quoted_fqn(), limit);
    self.activate_engine(namespace).await?;
    let frame = self.query_ctx.sql(&sql).await?;
    Ok(frame.collect().await?)
}
pub async fn execute_query(
&self,
sql: &str,
query_id: &str,
max_rows: usize,
) -> Result<QueryResult, DatasetEngineError> {
validate_sql(sql)?;
let max_rows = max_rows.clamp(1, 100_000);
let cancel_token = self.query_tracker.register(query_id).await?;
let start = Instant::now();
let exec_result: Result<_, DatasetEngineError> = async {
let df = self.query_ctx.sql(sql).await?;
let limited_df = df.limit(0, Some(max_rows + 1))?;
tokio::select! {
result = limited_df.collect() => result.map_err(DatasetEngineError::from),
_ = cancel_token.cancelled() => {
Err(DatasetEngineError::QueryCancelled(query_id.to_string()))
}
}
}
.await;
self.query_tracker.remove(query_id).await;
let batches = exec_result?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let truncated = total_rows > max_rows;
let final_batches = if truncated {
let mut remaining = max_rows;
let mut result = Vec::new();
for batch in batches {
if remaining == 0 {
break;
}
if batch.num_rows() <= remaining {
remaining -= batch.num_rows();
result.push(batch);
} else {
result.push(batch.slice(0, remaining));
remaining = 0;
}
}
result
} else {
batches
};
let rows_returned: usize = final_batches.iter().map(|b| b.num_rows()).sum();
Ok(QueryResult {
batches: final_batches,
metadata: QueryExecutionMetadata {
query_id: query_id.to_string(),
rows_returned: rows_returned as u64,
truncated,
execution_time_ms: start.elapsed().as_millis() as u64,
bytes_scanned: None,
},
})
}
/// Request cancellation of an in-flight query registered under `query_id`.
/// Returns the tracker's result for the cancel attempt.
pub async fn cancel_query(&self, query_id: &str) -> bool {
    self.query_tracker.cancel(query_id).await
}

/// Produce logical and physical plans (tree and text forms) for a validated
/// SELECT.
///
/// When `analyze` is true the query is additionally executed — re-planned
/// from the SQL — with `max_rows` clamped to [1, 100_000] plus one extra row
/// to detect truncation, and timing metadata is returned alongside the plans.
pub async fn explain_query(
    &self,
    sql: &str,
    analyze: bool,
    max_rows: usize,
) -> Result<ExplainResult, DatasetEngineError> {
    validate_sql(sql)?;
    let df = self.query_ctx.sql(sql).await?;
    // Capture the logical side before physical planning consumes the frame.
    let logical_plan = df.logical_plan().clone();
    let logical_tree = logical_plan_to_tree(&logical_plan);
    let logical_text = sanitize_plan_text(&format!("{}", logical_plan.display_indent()));
    let physical_plan = df.create_physical_plan().await?;
    let physical_tree = physical_plan_to_tree(physical_plan.as_ref());
    let physical_text =
        sanitize_plan_text(&displayable(physical_plan.as_ref()).indent(true).to_string());
    let execution_metadata = if analyze {
        let max_rows = max_rows.clamp(1, 100_000);
        let analyze_df = self.query_ctx.sql(sql).await?;
        let limited = analyze_df.limit(0, Some(max_rows + 1))?;
        let start = Instant::now();
        let batches = limited.collect().await?;
        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        Some(QueryExecutionMetadata {
            // Not registered with the query tracker, so there is no id.
            query_id: String::new(),
            rows_returned: rows.min(max_rows) as u64,
            truncated: rows > max_rows,
            execution_time_ms: start.elapsed().as_millis() as u64,
            bytes_scanned: None,
        })
    } else {
        None
    };
    Ok(ExplainResult {
        logical_plan: logical_tree,
        physical_plan: physical_tree,
        logical_plan_text: logical_text,
        physical_plan_text: physical_text,
        execution_metadata,
    })
}
/// Evict the engine with the oldest activity timestamp, if any are active.
///
/// The victim key is materialized into a local String FIRST so that no
/// DashMap guard is alive across the eviction await.
async fn evict_lru(&self) {
    let victim = self
        .active_engines
        .iter()
        .min_by_key(|entry| entry.value().last_active_at.load(Ordering::Relaxed))
        .map(|entry| entry.key().clone());
    match victim {
        Some(fqn) => self.evict_engine(&fqn).await,
        None => {}
    }
}
/// Remove and cleanly shut down the engine for `fqn`, if it is active.
///
/// Order matters: the buffer task is stopped and awaited first so any
/// staged batches can still reach the engine, then the engine actor is shut
/// down, and finally the table is removed from the catalog provider.
async fn evict_engine(&self, fqn: &str) {
    if let Some((_, handle)) = self.active_engines.remove(fqn) {
        info!("Evicting engine [{}]", fqn);
        // Send results are ignored: a closed channel means the task already
        // exited, which is fine during eviction.
        let _ = handle.shutdown_tx.send(()).await;
        let _ = handle.buffer_handle.await;
        let _ = handle.engine_tx.send(TableCommand::Shutdown).await;
        let _ = handle.engine_handle.await;
        self.catalog_provider.remove_table(&handle.namespace);
    }
}
/// Gracefully stop every active engine, one at a time.
pub async fn shutdown(&self) {
    info!(
        "Shutting down DatasetEngineManager ({} active engines)",
        self.active_engines.len()
    );
    // Copy the keys out first so no map guard is held across the awaits.
    let mut targets = Vec::with_capacity(self.active_engines.len());
    for entry in self.active_engines.iter() {
        targets.push(entry.key().clone());
    }
    for fqn in targets {
        self.evict_engine(&fqn).await;
    }
}
/// Build the background future that evicts engines idle longer than the TTL.
///
/// The caller spawns the returned future; it exits when `shutdown_rx`
/// observes a change.
pub fn start_reaper_loop(
    self: &Arc<Self>,
    mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> impl std::future::Future<Output = ()> + Send + 'static {
    let manager = Arc::clone(self);
    async move {
        let mut ticker = interval(Duration::from_secs(REAPER_INTERVAL_SECS));
        // A tokio interval's first tick completes immediately; consume it so
        // the loop waits a full period before the first sweep.
        ticker.tick().await;
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    let now = chrono::Utc::now().timestamp();
                    let ttl = manager.engine_ttl_secs as i64;
                    // Collect expired keys first so no map guard is held
                    // across the eviction awaits.
                    let to_evict: Vec<String> = manager
                        .active_engines
                        .iter()
                        .filter(|e| now - e.value().last_active_at.load(Ordering::Relaxed) > ttl)
                        .map(|e| e.key().clone())
                        .collect();
                    for fqn in to_evict {
                        manager.evict_engine(&fqn).await;
                    }
                }
                _ = shutdown_rx.changed() => {
                    info!("Reaper loop shutting down");
                    break;
                }
            }
        }
    }
}

/// Build the background future that periodically refreshes the registry and
/// registers catalogs for newly discovered datasets.
///
/// Refresh failures are logged and the loop keeps running; it exits when
/// `shutdown_rx` observes a change.
pub fn start_discovery_loop(
    self: &Arc<Self>,
    mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> impl std::future::Future<Output = ()> + Send + 'static {
    let manager = Arc::clone(self);
    async move {
        let mut ticker = interval(Duration::from_secs(DISCOVERY_INTERVAL_SECS));
        // Skip the interval's immediate first tick.
        ticker.tick().await;
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    if let Err(e) = manager.registry.refresh().await {
                        warn!("Registry discovery refresh failed: {}", e);
                    }
                    // ensure_catalog_registered is repeat-safe, so re-running
                    // it for every active dataset each cycle is harmless.
                    for reg in manager.registry.list_active() {
                        manager.ensure_catalog_registered(&reg.namespace.catalog);
                    }
                }
                _ = shutdown_rx.changed() => {
                    info!("Discovery loop shutting down");
                    break;
                }
            }
        }
    }
}
/// Shared DataFusion session used for all SQL execution.
pub fn query_ctx(&self) -> &Arc<SessionContext> {
    &self.query_ctx
}

/// The persisted dataset registry backing this manager.
pub fn registry(&self) -> &Arc<DatasetRegistry> {
    &self.registry
}

/// Number of engines currently resident in memory.
pub fn active_engine_count(&self) -> usize {
    self.active_engines.len()
}

/// Register the shared catalog provider in the session under `catalog`.
/// Safe to call repeatedly for the same catalog name.
fn ensure_catalog_registered(&self, catalog: &str) {
    self.query_ctx.register_catalog(
        catalog,
        Arc::clone(&self.catalog_provider) as Arc<dyn datafusion::catalog::CatalogProvider>,
    );
}
}
/// Per-catalog rollup returned by `DatasetEngineManager::list_catalogs`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CatalogSummary {
    pub catalog: String,
    /// Number of distinct schemas observed in the catalog.
    pub schema_count: u32,
    /// Total number of tables across all schemas in the catalog.
    pub table_count: u32,
}

/// Per-schema rollup returned by `DatasetEngineManager::list_schemas`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SchemaSummary {
    pub catalog: String,
    pub schema_name: String,
    /// Number of tables registered under this schema.
    pub table_count: u32,
}

/// One row of `DatasetEngineManager::list_tables` output.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TableSummaryInfo {
    pub catalog: String,
    pub schema_name: String,
    pub table: String,
    /// Registration status, rendered via Display.
    pub status: String,
    /// RFC 3339 timestamps from the registration record.
    pub created_at: String,
    pub updated_at: String,
}

/// Column-level metadata within a `TableDetail`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ColumnDetail {
    pub name: String,
    /// Arrow data type, rendered via Display.
    pub arrow_type: String,
    pub nullable: bool,
    /// True when the column appears in the registration's partition columns.
    pub is_partition: bool,
    /// True for the scouter-managed system columns.
    pub is_system: bool,
}

/// Full table metadata: registration record, column details, and stats.
pub struct TableDetail {
    pub registration: DatasetRegistration,
    pub columns: Vec<ColumnDetail>,
    pub stats: stats::TableStats,
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Int64Type, Schema, TimeUnit};
use scouter_types::dataset::{DatasetFingerprint, DatasetRegistration};
use tempfile::TempDir;
/// Local-filesystem storage settings rooted at a temp dir.
fn test_storage_settings(dir: &TempDir) -> ObjectStorageSettings {
    ObjectStorageSettings {
        storage_uri: dir.path().to_str().unwrap().to_string(),
        storage_type: scouter_types::StorageType::Local,
        region: "us-east-1".to_string(),
        // Trace-related settings are not exercised by these tests; the values
        // are arbitrary but required by the struct.
        trace_compaction_interval_hours: 24,
        trace_flush_interval_secs: 5,
        trace_refresh_interval_secs: 10,
    }
}

/// Schema with three user columns plus the three scouter system columns.
fn test_schema() -> Schema {
    Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("score", DataType::Float64, false),
        Field::new("model_name", DataType::Utf8, true),
        Field::new(
            "scouter_created_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("scouter_partition_date", DataType::Date32, false),
        Field::new("scouter_batch_id", DataType::Utf8, false),
    ])
}

/// Registration for `test_catalog.test_schema.predictions` whose fingerprint
/// matches `schema`.
fn test_registration(schema: &Schema) -> DatasetRegistration {
    let arrow_schema_json = serde_json::to_string(schema).unwrap();
    let fingerprint = DatasetFingerprint::from_schema_json(&arrow_schema_json);
    let namespace =
        DatasetNamespace::new("test_catalog", "test_schema", "predictions").unwrap();
    DatasetRegistration::new(
        namespace,
        fingerprint,
        arrow_schema_json,
        "{}".to_string(),
        vec![],
    )
}

/// Three-row batch matching `test_schema`, stamped with the current time.
fn make_test_batch(schema: &Schema) -> RecordBatch {
    use arrow::array::*;
    use chrono::{Datelike, Utc};
    let now = Utc::now();
    // 719_163 = days from 0001-01-01 (CE) to 1970-01-01; converts chrono's
    // days-from-CE count into days-since-Unix-epoch as Date32 expects.
    let epoch_days = now.date_naive().num_days_from_ce() - 719_163;
    RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(StringArray::from(vec!["user_1", "user_2", "user_3"])),
            Arc::new(Float64Array::from(vec![0.95, 0.87, 0.92])),
            Arc::new(StringArray::from(vec![
                Some("model_a"),
                None,
                Some("model_b"),
            ])),
            Arc::new(
                TimestampMicrosecondArray::from(vec![
                    now.timestamp_micros(),
                    now.timestamp_micros(),
                    now.timestamp_micros(),
                ])
                .with_timezone("UTC"),
            ),
            Arc::new(Date32Array::from(vec![epoch_days, epoch_days, epoch_days])),
            Arc::new(StringArray::from(vec![
                "batch-001",
                "batch-001",
                "batch-001",
            ])),
        ],
    )
    .unwrap()
}
// Registration is idempotent, engines activate lazily on first insert, and
// shutdown drains the active set.
#[tokio::test]
async fn test_register_and_insert() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
        .await
        .unwrap();
    let schema = test_schema();
    let reg = test_registration(&schema);
    let result = manager.register_dataset(&reg).await.unwrap();
    assert_eq!(result, RegistrationResult::Created);
    // Registering the same dataset again must not create a duplicate.
    let result2 = manager.register_dataset(&reg).await.unwrap();
    assert_eq!(result2, RegistrationResult::AlreadyExists);
    // No engine until the first write arrives.
    assert_eq!(manager.active_engine_count(), 0);
    let batch = make_test_batch(&schema);
    manager
        .insert_batch(&reg.namespace, &reg.fingerprint, batch)
        .await
        .unwrap();
    assert_eq!(manager.active_engine_count(), 1);
    // Give the 1-second flush interval a chance to run before shutdown.
    tokio::time::sleep(Duration::from_secs(2)).await;
    manager.shutdown().await;
    assert_eq!(manager.active_engine_count(), 0);
}
// Inserting with a fingerprint that does not match the registered schema
// must be rejected with FingerprintMismatch.
#[tokio::test]
async fn test_fingerprint_mismatch() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
        .await
        .unwrap();
    let schema = test_schema();
    let reg = test_registration(&schema);
    manager.register_dataset(&reg).await.unwrap();
    // A fingerprint derived from different schema JSON cannot match.
    let wrong_fp = DatasetFingerprint::from_schema_json("wrong");
    let batch = make_test_batch(&schema);
    let result = manager.insert_batch(&reg.namespace, &wrong_fp, batch).await;
    // matches! replaces the redundant is_err assert plus if-let/panic, and is
    // consistent with test_table_not_found.
    assert!(
        matches!(result, Err(DatasetEngineError::FingerprintMismatch { .. })),
        "Expected FingerprintMismatch error"
    );
    manager.shutdown().await;
}
// Inserting into an unregistered namespace fails with TableNotFound.
#[tokio::test]
async fn test_table_not_found() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
        .await
        .unwrap();
    let ns = DatasetNamespace::new("no", "such", "table").unwrap();
    let fp = DatasetFingerprint::from_schema_json("x");
    let schema = test_schema();
    let batch = make_test_batch(&schema);
    let result = manager.insert_batch(&ns, &fp, batch).await;
    assert!(matches!(result, Err(DatasetEngineError::TableNotFound(_))));
    manager.shutdown().await;
}

// list_datasets reflects registrations and reports the expected fqn.
#[tokio::test]
async fn test_list_datasets() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
        .await
        .unwrap();
    assert!(manager.list_datasets().is_empty());
    let schema = test_schema();
    let reg = test_registration(&schema);
    manager.register_dataset(&reg).await.unwrap();
    let datasets = manager.list_datasets();
    assert_eq!(datasets.len(), 1);
    assert_eq!(
        datasets[0].namespace.fqn(),
        "test_catalog.test_schema.predictions"
    );
    manager.shutdown().await;
}

// Two tables sharing a catalog/schema each get their own engine.
#[tokio::test]
async fn test_multiple_tables_isolation() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
        .await
        .unwrap();
    let schema = test_schema();
    let ns1 = DatasetNamespace::new("cat", "sch", "table_a").unwrap();
    let ns2 = DatasetNamespace::new("cat", "sch", "table_b").unwrap();
    let arrow_json = serde_json::to_string(&schema).unwrap();
    let fp = DatasetFingerprint::from_schema_json(&arrow_json);
    let reg1 = DatasetRegistration::new(
        ns1.clone(),
        fp.clone(),
        arrow_json.clone(),
        "{}".into(),
        vec![],
    );
    let reg2 = DatasetRegistration::new(
        ns2.clone(),
        fp.clone(),
        arrow_json.clone(),
        "{}".into(),
        vec![],
    );
    manager.register_dataset(&reg1).await.unwrap();
    manager.register_dataset(&reg2).await.unwrap();
    let batch1 = make_test_batch(&schema);
    let batch2 = make_test_batch(&schema);
    manager.insert_batch(&ns1, &fp, batch1).await.unwrap();
    manager.insert_batch(&ns2, &fp, batch2).await.unwrap();
    assert_eq!(manager.active_engine_count(), 2);
    manager.shutdown().await;
}
// With max_active_engines = 2, activating a third table evicts the LRU
// engine so the active count never exceeds the cap.
#[tokio::test]
async fn test_max_active_engines_cap() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 2, 1, 100, 30)
        .await
        .unwrap();
    let schema = test_schema();
    let arrow_json = serde_json::to_string(&schema).unwrap();
    let fp = DatasetFingerprint::from_schema_json(&arrow_json);
    // Register three tables sharing the same schema/fingerprint.
    for i in 0..3 {
        let ns = DatasetNamespace::new("cat", "sch", format!("tbl_{i}")).unwrap();
        let reg =
            DatasetRegistration::new(ns, fp.clone(), arrow_json.clone(), "{}".into(), vec![]);
        manager.register_dataset(&reg).await.unwrap();
    }
    let ns0 = DatasetNamespace::new("cat", "sch", "tbl_0").unwrap();
    let ns1 = DatasetNamespace::new("cat", "sch", "tbl_1").unwrap();
    let ns2 = DatasetNamespace::new("cat", "sch", "tbl_2").unwrap();
    manager
        .insert_batch(&ns0, &fp, make_test_batch(&schema))
        .await
        .unwrap();
    manager
        .insert_batch(&ns1, &fp, make_test_batch(&schema))
        .await
        .unwrap();
    assert_eq!(manager.active_engine_count(), 2);
    // Activating the third table must evict one of the first two.
    manager
        .insert_batch(&ns2, &fp, make_test_batch(&schema))
        .await
        .unwrap();
    assert_eq!(manager.active_engine_count(), 2);
    manager.shutdown().await;
}
// End-to-end: written rows become queryable via SQL after a flush cycle.
#[tokio::test]
async fn test_write_and_query() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
        .await
        .unwrap();
    let schema = test_schema();
    let reg = test_registration(&schema);
    manager.register_dataset(&reg).await.unwrap();
    let batch = make_test_batch(&schema);
    manager
        .insert_batch(&reg.namespace, &reg.fingerprint, batch)
        .await
        .unwrap();
    // Wait past the 1s flush interval so the batch lands in storage.
    tokio::time::sleep(Duration::from_secs(3)).await;
    let sql = "SELECT COUNT(*) as cnt FROM test_catalog.test_schema.predictions";
    let results = manager.query(sql).await.unwrap();
    assert!(!results.is_empty());
    let count_col = results[0]
        .column_by_name("cnt")
        .unwrap()
        .as_primitive_opt::<Int64Type>()
        .unwrap();
    // All three rows of the test batch should be visible.
    assert_eq!(count_col.value(0), 3);
    manager.shutdown().await;
}

// Registrations survive a full manager restart against the same storage.
#[tokio::test]
async fn test_registry_persistence() {
    let dir = TempDir::new().unwrap();
    let settings = test_storage_settings(&dir);
    {
        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();
        let schema = test_schema();
        let reg = test_registration(&schema);
        manager.register_dataset(&reg).await.unwrap();
        manager.shutdown().await;
    }
    {
        // A fresh manager over the same storage must see the registration.
        let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 100, 30)
            .await
            .unwrap();
        let datasets = manager.list_datasets();
        assert_eq!(datasets.len(), 1);
        assert_eq!(
            datasets[0].namespace.fqn(),
            "test_catalog.test_schema.predictions"
        );
        manager.shutdown().await;
    }
}
}