use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::{
platform::Platform,
query::result::{QueryResult, QueryRow},
schema::{SchemaManager, TableSchema},
storage::sstable_data_manager::{
CacheStatistics, DataRow, SSTableDataManager, SSTableDataManagerConfig, TableDiscovery,
TableInfo,
},
Config, Error, Result, Value,
};
/// Configuration for [`ReplDataApi`].
#[derive(Debug, Clone)]
pub struct ReplDataConfig {
    /// Configuration handed to the [`SSTableDataManager`] created by `ReplDataApi::new`.
    pub data_manager_config: SSTableDataManagerConfig,
    /// Default query timeout, in seconds.
    pub default_timeout_seconds: u64,
    /// Schema auto-detection flag.
    /// NOTE(review): not referenced elsewhere in this file — confirm where it is consumed.
    pub auto_detect_schema: bool,
    /// Hard upper bound on the row limit of a single `select`; also caps limits set
    /// through `update_context`.
    pub max_rows_per_query: usize,
    /// Whether `select` results are cached in memory and reused.
    pub enable_query_cache: bool,
    /// How long, in seconds, a cached `select` result may be served before re-querying.
    pub query_cache_ttl_seconds: u64,
}
impl Default for ReplDataConfig {
    /// Baseline settings: default data-manager config, a 30 s default timeout,
    /// schema auto-detection on, at most 10 000 rows per query, and a query cache
    /// whose entries are served for up to 5 minutes.
    fn default() -> Self {
        let data_manager_config = SSTableDataManagerConfig::default();
        Self {
            max_rows_per_query: 10_000,
            default_timeout_seconds: 30,
            query_cache_ttl_seconds: 5 * 60,
            enable_query_cache: true,
            auto_detect_schema: true,
            data_manager_config,
        }
    }
}
/// Per-session query settings held by [`ReplDataApi`] and adjusted through
/// `use_keyspace` and `update_context`.
#[derive(Debug, Clone)]
pub struct QueryContext {
    /// Keyspace chosen with `use_keyspace`; `select` fails while this is `None`.
    pub keyspace: Option<String>,
    /// Query timeout set through `update_context`.
    /// NOTE(review): `select` does not read this field — confirm whether it should be enforced.
    pub timeout: Duration,
    /// Row limit `select` applies when the caller passes none (still capped by
    /// `ReplDataConfig::max_rows_per_query`).
    pub limit: Option<usize>,
    /// Timing-display toggle set through `update_context`; not read elsewhere in this file.
    pub timing_enabled: bool,
    /// Page size set through `update_context`.
    /// NOTE(review): paging is not applied by `select`.
    pub page_size: Option<usize>,
    /// Paging offset. NOTE(review): nothing in this file reads or updates it.
    pub page_offset: usize,
}
impl Default for QueryContext {
    /// A fresh session: no keyspace, a 30-second timeout, a 100-row limit,
    /// 50-row pages starting at offset 0, and timing output disabled.
    fn default() -> Self {
        const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
        Self {
            page_offset: 0,
            page_size: Some(50),
            limit: Some(100),
            timing_enabled: false,
            timeout: DEFAULT_TIMEOUT,
            keyspace: None,
        }
    }
}
/// Result of a REPL `select`: the rows plus execution metadata and the table schema.
#[derive(Debug, Clone)]
pub struct ReplQueryResult {
    /// Converted result rows.
    pub result: QueryResult,
    /// Timing, caching, and source information for this query.
    pub metadata: QueryMetadata,
    /// Schema reported by the data manager; `None` when it has none for the table.
    pub schema: Option<TableSchema>,
}
/// Execution details attached to a [`ReplQueryResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMetadata {
    /// Wall time spent in `select`; for cache hits, the time taken to serve the cached copy.
    pub execution_time: Duration,
    /// Number of rows returned by the data manager.
    pub rows_returned: usize,
    /// Total matching rows, when known. `select` currently always sets `None`.
    pub total_rows_available: Option<usize>,
    /// `true` when the result was served from the in-memory query cache.
    pub from_cache: bool,
    /// Files the returned rows were read from, taken from each row's metadata.
    pub source_files: Vec<PathBuf>,
    /// Estimated (not measured) number of bytes read for this query.
    pub bytes_read: u64,
    /// Data-manager cache hit ratio (`hits / (hits + misses)`, or 0.0 with no lookups),
    /// sampled when the query ran.
    pub cache_hit_ratio: f64,
}
/// Tables of one keyspace, as returned by `ReplDataApi::list_tables`.
#[derive(Debug, Clone)]
pub struct TableListing {
    /// Keyspace the listing describes.
    pub keyspace: String,
    /// One summary per listed table.
    pub tables: Vec<TableSummary>,
    /// Instant at which `list_tables` assembled this listing.
    pub discovered_at: Instant,
}
/// Per-table summary built from cached table-discovery data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSummary {
    /// Table name.
    pub name: String,
    /// Row-count estimate reported by discovery.
    pub estimated_rows: usize,
    /// Total size in bytes reported by discovery (`total_size_bytes`).
    pub size_bytes: u64,
    /// Number of SSTable files discovery recorded for the table.
    pub sstable_count: usize,
    /// Whether discovery captured a schema for the table.
    pub has_schema: bool,
    /// Last-modified time reported by discovery, if any.
    pub last_modified: Option<std::time::SystemTime>,
    /// `"Healthy"`, `"Degraded"`, or `"Corrupted"`, derived from per-file health.
    pub health_status: String,
}
/// Data-access facade used by the REPL. It supports keyspace selection, `select` with an
/// in-memory result cache, table listing and description, and system information.
pub struct ReplDataApi {
    /// Limits and cache settings supplied at construction.
    config: ReplDataConfig,
    /// Data manager that performs discovery, queries, and schema lookups.
    data_manager: Arc<SSTableDataManager>,
    /// Mutable session settings (current keyspace, limits, ...).
    query_context: Arc<RwLock<QueryContext>>,
    /// `select` results keyed by a query fingerprint, with the instant each was cached.
    query_cache: Arc<RwLock<HashMap<String, (ReplQueryResult, Instant)>>>,
    /// Most recent discovery result stored by `initialize`, with the instant it was stored.
    discovery_cache: Arc<RwLock<Option<(TableDiscovery, Instant)>>>,
}
impl ReplDataApi {
pub async fn new(
config: ReplDataConfig,
platform: Arc<Platform>,
core_config: Config,
schema_manager: Arc<SchemaManager>,
) -> Result<Self> {
let data_manager = Arc::new(
SSTableDataManager::new(
config.data_manager_config.clone(),
platform,
core_config,
schema_manager,
)
.await?,
);
Ok(Self {
config,
data_manager,
query_context: Arc::new(RwLock::new(QueryContext::default())),
query_cache: Arc::new(RwLock::new(HashMap::new())),
discovery_cache: Arc::new(RwLock::new(None)),
})
}
pub async fn initialize(&self, data_dir: &Path) -> Result<TableDiscovery> {
let discovery = self.data_manager.discover_tables(data_dir).await?;
{
let mut cache = self.discovery_cache.write().await;
*cache = Some((discovery.clone(), Instant::now()));
}
Ok(discovery)
}
pub async fn use_keyspace(&self, keyspace: &str) -> Result<()> {
let keyspaces = self.list_keyspaces().await?;
if !keyspaces.contains(&keyspace.to_string()) {
return Err(Error::CqlParse(format!(
"Keyspace '{}' does not exist",
keyspace
)));
}
let mut context = self.query_context.write().await;
context.keyspace = Some(keyspace.to_string());
Ok(())
}
pub async fn current_keyspace(&self) -> Option<String> {
let context = self.query_context.read().await;
context.keyspace.clone()
}
pub async fn select(
&self,
table: &str,
columns: Option<Vec<String>>,
where_clause: Option<String>,
limit: Option<usize>,
) -> Result<ReplQueryResult> {
let start_time = Instant::now();
let context = self.query_context.read().await;
let keyspace = context.keyspace.as_ref().ok_or_else(|| {
Error::InvalidState("No keyspace selected. Use 'USE keyspace;' first.".to_string())
})?;
let effective_limit = limit
.or(context.limit)
.map(|l| l.min(self.config.max_rows_per_query))
.unwrap_or(self.config.max_rows_per_query);
if self.config.enable_query_cache {
let cache_key = format!(
"{}:{}:{}:{:?}:{:?}",
keyspace,
table,
columns.as_ref().map(|c| c.join(",")).unwrap_or_default(),
where_clause,
effective_limit
);
if let Some((cached_result, cached_at)) = self.get_cached_query(&cache_key).await {
let cache_ttl = Duration::from_secs(self.config.query_cache_ttl_seconds);
if cached_at.elapsed() < cache_ttl {
let mut result = cached_result;
result.metadata.from_cache = true;
result.metadata.execution_time = start_time.elapsed();
return Ok(result);
}
}
}
let rows = self
.data_manager
.query_data(
keyspace,
table,
where_clause.as_deref(),
Some(effective_limit),
)
.await?;
let schema = self.data_manager.get_table_schema(keyspace, table).await?;
let query_result = self.convert_to_query_result(rows.clone(), &columns, &schema)?;
let metadata = QueryMetadata {
execution_time: start_time.elapsed(),
rows_returned: rows.len(),
total_rows_available: None, from_cache: false,
source_files: rows
.iter()
.map(|r| r.metadata.source_file.clone())
.collect(),
bytes_read: self.estimate_bytes_read(&rows),
cache_hit_ratio: self.calculate_cache_hit_ratio().await,
};
let result = ReplQueryResult {
result: query_result,
metadata,
schema,
};
if self.config.enable_query_cache {
let cache_key = format!(
"{}:{}:{}:{:?}:{:?}",
keyspace,
table,
columns.as_ref().map(|c| c.join(",")).unwrap_or_default(),
where_clause,
effective_limit
);
self.cache_query_result(cache_key, result.clone()).await;
}
Ok(result)
}
pub async fn list_keyspaces(&self) -> Result<Vec<String>> {
self.data_manager.list_keyspaces().await
}
pub async fn list_tables(&self, keyspace: Option<&str>) -> Result<TableListing> {
let target_keyspace = if let Some(ks) = keyspace {
ks.to_string()
} else {
let context = self.query_context.read().await;
context
.keyspace
.as_ref()
.ok_or_else(|| Error::InvalidState("No keyspace selected".to_string()))?
.clone()
};
let table_names = self.data_manager.list_tables(&target_keyspace).await?;
let mut tables = Vec::new();
for table_name in table_names {
if let Ok(Some(_schema)) = self
.data_manager
.get_table_schema(&target_keyspace, &table_name)
.await
{
let discovery = self.get_discovery_cache().await;
if let Some((ref discovery_data, _)) = discovery {
for keyspace_info in &discovery_data.keyspaces {
if keyspace_info.name == target_keyspace {
for table_info in &keyspace_info.tables {
if table_info.name == table_name {
let summary = TableSummary {
name: table_name.clone(),
estimated_rows: table_info.estimated_rows,
size_bytes: table_info.total_size_bytes,
sstable_count: table_info.sstable_files.len(),
has_schema: table_info.schema.is_some(),
last_modified: table_info.last_modified,
health_status: self.assess_table_health(table_info),
};
tables.push(summary);
break;
}
}
break;
}
}
}
}
}
Ok(TableListing {
keyspace: target_keyspace,
tables,
discovered_at: Instant::now(),
})
}
pub async fn describe_table(&self, table: &str, keyspace: Option<&str>) -> Result<TableSchema> {
let target_keyspace = if let Some(ks) = keyspace {
ks.to_string()
} else {
let context = self.query_context.read().await;
context
.keyspace
.as_ref()
.ok_or_else(|| Error::InvalidState("No keyspace selected".to_string()))?
.clone()
};
self.data_manager
.get_table_schema(&target_keyspace, table)
.await?
.ok_or_else(|| {
Error::Table(format!(
"Table {}.{} not found or no schema available",
target_keyspace, table
))
})
}
pub async fn get_system_info(&self) -> Result<SystemInfo> {
let cache_stats = self.data_manager.get_cache_stats();
let (discovery_in_progress, last_discovery) = self.data_manager.get_discovery_status();
let discovery_info = self.get_discovery_cache().await;
let (total_keyspaces, total_tables, total_sstables) =
if let Some((ref discovery, _)) = discovery_info {
(
discovery.keyspaces.len(),
discovery.keyspaces.iter().map(|ks| ks.tables.len()).sum(),
discovery.total_sstables,
)
} else {
(0, 0, 0)
};
let memory_usage_mb = cache_stats.current_cache_size_bytes / (1024 * 1024);
Ok(SystemInfo {
total_keyspaces,
total_tables,
total_sstables,
cache_stats,
discovery_in_progress,
last_discovery_time: last_discovery,
memory_usage_mb,
active_connections: 1, })
}
pub async fn update_context(&self, updates: QueryContextUpdate) -> Result<()> {
let mut context = self.query_context.write().await;
if let Some(timeout) = updates.timeout_seconds {
context.timeout = Duration::from_secs(timeout);
}
if let Some(limit) = updates.limit {
context.limit = Some(limit.min(self.config.max_rows_per_query));
}
if let Some(timing) = updates.timing_enabled {
context.timing_enabled = timing;
}
if let Some(page_size) = updates.page_size {
context.page_size = Some(page_size);
}
Ok(())
}
pub async fn get_context(&self) -> QueryContext {
let context = self.query_context.read().await;
context.clone()
}
pub async fn clear_caches(&self) -> Result<()> {
{
let mut query_cache = self.query_cache.write().await;
query_cache.clear();
}
{
let mut discovery_cache = self.discovery_cache.write().await;
*discovery_cache = None;
}
Ok(())
}
async fn get_cached_query(&self, cache_key: &str) -> Option<(ReplQueryResult, Instant)> {
let cache = self.query_cache.read().await;
cache.get(cache_key).cloned()
}
async fn cache_query_result(&self, cache_key: String, result: ReplQueryResult) {
let mut cache = self.query_cache.write().await;
cache.insert(cache_key, (result, Instant::now()));
if cache.len() > 100 {
let oldest_key = cache
.iter()
.min_by_key(|(_, (_, time))| time)
.map(|(key, _)| key.clone());
if let Some(key) = oldest_key {
cache.remove(&key);
}
}
}
async fn get_discovery_cache(&self) -> Option<(TableDiscovery, Instant)> {
let cache = self.discovery_cache.read().await;
cache.clone()
}
fn convert_to_query_result(
&self,
rows: Vec<DataRow>,
requested_columns: &Option<Vec<String>>,
schema: &Option<TableSchema>,
) -> Result<QueryResult> {
let mut query_rows = Vec::new();
for data_row in rows {
let mut row_values = Vec::new();
let columns = if let Some(cols) = requested_columns {
cols.clone()
} else if let Some(schema) = schema {
schema.columns.iter().map(|c| c.name.clone()).collect()
} else {
data_row.columns.keys().cloned().collect()
};
for column_name in &columns {
let value = data_row
.columns
.get(column_name)
.cloned()
.unwrap_or(Value::Null);
row_values.push(value);
}
let query_row = QueryRow {
values: row_values
.into_iter()
.enumerate()
.map(|(i, value)| (format!("col_{i}"), value))
.collect(),
key: data_row.key.clone(),
metadata: crate::query::result::RowMetadata {
version: Some(data_row.metadata.generation),
ttl: data_row.metadata.ttl.map(|duration| duration.as_secs()),
tags: std::collections::HashMap::new(),
},
};
query_rows.push(query_row);
}
Ok(QueryResult {
rows: query_rows,
rows_affected: 0,
execution_time_ms: 0,
metadata: crate::query::result::QueryMetadata::default(),
})
}
fn estimate_bytes_read(&self, rows: &[DataRow]) -> u64 {
(rows.len() * 256) as u64
}
async fn calculate_cache_hit_ratio(&self) -> f64 {
let stats = self.data_manager.get_cache_stats();
let total = stats.cache_hits + stats.cache_misses;
if total > 0 {
stats.cache_hits as f64 / total as f64
} else {
0.0
}
}
fn assess_table_health(&self, table_info: &TableInfo) -> String {
let healthy_files = table_info
.sstable_files
.iter()
.filter(|f| {
f.health_status == crate::storage::sstable_data_manager::FileHealthStatus::Healthy
})
.count();
let total_files = table_info.sstable_files.len();
if healthy_files == total_files {
"Healthy".to_string()
} else if healthy_files > total_files / 2 {
"Degraded".to_string()
} else {
"Corrupted".to_string()
}
}
}
/// Snapshot returned by `ReplDataApi::get_system_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
    /// Keyspaces in the cached discovery result (0 if none is cached).
    pub total_keyspaces: usize,
    /// Tables summed over all cached keyspaces (0 if none is cached).
    pub total_tables: usize,
    /// SSTable count from the cached discovery result (0 if none is cached).
    pub total_sstables: usize,
    /// Data-manager cache statistics.
    pub cache_stats: CacheStatistics,
    /// Discovery-in-progress flag reported by the data manager.
    pub discovery_in_progress: bool,
    /// Second value of the data manager's discovery status.
    /// NOTE(review): presumably the duration of the last discovery run — confirm.
    pub last_discovery_time: Option<Duration>,
    /// Data-manager cache size in whole MiB (bytes / 1_048_576, rounded down).
    pub memory_usage_mb: usize,
    /// Always 1, as currently populated by `get_system_info`.
    pub active_connections: usize,
}
/// Partial update for [`QueryContext`]; `None` fields leave the current value unchanged.
#[derive(Debug, Clone, Default)]
pub struct QueryContextUpdate {
    /// New timeout, in seconds.
    pub timeout_seconds: Option<u64>,
    /// New default row limit; capped at `ReplDataConfig::max_rows_per_query` when applied.
    pub limit: Option<usize>,
    /// New timing-display setting.
    pub timing_enabled: Option<bool>,
    /// New page size.
    pub page_size: Option<usize>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an API with default settings over a fresh temporary schema directory.
    ///
    /// The `TempDir` is handed back so the directory outlives the caller's assertions.
    async fn build_api() -> (TempDir, ReplDataApi) {
        let temp_dir = TempDir::new().unwrap();
        let config = ReplDataConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());
        let schema_manager = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());
        let api = ReplDataApi::new(config, platform, core_config, schema_manager)
            .await
            .unwrap();
        (temp_dir, api)
    }

    #[tokio::test]
    async fn test_repl_api_creation() {
        let (_temp_dir, api) = build_api().await;
        let QueryContext {
            keyspace, limit, ..
        } = api.get_context().await;
        assert!(keyspace.is_none());
        assert_eq!(limit, Some(100));
    }

    #[tokio::test]
    async fn test_query_context_updates() {
        let (_temp_dir, api) = build_api().await;
        api.update_context(QueryContextUpdate {
            timeout_seconds: Some(60),
            limit: Some(200),
            timing_enabled: Some(true),
            page_size: Some(25),
        })
        .await
        .unwrap();

        let QueryContext {
            timeout,
            limit,
            timing_enabled,
            page_size,
            ..
        } = api.get_context().await;
        assert_eq!(timeout, Duration::from_secs(60));
        assert_eq!(limit, Some(200));
        assert!(timing_enabled);
        assert_eq!(page_size, Some(25));
    }
}