use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::discovery::DiscoveryService;
use crate::error::{Error, Result};
use crate::schema::{
aggregator::{AggregatorConfig, LoadResult, SchemaAggregator},
registry::{SchemaRegistry, SchemaRegistryConfig},
};
use crate::Config;
use crate::Database;
use crate::Platform;
/// Inputs for [`ingest`].
#[derive(Debug, Clone)]
pub struct IngestionConfig {
    /// Paths handed to `SchemaAggregator::load_from_paths`; when empty, schema loading
    /// is skipped entirely.
    pub schema_paths: Vec<PathBuf>,
    /// Directory scanned for SSTables; `ingest` fails if it is missing or not a directory.
    pub data_dir: PathBuf,
    /// Optional version hint forwarded to the discovery service.
    pub version_hint: Option<String>,
    /// Core configuration used to create the platform, the schema registry and the database.
    pub core_config: Config,
    /// When set, only discovered table directories whose full path (lossily converted to
    /// UTF-8) contains this substring are opened.
    pub table_directory_filter: Option<String>,
}
/// Output of a successful [`ingest`] call.
#[derive(Debug)]
pub struct IngestionResult {
    /// Database opened over the (optionally filtered) discovered table directories.
    pub database: Database,
    /// Outcome of schema loading. Its `errors` list is always empty here, because any
    /// reported load error makes `ingest` return `Err`. All counters are zero when no
    /// schema paths were supplied.
    pub schema_load_result: LoadResult,
    /// What SSTable discovery found.
    pub discovery_summary: DiscoverySummary,
    /// The registry shared by the schema aggregator, the discovery service and the database.
    pub schema_registry: Arc<RwLock<SchemaRegistry>>,
}
/// Summary of the SSTable discovery performed during ingestion.
///
/// `table_directories` lists the directories the database was actually opened over,
/// i.e. after any table-directory filter was applied. The remaining fields are copied
/// unchanged from the discovery scan, so they may describe tables that were filtered out.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiscoverySummary {
    /// Number of SSTables reported by the discovery scan.
    pub sstables_found: usize,
    /// Keyspace names reported by the discovery scan.
    pub keyspaces: Vec<String>,
    /// Table names reported by the discovery scan.
    pub tables: Vec<String>,
    /// Table directories that were opened (post-filter).
    pub table_directories: Vec<PathBuf>,
    /// Version resolved by the discovery service, if any.
    pub resolved_version: Option<String>,
}
pub async fn ingest(config: IngestionConfig) -> Result<IngestionResult> {
if !config.data_dir.exists() {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!(
"Data directory does not exist: {}",
config.data_dir.display()
),
)));
}
if !config.data_dir.is_dir() {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"Data directory path is not a directory: {}",
config.data_dir.display()
),
)));
}
let platform = Arc::new(Platform::new(&config.core_config).await?);
let registry_config = SchemaRegistryConfig::default();
let schema_registry = Arc::new(RwLock::new(
SchemaRegistry::new(
registry_config,
platform.clone(),
config.core_config.clone(),
)
.await
.map_err(|e| Error::Schema(format!("Failed to create schema registry: {}", e)))?,
));
let udt_registry = schema_registry.read().await.get_udt_registry();
let aggregator_config = AggregatorConfig {
graceful_degradation: false, validate_udt_dependencies: true,
};
let mut aggregator = SchemaAggregator::new(
schema_registry.clone(),
udt_registry.clone(),
aggregator_config,
);
let schema_load_result = if !config.schema_paths.is_empty() {
aggregator
.load_from_paths(&config.schema_paths)
.await
.map_err(|e| Error::Schema(format!("Schema loading failed: {}", e)))?
} else {
LoadResult {
schemas_loaded: 0,
udts_loaded: 0,
errors: Vec::new(),
warnings: Vec::new(),
}
};
if !schema_load_result.errors.is_empty() {
let error_messages: Vec<String> = schema_load_result
.errors
.iter()
.map(|e| format!("{:?}: {}", e.error_type, e.message))
.collect();
return Err(Error::Schema(format!(
"Schema loading failed with {} error(s): {}",
schema_load_result.errors.len(),
error_messages.join("; ")
)));
}
let discovery_service = DiscoveryService::with_schema_registry(
config.data_dir.clone(),
config.version_hint.clone(),
schema_registry.clone(),
);
let service_summary = discovery_service.scan().await.map_err(|e| {
match e {
Error::Io(_) => e,
_ => Error::Io(std::io::Error::other(format!(
"SSTable discovery failed: {}",
e
))),
}
})?;
let filtered_table_dirs = if let Some(ref filter_pattern) = config.table_directory_filter {
service_summary
.table_directories
.iter()
.filter(|path| path.to_string_lossy().contains(filter_pattern))
.cloned()
.collect()
} else {
service_summary.table_directories.clone()
};
let database = Database::open_with_discovered_sstables_and_registry(
&config.data_dir,
filtered_table_dirs.clone(),
config.core_config.clone(),
Some(schema_registry.clone()),
)
.await
.map_err(|e| {
match e {
Error::Schema(_) => e,
Error::Io(_) => e,
#[cfg(feature = "state_machine")]
Error::QueryExecution(_) => e,
_ => Error::QueryExecution(format!("Database initialization failed: {}", e)),
}
})?;
let discovery_summary = DiscoverySummary {
sstables_found: service_summary.sstables_found,
keyspaces: service_summary.keyspaces,
tables: service_summary.tables,
table_directories: filtered_table_dirs,
resolved_version: service_summary.resolved_version,
};
Ok(IngestionResult {
database,
schema_load_result,
discovery_summary,
schema_registry: schema_registry.clone(),
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A missing data directory must be reported as an I/O `NotFound` error.
    #[tokio::test]
    async fn test_ingest_invalid_data_dir() {
        // Point inside a fresh temp dir so the test cannot be affected by whatever
        // happens to exist at an absolute path on the host.
        let temp_dir = TempDir::new().unwrap();
        let config = IngestionConfig {
            schema_paths: vec![],
            data_dir: temp_dir.path().join("nonexistent"),
            version_hint: None,
            core_config: Config::default(),
            table_directory_filter: None,
        };
        let Err(Error::Io(io_err)) = ingest(config).await else {
            panic!("Expected Io error for nonexistent directory");
        };
        assert_eq!(io_err.kind(), std::io::ErrorKind::NotFound);
    }

    /// A data directory path that points at a regular file is rejected as `InvalidInput`.
    #[tokio::test]
    async fn test_ingest_data_dir_is_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("not_a_directory");
        std::fs::write(&file_path, b"not a directory").unwrap();
        let config = IngestionConfig {
            schema_paths: vec![],
            data_dir: file_path,
            version_hint: None,
            core_config: Config::default(),
            table_directory_filter: None,
        };
        let Err(Error::Io(io_err)) = ingest(config).await else {
            panic!("Expected Io error when data_dir is a regular file");
        };
        assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidInput);
    }

    /// With no schema paths, ingestion of an empty directory succeeds and loads nothing.
    #[tokio::test]
    async fn test_ingest_with_empty_schema_paths() {
        let temp_dir = TempDir::new().unwrap();
        let config = IngestionConfig {
            schema_paths: vec![],
            data_dir: temp_dir.path().to_path_buf(),
            version_hint: Some("5.0".to_string()),
            core_config: Config::default(),
            table_directory_filter: None,
        };
        // `expect` surfaces the actual error on failure, unlike `assert!(is_ok())`.
        let ingestion_result = ingest(config)
            .await
            .expect("ingesting an empty data directory should succeed");
        assert_eq!(ingestion_result.schema_load_result.schemas_loaded, 0);
        assert_eq!(ingestion_result.schema_load_result.udts_loaded, 0);
        assert!(ingestion_result.schema_load_result.errors.is_empty());
    }
}